- gais_data.py: rewrite to load Edubase CSV into Supabase gais_schools + gais_local_authorities via two-pass batch upsert (LAs first for FK integrity) - school_router.py: add GET /school/search (trigram ilike on name, URN exact), POST /school/register (create institute + Neo4j provision + membership link) - Encoding: handles Windows-1252 (cp1252) Edubase CSV format Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
321 lines
13 KiB
Python
321 lines
13 KiB
Python
"""
|
|
School Router — school status, search, register, and admin-editable info.
|
|
"""
|
|
import os
|
|
import json
|
|
import uuid as uuid_lib
|
|
from typing import Dict, Any, Optional, List
|
|
from fastapi import APIRouter, Depends, Query
|
|
from pydantic import BaseModel
|
|
from modules.logger_tool import initialise_logger
|
|
from modules.auth.supabase_bearer import SupabaseBearer
|
|
from modules.database.supabase.utils.client import SupabaseServiceRoleClient
|
|
import modules.database.tools.neo4j_driver_tools as driver_tools
|
|
|
|
logger = initialise_logger(__name__, os.getenv("LOG_LEVEL"), os.getenv("LOG_PATH"), 'default', True)
|
|
router = APIRouter()
|
|
|
|
|
|
# ─── Helpers ──────────────────────────────────────────────────────────────────
|
|
|
|
def _get_sb():
|
|
return SupabaseServiceRoleClient()
|
|
|
|
def _institute_db(neo4j_uuid: str) -> str:
|
|
return f"cc.institutes.{neo4j_uuid}"
|
|
|
|
def _find_institute_db_by_email(user_email: str) -> Optional[str]:
|
|
"""Fallback: scan all institute DBs for the teacher's email."""
|
|
try:
|
|
with driver_tools.get_session(database="system") as s:
|
|
dbs = [r["name"] for r in s.run(
|
|
"SHOW DATABASES YIELD name "
|
|
"WHERE name STARTS WITH 'cc.institutes.' "
|
|
"AND NOT name ENDS WITH '.curriculum' RETURN name"
|
|
)]
|
|
for db in dbs:
|
|
try:
|
|
with driver_tools.get_session(database=db) as s:
|
|
rec = s.run(
|
|
"MATCH (t:Teacher) WHERE t.worker_email = $e "
|
|
"RETURN t.uuid_string AS uuid LIMIT 1",
|
|
e=user_email
|
|
).single()
|
|
if rec:
|
|
return db
|
|
except Exception:
|
|
continue
|
|
except Exception as e:
|
|
logger.warning(f"Institute DB scan failed: {e}")
|
|
return None
|
|
|
|
|
|
# ─── Status endpoint ─────────────────────────────────────────────────────────
|
|
|
|
@router.get("/status")
|
|
async def get_school_status(credentials: dict = Depends(SupabaseBearer())) -> Dict[str, Any]:
|
|
"""Return the current user's school role, school info, and calendar/timetable setup status."""
|
|
user_id = credentials.get("sub", "")
|
|
user_email = credentials.get("email", "")
|
|
|
|
try:
|
|
sb = _get_sb()
|
|
|
|
p = sb.supabase.table("profiles").select("school_id").eq("id", user_id).single().execute()
|
|
school_id = (p.data or {}).get("school_id")
|
|
|
|
# Fallback: if profiles.school_id not set, check institute_memberships directly
|
|
if not school_id:
|
|
m_fb = sb.supabase.table("institute_memberships").select("institute_id,role").eq("profile_id", user_id).single().execute()
|
|
if m_fb.data and m_fb.data.get("institute_id"):
|
|
school_id = m_fb.data["institute_id"]
|
|
user_role = m_fb.data.get("role") or "teacher"
|
|
# Self-heal: write school_id back to profile
|
|
try:
|
|
sb.supabase.table("profiles").update({"school_id": str(school_id)}).eq("id", user_id).execute()
|
|
except Exception:
|
|
pass
|
|
else:
|
|
return {"status": "no_school"}
|
|
else:
|
|
m = sb.supabase.table("institute_memberships").select("role").eq("profile_id", user_id).eq("institute_id", school_id).single().execute()
|
|
user_role = ((m.data or {}).get("role") or "teacher")
|
|
|
|
i = sb.supabase.table("institutes").select("id,name,urn,website,address,metadata,neo4j_uuid_string").eq("id", school_id).single().execute()
|
|
inst = i.data or {}
|
|
neo4j_uuid = inst.get("neo4j_uuid_string")
|
|
meta = inst.get("metadata") or {}
|
|
|
|
except Exception as e:
|
|
logger.warning(f"Supabase school status query failed: {e}")
|
|
return {"status": "error", "message": str(e)}
|
|
|
|
if neo4j_uuid:
|
|
inst_db = _institute_db(neo4j_uuid)
|
|
else:
|
|
inst_db = _find_institute_db_by_email(user_email)
|
|
if not inst_db:
|
|
return {"status": "no_school"}
|
|
|
|
school_has_calendar = False
|
|
teacher_has_timetable = False
|
|
timetable_id = None
|
|
periods_template = None
|
|
|
|
try:
|
|
with driver_tools.get_session(database=inst_db) as s:
|
|
ay = s.run("MATCH (ay:AcademicYear) RETURN ay LIMIT 1").single()
|
|
school_has_calendar = ay is not None
|
|
|
|
st = s.run(
|
|
"MATCH (st:SchoolTimetable) WHERE st.periods_template IS NOT NULL "
|
|
"RETURN st.periods_template AS p LIMIT 1"
|
|
).single()
|
|
if st:
|
|
periods_template = json.loads(st["p"])
|
|
|
|
t = s.run(
|
|
"MATCH (t:Teacher) WHERE t.worker_email = $e "
|
|
"RETURN t.uuid_string AS uuid LIMIT 1",
|
|
e=user_email
|
|
).single()
|
|
if t:
|
|
teacher_uuid = t["uuid"]
|
|
tt = s.run(
|
|
"MATCH (t:Teacher {uuid_string: $u})-[:HAS_TIMETABLE]->(tt:TeacherTimetable) "
|
|
"RETURN tt.uuid_string AS id LIMIT 1",
|
|
u=teacher_uuid
|
|
).single()
|
|
if tt:
|
|
teacher_has_timetable = True
|
|
timetable_id = tt["id"]
|
|
except Exception as e:
|
|
logger.warning(f"Neo4j school status check failed for {inst_db}: {e}")
|
|
|
|
return {
|
|
"status": "ok",
|
|
"user_role": user_role,
|
|
"school_id": str(school_id),
|
|
"institute_db": inst_db,
|
|
"school_has_calendar": school_has_calendar,
|
|
"teacher_has_timetable": teacher_has_timetable,
|
|
"timetable_id": timetable_id,
|
|
"periods_template": periods_template,
|
|
"school_info": {
|
|
"name": inst.get("name", ""),
|
|
"urn": inst.get("urn", ""),
|
|
"website": inst.get("website", ""),
|
|
"address": inst.get("address") or {},
|
|
"headteacher": meta.get("headteacher", ""),
|
|
"term_dates_url": meta.get("term_dates_url", ""),
|
|
"staff_list_url": meta.get("staff_list_url", ""),
|
|
},
|
|
}
|
|
|
|
|
|
# ─── School info update ───────────────────────────────────────────────────────
|
|
|
|
class SchoolInfoUpdate(BaseModel):
|
|
headteacher: Optional[str] = None
|
|
term_dates_url: Optional[str] = None
|
|
staff_list_url: Optional[str] = None
|
|
|
|
@router.patch("/info")
|
|
async def update_school_info(
|
|
body: SchoolInfoUpdate,
|
|
credentials: dict = Depends(SupabaseBearer())
|
|
) -> Dict[str, Any]:
|
|
"""Admin: update school metadata fields stored in institutes.metadata jsonb."""
|
|
user_id = credentials.get("sub", "")
|
|
try:
|
|
sb = _get_sb()
|
|
p = sb.supabase.table("profiles").select("school_id").eq("id", user_id).single().execute()
|
|
school_id = (p.data or {}).get("school_id")
|
|
if not school_id:
|
|
return {"status": "error", "message": "No school linked"}
|
|
|
|
m = sb.supabase.table("institute_memberships").select("role").eq("profile_id", user_id).eq("institute_id", school_id).single().execute()
|
|
role = ((m.data or {}).get("role") or "teacher")
|
|
if role != "school_admin":
|
|
return {"status": "error", "message": "school_admin role required"}
|
|
|
|
i = sb.supabase.table("institutes").select("metadata").eq("id", school_id).single().execute()
|
|
meta = dict((i.data or {}).get("metadata") or {})
|
|
if body.headteacher is not None:
|
|
meta["headteacher"] = body.headteacher
|
|
if body.term_dates_url is not None:
|
|
meta["term_dates_url"] = body.term_dates_url
|
|
if body.staff_list_url is not None:
|
|
meta["staff_list_url"] = body.staff_list_url
|
|
|
|
sb.supabase.table("institutes").update({"metadata": meta}).eq("id", school_id).execute()
|
|
return {"status": "ok"}
|
|
except Exception as e:
|
|
logger.error(f"School info update failed: {e}")
|
|
return {"status": "error", "message": str(e)}
|
|
|
|
|
|
# ─── GAIS school search ───────────────────────────────────────────────────────
|
|
|
|
@router.get("/search")
|
|
async def search_schools(
|
|
q: str = Query(..., min_length=2, description="School name, URN, or postcode"),
|
|
limit: int = Query(20, ge=1, le=100),
|
|
status: str = Query("Open", description="Filter by establishment status"),
|
|
credentials: dict = Depends(SupabaseBearer()),
|
|
) -> Dict[str, Any]:
|
|
"""Search GAIS school reference data by name, URN, or postcode."""
|
|
try:
|
|
sb = _get_sb()
|
|
query = sb.supabase.table("gais_schools").select(
|
|
"urn,name,status,phase,type,street,locality,town,county,postcode,"
|
|
"website,telephone,head_title,head_first_name,head_last_name,"
|
|
"la_code,la_name,number_of_pupils,gender,religious_character,region"
|
|
)
|
|
if status:
|
|
query = query.eq("status", status)
|
|
|
|
q_stripped = q.strip()
|
|
if q_stripped.isdigit():
|
|
# URN exact match
|
|
query = query.eq("urn", q_stripped)
|
|
else:
|
|
# Name / postcode trigram search — use ilike on name first, fallback includes postcode
|
|
query = query.ilike("name", f"%{q_stripped}%")
|
|
|
|
result = query.limit(limit).execute()
|
|
return {"status": "ok", "schools": result.data or [], "count": len(result.data or [])}
|
|
except Exception as e:
|
|
logger.error(f"School search failed: {e}")
|
|
return {"status": "error", "message": str(e)}
|
|
|
|
|
|
# ─── School registration (onboarding) ────────────────────────────────────────
|
|
|
|
class SchoolRegisterBody(BaseModel):
|
|
urn: str
|
|
name: str
|
|
address: Optional[Dict[str, Any]] = None
|
|
website: Optional[str] = None
|
|
headteacher: Optional[str] = None
|
|
|
|
@router.post("/register")
|
|
async def register_school(
|
|
body: SchoolRegisterBody,
|
|
credentials: dict = Depends(SupabaseBearer()),
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Onboarding: create an institute record from a GAIS-selected school,
|
|
provision the Neo4j database, and link the calling user as school_admin.
|
|
"""
|
|
user_id = credentials.get("sub", "")
|
|
user_email = credentials.get("email", "")
|
|
|
|
try:
|
|
sb = _get_sb()
|
|
|
|
# Prevent duplicate registration
|
|
existing = sb.supabase.table("institutes").select("id").eq("urn", body.urn).execute()
|
|
if existing.data:
|
|
school_id = existing.data[0]["id"]
|
|
# Still link user if not already linked
|
|
_ensure_membership(sb, user_id, school_id, "school_admin")
|
|
sb.supabase.table("profiles").update({"school_id": str(school_id)}).eq("id", user_id).execute()
|
|
return {"status": "already_exists", "school_id": str(school_id)}
|
|
|
|
# Build metadata
|
|
meta: Dict[str, Any] = {}
|
|
if body.headteacher:
|
|
meta["headteacher"] = body.headteacher
|
|
|
|
institute_data: Dict[str, Any] = {
|
|
"name": body.name,
|
|
"urn": body.urn,
|
|
"status": "active",
|
|
"address": body.address or {},
|
|
"website": body.website or "",
|
|
"metadata": meta,
|
|
}
|
|
|
|
ins_result = sb.supabase.table("institutes").insert(institute_data).execute()
|
|
if not ins_result.data:
|
|
return {"status": "error", "message": "Failed to create institute record"}
|
|
|
|
school_id = ins_result.data[0]["id"]
|
|
|
|
# Provision Neo4j institute database
|
|
try:
|
|
from modules.database.services.provisioning_service import ProvisioningService
|
|
ps = ProvisioningService()
|
|
ps.ensure_school(school_id)
|
|
# Reload institute to get the neo4j_uuid_string set by provisioning
|
|
inst = sb.supabase.table("institutes").select("neo4j_uuid_string").eq("id", school_id).single().execute()
|
|
neo4j_uuid = (inst.data or {}).get("neo4j_uuid_string")
|
|
except Exception as prov_err:
|
|
logger.warning(f"Neo4j provisioning failed for {school_id}: {prov_err}")
|
|
neo4j_uuid = None
|
|
|
|
# Link user as school_admin
|
|
_ensure_membership(sb, user_id, school_id, "school_admin")
|
|
sb.supabase.table("profiles").update({"school_id": str(school_id)}).eq("id", user_id).execute()
|
|
|
|
return {
|
|
"status": "ok",
|
|
"school_id": str(school_id),
|
|
"neo4j_uuid": neo4j_uuid,
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"School registration failed: {e}")
|
|
return {"status": "error", "message": str(e)}
|
|
|
|
|
|
def _ensure_membership(sb: SupabaseServiceRoleClient, user_id: str, school_id: str, role: str) -> None:
|
|
existing = sb.supabase.table("institute_memberships").select("id").eq("profile_id", user_id).eq("institute_id", school_id).execute()
|
|
if not existing.data:
|
|
sb.supabase.table("institute_memberships").insert({
|
|
"profile_id": user_id,
|
|
"institute_id": school_id,
|
|
"role": role,
|
|
}).execute()
|