api/routers/database/tools/school_router.py
kcar e42cd09dea feat(phase-b): GAIS Supabase loader + school search/register endpoints
- gais_data.py: rewrite to load Edubase CSV into Supabase gais_schools +
  gais_local_authorities via two-pass batch upsert (LAs first for FK integrity)
- school_router.py: add GET /school/search (trigram ilike on name, URN exact),
  POST /school/register (create institute + Neo4j provision + membership link)
- Encoding: handles Windows-1252 (cp1252) Edubase CSV format

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-26 01:51:45 +01:00

321 lines
13 KiB
Python

"""
School Router — school status, search, register, and admin-editable info.
"""
import os
import json
import uuid as uuid_lib
from typing import Dict, Any, Optional, List
from fastapi import APIRouter, Depends, Query
from pydantic import BaseModel
from modules.logger_tool import initialise_logger
from modules.auth.supabase_bearer import SupabaseBearer
from modules.database.supabase.utils.client import SupabaseServiceRoleClient
import modules.database.tools.neo4j_driver_tools as driver_tools
# Module-level logger; level and log path are read from the environment.
logger = initialise_logger(
    __name__,
    os.getenv("LOG_LEVEL"),
    os.getenv("LOG_PATH"),
    'default',
    True,
)

# Router instance that the school endpoints in this module register on.
router = APIRouter()
# ─── Helpers ──────────────────────────────────────────────────────────────────
def _get_sb():
    """Return a newly constructed ``SupabaseServiceRoleClient``."""
    client = SupabaseServiceRoleClient()
    return client
def _institute_db(neo4j_uuid: str) -> str:
return f"cc.institutes.{neo4j_uuid}"
def _find_institute_db_by_email(user_email: str) -> Optional[str]:
"""Fallback: scan all institute DBs for the teacher's email."""
try:
with driver_tools.get_session(database="system") as s:
dbs = [r["name"] for r in s.run(
"SHOW DATABASES YIELD name "
"WHERE name STARTS WITH 'cc.institutes.' "
"AND NOT name ENDS WITH '.curriculum' RETURN name"
)]
for db in dbs:
try:
with driver_tools.get_session(database=db) as s:
rec = s.run(
"MATCH (t:Teacher) WHERE t.worker_email = $e "
"RETURN t.uuid_string AS uuid LIMIT 1",
e=user_email
).single()
if rec:
return db
except Exception:
continue
except Exception as e:
logger.warning(f"Institute DB scan failed: {e}")
return None
# ─── Status endpoint ─────────────────────────────────────────────────────────
@router.get("/status")
async def get_school_status(credentials: dict = Depends(SupabaseBearer())) -> Dict[str, Any]:
    """Return the current user's school role, school info, and calendar/timetable setup status.

    Args:
        credentials: Bearer-token claims; ``sub`` is used as the profile id and
            ``email`` to find the user's ``Teacher`` node.

    Returns:
        ``{"status": "no_school"}`` when neither a school nor an institute
        database can be resolved for the user;
        ``{"status": "error", "message": ...}`` when a Supabase query raises;
        otherwise a ``{"status": "ok", ...}`` payload with the role, school id,
        institute database name, setup flags and ``school_info``.
    """
    user_id = credentials.get("sub", "")
    user_email = credentials.get("email", "")

    try:
        sb = _get_sb()

        # limit(1) is used instead of single(): single() raises when zero (or
        # several) rows match, which would turn "user has no school yet" into
        # an error response rather than "no_school".
        profile_rows = (
            sb.supabase.table("profiles")
            .select("school_id")
            .eq("id", user_id)
            .limit(1)
            .execute()
        ).data or []
        school_id = profile_rows[0].get("school_id") if profile_rows else None

        if not school_id:
            # Fallback: profiles.school_id not set, check institute_memberships directly.
            membership_rows = (
                sb.supabase.table("institute_memberships")
                .select("institute_id,role")
                .eq("profile_id", user_id)
                .limit(1)
                .execute()
            ).data or []
            membership = membership_rows[0] if membership_rows else {}
            school_id = membership.get("institute_id")
            if not school_id:
                return {"status": "no_school"}
            user_role = membership.get("role") or "teacher"
            # Self-heal: write school_id back to the profile. Best effort only;
            # a failure here must not fail the status request.
            try:
                sb.supabase.table("profiles").update(
                    {"school_id": str(school_id)}
                ).eq("id", user_id).execute()
            except Exception as heal_err:
                logger.warning(f"Could not write school_id back to profile {user_id}: {heal_err}")
        else:
            role_rows = (
                sb.supabase.table("institute_memberships")
                .select("role")
                .eq("profile_id", user_id)
                .eq("institute_id", school_id)
                .limit(1)
                .execute()
            ).data or []
            user_role = (role_rows[0].get("role") if role_rows else None) or "teacher"

        inst_rows = (
            sb.supabase.table("institutes")
            .select("id,name,urn,website,address,metadata,neo4j_uuid_string")
            .eq("id", school_id)
            .limit(1)
            .execute()
        ).data or []
        inst = inst_rows[0] if inst_rows else {}
        neo4j_uuid = inst.get("neo4j_uuid_string")
        meta = inst.get("metadata") or {}
    except Exception as e:
        logger.warning(f"Supabase school status query failed: {e}")
        return {"status": "error", "message": str(e)}

    # Prefer the database derived from the institute record; otherwise fall
    # back to scanning institute databases for the user's email.
    if neo4j_uuid:
        inst_db = _institute_db(neo4j_uuid)
    else:
        inst_db = _find_institute_db_by_email(user_email)
    if not inst_db:
        return {"status": "no_school"}

    school_has_calendar = False
    teacher_has_timetable = False
    timetable_id = None
    periods_template = None

    try:
        with driver_tools.get_session(database=inst_db) as s:
            ay = s.run("MATCH (ay:AcademicYear) RETURN ay LIMIT 1").single()
            school_has_calendar = ay is not None

            st = s.run(
                "MATCH (st:SchoolTimetable) WHERE st.periods_template IS NOT NULL "
                "RETURN st.periods_template AS p LIMIT 1"
            ).single()
            if st:
                # Parse separately so a malformed template does not prevent
                # the teacher timetable lookup below from running.
                try:
                    periods_template = json.loads(st["p"])
                except (TypeError, ValueError) as parse_err:
                    logger.warning(f"Invalid periods_template in {inst_db}: {parse_err}")

            # An empty email could match a Teacher stored with a blank
            # worker_email, so only look the teacher up when an email is known.
            if user_email:
                t = s.run(
                    "MATCH (t:Teacher) WHERE t.worker_email = $e "
                    "RETURN t.uuid_string AS uuid LIMIT 1",
                    e=user_email,
                ).single()
                if t:
                    tt = s.run(
                        "MATCH (t:Teacher {uuid_string: $u})-[:HAS_TIMETABLE]->(tt:TeacherTimetable) "
                        "RETURN tt.uuid_string AS id LIMIT 1",
                        u=t["uuid"],
                    ).single()
                    if tt:
                        teacher_has_timetable = True
                        timetable_id = tt["id"]
    except Exception as e:
        # Neo4j problems degrade the response (flags stay at their defaults)
        # rather than failing the request.
        logger.warning(f"Neo4j school status check failed for {inst_db}: {e}")

    return {
        "status": "ok",
        "user_role": user_role,
        "school_id": str(school_id),
        "institute_db": inst_db,
        "school_has_calendar": school_has_calendar,
        "teacher_has_timetable": teacher_has_timetable,
        "timetable_id": timetable_id,
        "periods_template": periods_template,
        "school_info": {
            "name": inst.get("name", ""),
            "urn": inst.get("urn", ""),
            "website": inst.get("website", ""),
            "address": inst.get("address") or {},
            "headteacher": meta.get("headteacher", ""),
            "term_dates_url": meta.get("term_dates_url", ""),
            "staff_list_url": meta.get("staff_list_url", ""),
        },
    }
# ─── School info update ───────────────────────────────────────────────────────
class SchoolInfoUpdate(BaseModel):
    """Request body for ``PATCH /info``.

    Every field is optional; a field left as ``None`` is not written to the
    institute's metadata.
    """

    # Written to the "headteacher" metadata key.
    headteacher: Optional[str] = None
    # Written to the "term_dates_url" metadata key.
    term_dates_url: Optional[str] = None
    # Written to the "staff_list_url" metadata key.
    staff_list_url: Optional[str] = None
@router.patch("/info")
async def update_school_info(
    body: SchoolInfoUpdate,
    credentials: dict = Depends(SupabaseBearer())
) -> Dict[str, Any]:
    """Admin: update school metadata fields stored in institutes.metadata jsonb.

    Only fields of ``body`` that are not ``None`` are written; other metadata
    keys are preserved.

    Args:
        body: Metadata fields to change.
        credentials: Bearer-token claims; ``sub`` is used as the profile id.

    Returns:
        ``{"status": "ok"}`` on success, otherwise
        ``{"status": "error", "message": ...}`` (no linked school, caller is
        not ``school_admin``, school record missing, or a query failed).
    """
    user_id = credentials.get("sub", "")
    try:
        sb = _get_sb()

        # limit(1) instead of single(): single() raises on zero rows, which
        # would replace the intended error messages below with a raw
        # exception string.
        profile_rows = (
            sb.supabase.table("profiles")
            .select("school_id")
            .eq("id", user_id)
            .limit(1)
            .execute()
        ).data or []
        school_id = profile_rows[0].get("school_id") if profile_rows else None
        if not school_id:
            return {"status": "error", "message": "No school linked"}

        role_rows = (
            sb.supabase.table("institute_memberships")
            .select("role")
            .eq("profile_id", user_id)
            .eq("institute_id", school_id)
            .limit(1)
            .execute()
        ).data or []
        role = (role_rows[0].get("role") if role_rows else None) or "teacher"
        if role != "school_admin":
            return {"status": "error", "message": "school_admin role required"}

        inst_rows = (
            sb.supabase.table("institutes")
            .select("metadata")
            .eq("id", school_id)
            .limit(1)
            .execute()
        ).data or []
        if not inst_rows:
            # Without this check the update below would match nothing and the
            # caller would still be told "ok".
            return {"status": "error", "message": "School not found"}

        # Copy so the merged result is a fresh dict, then overlay provided fields.
        meta = dict(inst_rows[0].get("metadata") or {})
        for field_name in ("headteacher", "term_dates_url", "staff_list_url"):
            value = getattr(body, field_name)
            if value is not None:
                meta[field_name] = value

        sb.supabase.table("institutes").update({"metadata": meta}).eq("id", school_id).execute()
        return {"status": "ok"}
    except Exception as e:
        logger.error(f"School info update failed: {e}")
        return {"status": "error", "message": str(e)}
# ─── GAIS school search ───────────────────────────────────────────────────────
def _quote_postgrest_value(value: str) -> str:
    """Double-quote ``value`` for use inside a PostgREST ``or=(...)`` filter.

    Quoting keeps commas, dots, colons and parentheses in user input from
    being parsed as filter syntax; backslashes and double quotes are
    backslash-escaped as PostgREST requires inside a quoted value.
    """
    escaped = value.replace("\\", "\\\\").replace('"', '\\"')
    return f'"{escaped}"'


@router.get("/search")
async def search_schools(
    q: str = Query(..., min_length=2, description="School name, URN, or postcode"),
    limit: int = Query(20, ge=1, le=100),
    status: str = Query("Open", description="Filter by establishment status"),
    credentials: dict = Depends(SupabaseBearer()),
) -> Dict[str, Any]:
    """Search GAIS school reference data by name, URN, or postcode.

    Args:
        q: Search text. All-digit input is treated as an exact URN; anything
            else is matched case-insensitively as a substring of the school
            name or postcode.
        limit: Maximum number of rows to return.
        status: Establishment status to filter on; an empty value disables
            the filter.
        credentials: Bearer-token claims (authentication only).

    Returns:
        ``{"status": "ok", "schools": [...], "count": n}`` or
        ``{"status": "error", "message": ...}`` when the query fails.
    """
    term = q.strip()
    # min_length=2 is checked before stripping, so whitespace-only input would
    # otherwise become an unfiltered '%%' match.
    if len(term) < 2:
        return {"status": "ok", "schools": [], "count": 0}

    try:
        sb = _get_sb()
        query = sb.supabase.table("gais_schools").select(
            "urn,name,status,phase,type,street,locality,town,county,postcode,"
            "website,telephone,head_title,head_first_name,head_last_name,"
            "la_code,la_name,number_of_pupils,gender,religious_character,region"
        )
        if status:
            query = query.eq("status", status)

        if term.isdigit():
            # URN exact match
            query = query.eq("urn", term)
        else:
            # Name or postcode substring match. Previously only the name was
            # searched although the endpoint advertises postcode search.
            pattern = _quote_postgrest_value(f"%{term}%")
            query = query.or_(f"name.ilike.{pattern},postcode.ilike.{pattern}")

        schools = query.limit(limit).execute().data or []
        return {"status": "ok", "schools": schools, "count": len(schools)}
    except Exception as e:
        logger.error(f"School search failed: {e}")
        return {"status": "error", "message": str(e)}
# ─── School registration (onboarding) ────────────────────────────────────────
class SchoolRegisterBody(BaseModel):
    """Request body for ``POST /register``."""

    # URN used to detect an already-registered institute.
    urn: str
    # Name stored on the new institute record.
    name: str
    # Optional address object; stored as {} when omitted.
    address: Optional[Dict[str, Any]] = None
    # Optional website; stored as "" when omitted.
    website: Optional[str] = None
    # Optional headteacher name; stored under metadata["headteacher"] when given.
    headteacher: Optional[str] = None
@router.post("/register")
async def register_school(
    body: SchoolRegisterBody,
    credentials: dict = Depends(SupabaseBearer()),
) -> Dict[str, Any]:
    """
    Onboarding: create an institute record from a GAIS-selected school,
    provision the Neo4j database, and link the calling user to it.

    Args:
        body: School details chosen during onboarding.
        credentials: Bearer-token claims; ``sub`` is used as the profile id.

    Returns:
        ``{"status": "ok", "school_id": ..., "neo4j_uuid": ...}`` for a newly
        created institute (``neo4j_uuid`` is ``None`` if provisioning failed),
        ``{"status": "already_exists", "school_id": ...}`` when the URN is
        already registered, or ``{"status": "error", "message": ...}``.
    """
    user_id = credentials.get("sub", "")

    # A blank URN could match an unrelated institute row with an empty urn,
    # and a blank name would create an unusable record.
    urn = (body.urn or "").strip()
    name = (body.name or "").strip()
    if not urn or not name:
        return {"status": "error", "message": "urn and name are required"}

    try:
        sb = _get_sb()

        # Prevent duplicate registration
        existing = sb.supabase.table("institutes").select("id").eq("urn", urn).execute()
        if existing.data:
            school_id = existing.data[0]["id"]
            # SECURITY: previously every caller was linked as school_admin
            # here, so any authenticated user could become admin of any
            # registered school by posting its URN. Admin is now granted only
            # when the institute has no admin yet (e.g. an earlier
            # registration failed before linking); otherwise the caller joins
            # as a teacher.
            # NOTE(review): confirm "teacher" is the intended role for users
            # joining an existing school through onboarding.
            admins = (
                sb.supabase.table("institute_memberships")
                .select("id")
                .eq("institute_id", school_id)
                .eq("role", "school_admin")
                .limit(1)
                .execute()
            )
            role = "teacher" if admins.data else "school_admin"
            # Still link user if not already linked
            _ensure_membership(sb, user_id, school_id, role)
            sb.supabase.table("profiles").update({"school_id": str(school_id)}).eq("id", user_id).execute()
            return {"status": "already_exists", "school_id": str(school_id)}

        # Build metadata
        meta: Dict[str, Any] = {}
        if body.headteacher:
            meta["headteacher"] = body.headteacher

        institute_data: Dict[str, Any] = {
            "name": name,
            "urn": urn,
            "status": "active",
            "address": body.address or {},
            "website": body.website or "",
            "metadata": meta,
        }
        ins_result = sb.supabase.table("institutes").insert(institute_data).execute()
        if not ins_result.data:
            return {"status": "error", "message": "Failed to create institute record"}
        school_id = ins_result.data[0]["id"]

        # Provision Neo4j institute database. A failure is logged and the
        # registration continues with neo4j_uuid=None.
        neo4j_uuid = None
        try:
            # Imported here, as in the original code, rather than at module level.
            from modules.database.services.provisioning_service import ProvisioningService
            ProvisioningService().ensure_school(school_id)
            # Reload institute to get the neo4j_uuid_string set by provisioning.
            # limit(1) avoids single() raising if the row cannot be read back.
            reloaded = (
                sb.supabase.table("institutes")
                .select("neo4j_uuid_string")
                .eq("id", school_id)
                .limit(1)
                .execute()
            ).data or []
            if reloaded:
                neo4j_uuid = reloaded[0].get("neo4j_uuid_string")
        except Exception as prov_err:
            logger.warning(f"Neo4j provisioning failed for {school_id}: {prov_err}")
            neo4j_uuid = None

        # Link user as school_admin of the institute they just created.
        _ensure_membership(sb, user_id, school_id, "school_admin")
        sb.supabase.table("profiles").update({"school_id": str(school_id)}).eq("id", user_id).execute()

        return {
            "status": "ok",
            "school_id": str(school_id),
            "neo4j_uuid": neo4j_uuid,
        }
    except Exception as e:
        logger.error(f"School registration failed: {e}")
        return {"status": "error", "message": str(e)}
def _ensure_membership(sb: SupabaseServiceRoleClient, user_id: str, school_id: str, role: str) -> None:
    """Insert an ``institute_memberships`` row for the user/school pair if none exists.

    An existing membership is left untouched; its role is not changed.

    Args:
        sb: Supabase client wrapper used for both the lookup and the insert.
        user_id: Profile id stored in ``profile_id``.
        school_id: Institute id stored in ``institute_id``.
        role: Role written on the new membership row.
    """
    lookup = (
        sb.supabase.table("institute_memberships")
        .select("id")
        .eq("profile_id", user_id)
        .eq("institute_id", school_id)
        .execute()
    )
    if lookup.data:
        return

    new_membership = {
        "profile_id": user_id,
        "institute_id": school_id,
        "role": role,
    }
    sb.supabase.table("institute_memberships").insert(new_membership).execute()