api/routers/database/tools/timetable_builder_router.py
kcar abf8d05ca1 feat(phase-b): Supabase-first timetable, classes, enrollment, and student views
- timetable_builder_router: Supabase-primary slot write (POST /timetable/slots),
  week_cycle support, GET /slots reads from Supabase, materialize-periods endpoint,
  rebuild-neo4j endpoint, sync-lessons endpoint (Track B: TaughtLesson Neo4j nodes),
  _sync_teacher_timetables_to_neo4j and _sync_taught_lessons_to_neo4j helpers
- classes_router: GET /{class_id} enriched with profiles + enrollment_requests,
  GET /school/students for admin search, PATCH /enrollment-requests/{id} approve/reject
- taught_lessons_router: GET /student/lessons student week view with enrichment
- school_router: academic_periods sync, day-type management
- platform_admin_router + platform_admin: POST /admin/reset and /admin/seed endpoints
- invitations_router: teacher invite scaffolding
- reset_environment + seed_environment: idempotent dev environment scripts
- graph_tree_router: Supabase-first institute resolution
- provisioning_service: neo4j_private_db_name column support
- main.py + run/routers.py: register new routers

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-27 02:55:44 +01:00

1307 lines
51 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Timetable Builder Router — Supabase-first write architecture.
Supabase is the source of truth; Neo4j is a derived graph rebuildable at any time.
"""
import os
import json
from collections import defaultdict
from datetime import datetime, timedelta, date
from typing import Dict, Any, List, Optional, Tuple
from fastapi import APIRouter, Depends
from pydantic import BaseModel
from modules.logger_tool import initialise_logger
from modules.auth.supabase_bearer import SupabaseBearer
from modules.database.supabase.utils.client import SupabaseServiceRoleClient
import modules.database.tools.neo4j_driver_tools as driver_tools
logger = initialise_logger(__name__, os.getenv("LOG_LEVEL"), os.getenv("LOG_PATH"), 'default', True)
router = APIRouter()
# ─── Constants ────────────────────────────────────────────────────────────────
DAY_NAMES = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"]
DAY_TYPE_LABELS: Dict[str, str] = {
"Academic": "AcademicDay",
"Holiday": "HolidayDay",
"Staff": "StaffDay",
"OffTimetable": "OffTimetableDay",
}
PERIOD_TYPE_LABELS: Dict[str, str] = {
"lesson": "AcademicPeriod",
"break": "BreakPeriod",
"registration": "RegistrationPeriod",
"offtimetable": "OffTimetablePeriod",
}
PERIOD_DAY_RELS: Dict[str, str] = {
"AcademicPeriod": "ACADEMIC_DAY_HAS_ACADEMIC_PERIOD",
"BreakPeriod": "ACADEMIC_DAY_HAS_BREAK_PERIOD",
"RegistrationPeriod": "ACADEMIC_DAY_HAS_REGISTRATION_PERIOD",
"OffTimetablePeriod": "ACADEMIC_DAY_HAS_ACADEMIC_PERIOD",
}
# ─── Helpers ──────────────────────────────────────────────────────────────────
def _sb() -> SupabaseServiceRoleClient:
return SupabaseServiceRoleClient()
def _iso_date(s: str) -> date:
return datetime.strptime(s, "%Y-%m-%d").date()
def _academic_weeks(term_start: date, term_end: date):
"""Yield (week_number, monday_date) for each MonSun block that overlaps the term."""
current = term_start - timedelta(days=term_start.weekday())
if current < term_start:
current += timedelta(weeks=1)
n = 1
while current <= term_end:
yield n, current
current += timedelta(weeks=1)
n += 1
def _query_teacher_uuid(db: str, email: str) -> Optional[str]:
try:
with driver_tools.get_session(database=db) as s:
rec = s.run(
"MATCH (t:Teacher) WHERE t.worker_email = $e RETURN t.uuid_string AS uuid LIMIT 1",
e=email,
).single()
return rec["uuid"] if rec else None
except Exception:
return None
def _scan_institute_dbs(user_email: str) -> Tuple[Optional[str], Optional[str]]:
"""Fallback: scan all institute DBs for a Teacher node with matching email."""
if not user_email:
return None, None
try:
with driver_tools.get_session(database="system") as s:
dbs = [
r["name"] for r in s.run(
"SHOW DATABASES YIELD name "
"WHERE name STARTS WITH 'cc.institutes.' "
"AND NOT name ENDS WITH '.curriculum' RETURN name"
)
]
for db in dbs:
try:
with driver_tools.get_session(database=db) as s:
rec = s.run(
"MATCH (t:Teacher) WHERE t.worker_email = $e "
"RETURN t.uuid_string AS uuid LIMIT 1",
e=user_email,
).single()
if rec and rec["uuid"]:
return db, rec["uuid"]
except Exception:
continue
except Exception as e:
logger.warning(f"Institute DB scan failed: {e}")
return None, None
def _resolve_institute(
user_id: str, user_email: str
) -> Tuple[Optional[str], Optional[str], Optional[str]]:
"""
Returns (supabase_institute_id, neo4j_institute_db, neo4j_teacher_uuid).
Supabase-first lookup with Neo4j email-scan fallback.
"""
# Fast path: profiles.school_id → institutes.neo4j_uuid_string
try:
sb = _sb()
p = sb.supabase.table("profiles").select("school_id").eq("id", user_id).single().execute()
school_id = (p.data or {}).get("school_id")
if school_id:
i = sb.supabase.table("institutes").select("id,neo4j_uuid_string").eq("id", str(school_id)).single().execute()
inst = i.data or {}
neo4j_uuid = inst.get("neo4j_uuid_string")
if neo4j_uuid:
db = f"cc.institutes.{neo4j_uuid}"
teacher_uuid = _query_teacher_uuid(db, user_email)
return str(school_id), db, teacher_uuid
except Exception as e:
logger.warning(f"Supabase-first institute resolve failed: {e}")
# Fallback: scan Neo4j
db, teacher_uuid = _scan_institute_dbs(user_email)
supabase_id: Optional[str] = None
if db:
try:
sb = _sb()
p = sb.supabase.table("profiles").select("school_id").eq("id", user_id).single().execute()
sid = (p.data or {}).get("school_id")
if sid:
supabase_id = str(sid)
except Exception:
pass
return supabase_id, db, teacher_uuid
# ─── Request models ───────────────────────────────────────────────────────────
class TermInput(BaseModel):
name: str
term_number: int
start_date: str
end_date: str
notes: Optional[str] = None
class PeriodInput(BaseModel):
code: str
name: str
start_time: str
end_time: str
period_type: str # lesson | break | registration | offtimetable
class DayInput(BaseModel):
date: str # "2025-09-01"
day_type: str = "Academic" # Academic | Holiday | Staff | OffTimetable
week_cycle: str = "A"
excluded_period_codes: Optional[List[str]] = None
class TermBreakInput(BaseModel):
name: str # "Christmas Break"
start_date: str # "2025-12-20"
end_date: str # "2026-01-05"
class WeekCycleOverride(BaseModel):
term_number: int
week_number: int # 1-based, within the term
cycle: str # "A" | "B"
class TimetableSetupRequest(BaseModel):
year_start: str
year_end: str
terms: List[TermInput]
periods: List[PeriodInput]
days: Optional[List[DayInput]] = None # None = auto-generate MonFri as Academic
term_breaks: Optional[List[TermBreakInput]] = None
week_cycles: Optional[List[WeekCycleOverride]] = None
class SlotInput(BaseModel):
day_of_week: str
period_code: str
subject_class: str
start_time: str
end_time: str
week_cycle: str = "" # '' = both A/B weeks; 'A' or 'B' = specific week
class SlotsRequest(BaseModel):
timetable_id: str
slots: List[SlotInput]
# ─── Supabase write helpers ───────────────────────────────────────────────────
def _sb_upsert_timetable(
sb: SupabaseServiceRoleClient,
institute_id: str,
year_start: date,
year_end: date,
year_label: str,
school_tt_id: str,
periods: List[PeriodInput],
) -> str:
res = sb.supabase.table("school_timetables").upsert(
{
"institute_id": institute_id,
"year_label": year_label,
"start_date": str(year_start),
"end_date": str(year_end),
"periods_template": [p.dict() for p in periods],
"neo4j_node_id": school_tt_id,
},
on_conflict="institute_id,year_label",
).execute()
return (res.data or [{}])[0].get("id", "")
def _sb_upsert_academic_year(
sb: SupabaseServiceRoleClient,
stt_sb_id: str,
institute_id: str,
year_label: str,
ay_id: str,
) -> str:
res = sb.supabase.table("academic_years").upsert(
{
"school_timetable_id": stt_sb_id,
"institute_id": institute_id,
"year_label": year_label,
"neo4j_node_id": ay_id,
},
on_conflict="school_timetable_id,year_label",
).execute()
return (res.data or [{}])[0].get("id", "")
def _sb_upsert_term_breaks(
sb: SupabaseServiceRoleClient,
institute_id: str,
stt_sb_id: str,
term_breaks: List[TermBreakInput],
) -> None:
for tb in term_breaks:
sb.supabase.table("academic_term_breaks").upsert(
{
"school_timetable_id": stt_sb_id,
"institute_id": institute_id,
"break_name": tb.name,
"start_date": tb.start_date,
"end_date": tb.end_date,
},
on_conflict="school_timetable_id,break_name",
).execute()
def _sb_upsert_terms_weeks_days(
sb: SupabaseServiceRoleClient,
ay_sb_id: str,
institute_id: str,
terms: List[TermInput],
year_start: date,
days_by_date: Dict[str, DayInput],
week_cycles_map: Optional[Dict[Tuple[int, int], str]] = None,
) -> None:
academic_day_number = 0
for term in terms:
t_id = f"term_{year_start.year}_{term.term_number}"
t_start = _iso_date(term.start_date)
t_end = _iso_date(term.end_date)
t_upsert: Dict[str, Any] = {
"academic_year_id": ay_sb_id,
"institute_id": institute_id,
"term_name": term.name,
"term_number": term.term_number,
"start_date": term.start_date,
"end_date": term.end_date,
"neo4j_node_id": t_id,
}
if term.notes is not None:
t_upsert["notes"] = term.notes
t_res = sb.supabase.table("academic_terms").upsert(
t_upsert, on_conflict="academic_year_id,term_number",
).execute()
t_sb_id = (t_res.data or [{}])[0].get("id", "")
for wn, wstart in _academic_weeks(t_start, t_end):
w_id = f"week_{t_id}_{wn}"
# Use caller-provided cycle if given, else default alternating A/B within term
if week_cycles_map and (term.term_number, wn) in week_cycles_map:
week_cycle = week_cycles_map[(term.term_number, wn)]
else:
week_cycle = "A" if wn % 2 == 1 else "B"
w_res = sb.supabase.table("academic_weeks").upsert(
{
"academic_term_id": t_sb_id,
"institute_id": institute_id,
"week_number": wn,
"start_date": wstart.isoformat(),
"week_cycle": week_cycle,
"neo4j_node_id": w_id,
},
on_conflict="academic_term_id,week_number",
).execute()
w_sb_id = (w_res.data or [{}])[0].get("id", "")
for day_offset in range(5): # MonFri
d = wstart + timedelta(days=day_offset)
if d < t_start or d > t_end:
continue
date_iso = d.isoformat()
day_input = days_by_date.get(date_iso)
day_type = day_input.day_type if day_input else "Academic"
excl = (day_input.excluded_period_codes or []) if day_input else []
if day_type == "Academic":
academic_day_number += 1
adn: Optional[int] = academic_day_number
else:
adn = None
d_id = f"day_{w_id}_{date_iso}"
sb.supabase.table("academic_days").upsert(
{
"academic_week_id": w_sb_id,
"academic_term_id": t_sb_id,
"institute_id": institute_id,
"date": date_iso,
"day_of_week": DAY_NAMES[d.weekday()],
"day_type": day_type,
"academic_day_number": adn,
"excluded_period_codes": excl,
"neo4j_node_id": d_id,
},
on_conflict="institute_id,date",
).execute()
def _sb_upsert_teacher_timetable(
sb: SupabaseServiceRoleClient,
profile_id: str,
institute_id: str,
stt_sb_id: str,
year_start: date,
year_end: date,
teacher_tt_id: str,
) -> str:
res = sb.supabase.table("teacher_timetables").upsert(
{
"profile_id": profile_id,
"institute_id": institute_id,
"school_timetable_id": stt_sb_id,
"start_date": str(year_start),
"end_date": str(year_end),
"neo4j_node_id": teacher_tt_id,
},
on_conflict="profile_id,school_timetable_id",
).execute()
return (res.data or [{}])[0].get("id", "")
# ─── Neo4j build from Supabase ────────────────────────────────────────────────
def _build_neo4j_from_supabase(institute_id: str, institute_db: str) -> Dict[str, int]:
"""
Read all academic calendar data from Supabase for institute_id and
create/merge the full Neo4j node graph in institute_db.
Returns counts of nodes created.
"""
sb = _sb()
stt_rows = sb.supabase.table("school_timetables").select("*").eq("institute_id", institute_id).execute().data or []
if not stt_rows:
raise ValueError(f"No school_timetable in Supabase for institute {institute_id}")
stt = stt_rows[0]
ay_rows = sb.supabase.table("academic_years").select("*").eq("school_timetable_id", stt["id"]).execute().data or []
if not ay_rows:
raise ValueError("No academic_year found for this timetable")
ay = ay_rows[0]
terms = sb.supabase.table("academic_terms").select("*").eq("academic_year_id", ay["id"]).order("term_number").execute().data or []
term_ids = [t["id"] for t in terms]
weeks: List[Dict] = []
if term_ids:
weeks = sb.supabase.table("academic_weeks").select("*").in_("academic_term_id", term_ids).order("start_date").execute().data or []
days = sb.supabase.table("academic_days").select("*").eq("institute_id", institute_id).order("date").execute().data or []
weeks_by_term: Dict[str, List[Dict]] = defaultdict(list)
for w in weeks:
weeks_by_term[w["academic_term_id"]].append(w)
days_by_week: Dict[str, List[Dict]] = defaultdict(list)
for d in days:
days_by_week[d["academic_week_id"]].append(d)
periods_template: List[Dict] = stt.get("periods_template") or []
counts = {"terms": 0, "weeks": 0, "days": 0, "periods": 0}
with driver_tools.get_session(database=institute_db) as s:
# SchoolTimetable
s.run("""
MERGE (tt:SchoolTimetable {uuid_string: $id})
SET tt.school_timetable_id = $id,
tt.start_date = date($start), tt.end_date = date($end),
tt.node_storage_path = $path,
tt.periods_template = $periods
WITH tt MATCH (sch:School)
MERGE (sch)-[:HAS_TIMETABLE]->(tt)
""",
id=stt["neo4j_node_id"],
start=str(stt["start_date"]),
end=str(stt["end_date"]),
path=f"timetable/{stt['neo4j_node_id']}",
periods=json.dumps(periods_template),
)
# AcademicYear
s.run("""
MERGE (ay:AcademicYear {uuid_string: $id})
SET ay.year = $year, ay.node_storage_path = $path
WITH ay MATCH (tt:SchoolTimetable {uuid_string: $tt_id})
MERGE (tt)-[:ACADEMIC_TIMETABLE_HAS_ACADEMIC_YEAR]->(ay)
""",
id=ay["neo4j_node_id"],
year=ay["year_label"],
path=f"timetable/{ay['neo4j_node_id']}",
tt_id=stt["neo4j_node_id"],
)
for term in terms:
s.run("""
MERGE (t:AcademicTerm {uuid_string: $id})
SET t.term_name = $name, t.term_number = $num,
t.start_date = date($start), t.end_date = date($end),
t.node_storage_path = $path
WITH t MATCH (ay:AcademicYear {uuid_string: $ay_id})
MERGE (ay)-[:ACADEMIC_YEAR_HAS_ACADEMIC_TERM]->(t)
""",
id=term["neo4j_node_id"],
name=term["term_name"],
num=str(term["term_number"]),
start=str(term["start_date"]),
end=str(term["end_date"]),
path=f"timetable/{term['neo4j_node_id']}",
ay_id=ay["neo4j_node_id"],
)
counts["terms"] += 1
for week in weeks_by_term[term["id"]]:
s.run("""
MERGE (w:AcademicWeek {uuid_string: $id})
SET w.academic_week_number = $num,
w.start_date = date($start),
w.week_cycle = $cycle,
w.week_type = 'academic',
w.node_storage_path = $path
WITH w MATCH (t:AcademicTerm {uuid_string: $t_id})
MERGE (t)-[:ACADEMIC_TERM_HAS_ACADEMIC_WEEK]->(w)
""",
id=week["neo4j_node_id"],
num=str(week["week_number"]),
start=str(week["start_date"]),
cycle=week["week_cycle"],
path=f"timetable/{week['neo4j_node_id']}",
t_id=term["neo4j_node_id"],
)
counts["weeks"] += 1
for day in days_by_week[week["id"]]:
label = DAY_TYPE_LABELS.get(day["day_type"], "AcademicDay")
s.run(
f"""
MERGE (d:{label} {{uuid_string: $id}})
SET d.date = date($date), d.day_of_week = $dow,
d.day_type = $dtype,
d.academic_day = $adn,
d.node_storage_path = $path
WITH d MATCH (w:AcademicWeek {{uuid_string: $w_id}})
MERGE (w)-[:ACADEMIC_WEEK_HAS_ACADEMIC_DAY]->(d)
WITH d MATCH (t:AcademicTerm {{uuid_string: $t_id}})
MERGE (t)-[:ACADEMIC_TERM_HAS_ACADEMIC_DAY]->(d)
""",
id=day["neo4j_node_id"],
date=str(day["date"]),
dow=day["day_of_week"],
dtype=day["day_type"],
adn=str(day.get("academic_day_number") or ""),
path=f"timetable/{day['neo4j_node_id']}",
w_id=week["neo4j_node_id"],
t_id=term["neo4j_node_id"],
)
counts["days"] += 1
# Period nodes for Academic days only
if day["day_type"] == "Academic":
excluded = set(day.get("excluded_period_codes") or [])
for p in periods_template:
if p.get("code") in excluded:
continue
p_label = PERIOD_TYPE_LABELS.get(
(p.get("period_type") or "lesson").lower(), "AcademicPeriod"
)
rel = PERIOD_DAY_RELS.get(p_label, "ACADEMIC_DAY_HAS_ACADEMIC_PERIOD")
p_id = f"{day['neo4j_node_id']}_{p['code']}"
s.run(
f"""
MERGE (p:{p_label} {{uuid_string: $id}})
SET p.period_code = $code, p.name = $name,
p.start_time = $start, p.end_time = $end,
p.node_storage_path = $path
WITH p MATCH (d:{label} {{uuid_string: $d_id}})
MERGE (d)-[:{rel}]->(p)
""",
id=p_id,
code=p["code"],
name=p["name"],
start=p["start_time"],
end=p["end_time"],
path=f"timetable/{p_id}",
d_id=day["neo4j_node_id"],
)
counts["periods"] += 1
# ── Teacher timetables + taught lessons (best-effort — may be empty on first run)
try:
tt_counts = _sync_teacher_timetables_to_neo4j(institute_id, institute_db, sb)
counts.update(tt_counts)
except Exception as e:
logger.warning(f"TeacherTimetable sync skipped: {e}")
try:
counts["taught_lessons"] = _sync_taught_lessons_to_neo4j(institute_id, institute_db, sb)
except Exception as e:
logger.warning(f"TaughtLesson sync skipped: {e}")
return counts
def _sync_teacher_timetables_to_neo4j(
institute_id: str,
institute_db: str,
sb: SupabaseServiceRoleClient,
) -> Dict[str, int]:
"""
Rebuild TeacherTimetable and TimetableSlot Neo4j nodes from Supabase
for the given institute. Safe to re-run (MERGE).
"""
counts: Dict[str, int] = {"teacher_timetables": 0, "slots": 0}
tt_rows = (
sb.supabase.table("teacher_timetables")
.select("id,profile_id,neo4j_node_id,start_date,end_date")
.eq("institute_id", institute_id)
.execute()
.data or []
)
if not tt_rows:
return counts
profile_ids = list({t["profile_id"] for t in tt_rows if t.get("profile_id")})
email_map: Dict[str, str] = {}
if profile_ids:
prows = (
sb.supabase.table("profiles")
.select("id,email")
.in_("id", profile_ids)
.execute()
.data or []
)
email_map = {p["id"]: p["email"] for p in prows}
tt_ids = [t["id"] for t in tt_rows]
all_slots: Dict[str, List[Dict]] = defaultdict(list)
for i in range(0, len(tt_ids), 100):
chunk = tt_ids[i : i + 100]
slot_rows = (
sb.supabase.table("teacher_timetable_slots")
.select("*")
.in_("teacher_timetable_id", chunk)
.execute()
.data or []
)
for slot in slot_rows:
all_slots[slot["teacher_timetable_id"]].append(slot)
with driver_tools.get_session(database=institute_db) as s:
for tt in tt_rows:
tt_neo4j_id = tt.get("neo4j_node_id")
if not tt_neo4j_id:
continue
teacher_email = email_map.get(tt["profile_id"], "")
teacher_uuid = _query_teacher_uuid(institute_db, teacher_email) if teacher_email else None
s.run("""
MERGE (tt:TeacherTimetable {uuid_string: $id})
SET tt.teacher_timetable_id = $id,
tt.start_date = date($start),
tt.end_date = date($end),
tt.node_storage_path = $path
""",
id=tt_neo4j_id,
start=str(tt["start_date"]),
end=str(tt["end_date"]),
path=f"timetable/{tt_neo4j_id}",
)
counts["teacher_timetables"] += 1
if teacher_uuid:
s.run("""
MATCH (t:Teacher {uuid_string: $tu})
MATCH (tt:TeacherTimetable {uuid_string: $id})
MERGE (t)-[:HAS_TIMETABLE]->(tt)
""", tu=teacher_uuid, id=tt_neo4j_id)
for slot in all_slots.get(tt["id"], []):
slot_neo4j_id = (
slot.get("neo4j_node_id")
or f"slot_{tt_neo4j_id}_{slot['day_of_week']}_{slot['period_code']}_{slot.get('week_cycle', '')}"
)
s.run("""
MERGE (sl:TimetableSlot {uuid_string: $id})
SET sl.day_of_week = $day, sl.period_code = $code,
sl.subject_class = $cls, sl.start_time = $start,
sl.end_time = $end, sl.week_cycle = $wc,
sl.node_storage_path = $path
WITH sl MATCH (tt:TeacherTimetable {uuid_string: $tt_id})
MERGE (tt)-[:HAS_TIMETABLE_SLOT]->(sl)
""",
id=slot_neo4j_id,
day=slot["day_of_week"],
code=slot["period_code"],
cls=slot.get("subject_class", ""),
start=slot.get("start_time", ""),
end=slot.get("end_time", ""),
wc=slot.get("week_cycle", ""),
path=f"timetable/{slot_neo4j_id}",
tt_id=tt_neo4j_id,
)
counts["slots"] += 1
return counts
def _sync_taught_lessons_to_neo4j(
institute_id: str,
institute_db: str,
sb: SupabaseServiceRoleClient,
) -> int:
"""
Create/update TaughtLesson Neo4j nodes from Supabase taught_lessons.
Links each node to its AcademicPeriod and TeacherTimetable. Safe to re-run.
Returns count of lessons merged.
"""
lessons = (
sb.supabase.table("taught_lessons")
.select("id,neo4j_node_id,academic_period_id,teacher_id,date,period_code,week_cycle,day_of_week,status")
.eq("institute_id", institute_id)
.execute()
.data or []
)
if not lessons:
return 0
# academic_period UUID → neo4j_node_id
period_ids = list({l["academic_period_id"] for l in lessons if l.get("academic_period_id")})
period_neo4j_map: Dict[str, str] = {}
for i in range(0, len(period_ids), 100):
chunk = period_ids[i : i + 100]
prows = (
sb.supabase.table("academic_periods")
.select("id,neo4j_node_id")
.in_("id", chunk)
.execute()
.data or []
)
for p in prows:
if p.get("neo4j_node_id"):
period_neo4j_map[p["id"]] = p["neo4j_node_id"]
# teacher profile_id → teacher_timetable neo4j_node_id
teacher_ids = list({l["teacher_id"] for l in lessons if l.get("teacher_id")})
teacher_tt_map: Dict[str, str] = {}
for i in range(0, len(teacher_ids), 100):
chunk = teacher_ids[i : i + 100]
ttrows = (
sb.supabase.table("teacher_timetables")
.select("profile_id,neo4j_node_id")
.eq("institute_id", institute_id)
.in_("profile_id", chunk)
.execute()
.data or []
)
for tt in ttrows:
if tt.get("neo4j_node_id"):
teacher_tt_map[tt["profile_id"]] = tt["neo4j_node_id"]
count = 0
with driver_tools.get_session(database=institute_db) as s:
for lesson in lessons:
tl_id = lesson.get("neo4j_node_id") or f"tl_{lesson['id']}"
ap_id = period_neo4j_map.get(lesson.get("academic_period_id", ""))
tt_id = teacher_tt_map.get(lesson.get("teacher_id", ""))
s.run("""
MERGE (tl:TaughtLesson {uuid_string: $id})
SET tl.date = date($date), tl.period_code = $pcode,
tl.week_cycle = $wc, tl.day_of_week = $dow,
tl.status = $status,
tl.node_storage_path = $path
""",
id=tl_id,
date=str(lesson["date"]),
pcode=lesson["period_code"],
wc=lesson.get("week_cycle", ""),
dow=lesson.get("day_of_week", ""),
status=lesson.get("status", "planned"),
path=f"taught_lessons/{tl_id}",
)
count += 1
if ap_id:
s.run("""
MATCH (ap:AcademicPeriod {uuid_string: $ap_id})
MATCH (tl:TaughtLesson {uuid_string: $tl_id})
MERGE (ap)-[:ACADEMIC_PERIOD_HAS_TAUGHT_LESSON]->(tl)
""", ap_id=ap_id, tl_id=tl_id)
if tt_id:
s.run("""
MATCH (tt:TeacherTimetable {uuid_string: $tt_id})
MATCH (tl:TaughtLesson {uuid_string: $tl_id})
MERGE (tt)-[:TEACHER_TIMETABLE_HAS_TAUGHT_LESSON]->(tl)
""", tt_id=tt_id, tl_id=tl_id)
return count
def _write_neo4j_direct(
institute_db: str,
school_tt_id: str,
ay_id: str,
body: TimetableSetupRequest,
year_start: date,
year_label: str,
teacher_uuid: Optional[str],
teacher_tt_id: Optional[str],
) -> None:
"""Fallback: write Neo4j directly when no Supabase institute_id is available."""
with driver_tools.get_session(database=institute_db) as s:
s.run("""
MERGE (tt:SchoolTimetable {uuid_string: $id})
SET tt.school_timetable_id = $id,
tt.start_date = date($start), tt.end_date = date($end),
tt.node_storage_path = $path, tt.periods_template = $periods
WITH tt MATCH (sch:School) MERGE (sch)-[:HAS_TIMETABLE]->(tt)
""",
id=school_tt_id,
start=body.year_start,
end=body.year_end,
path=f"timetable/{school_tt_id}",
periods=json.dumps([p.dict() for p in body.periods]),
)
s.run("""
MERGE (ay:AcademicYear {uuid_string: $id})
SET ay.year = $year, ay.node_storage_path = $path
WITH ay MATCH (tt:SchoolTimetable {uuid_string: $tt_id})
MERGE (tt)-[:ACADEMIC_TIMETABLE_HAS_ACADEMIC_YEAR]->(ay)
""",
id=ay_id, year=year_label, path=f"timetable/{ay_id}", tt_id=school_tt_id,
)
for term in body.terms:
t_id = f"term_{year_start.year}_{term.term_number}"
t_start = _iso_date(term.start_date)
t_end = _iso_date(term.end_date)
s.run("""
MERGE (t:AcademicTerm {uuid_string: $id})
SET t.term_name = $name, t.term_number = $num,
t.start_date = date($start), t.end_date = date($end),
t.node_storage_path = $path
WITH t MATCH (ay:AcademicYear {uuid_string: $ay_id})
MERGE (ay)-[:ACADEMIC_YEAR_HAS_ACADEMIC_TERM]->(t)
""",
id=t_id, name=term.name, num=str(term.term_number),
start=term.start_date, end=term.end_date,
path=f"timetable/{t_id}", ay_id=ay_id,
)
for wn, wstart in _academic_weeks(t_start, t_end):
w_id = f"week_{t_id}_{wn}"
s.run("""
MERGE (w:AcademicWeek {uuid_string: $id})
SET w.academic_week_number = $num, w.start_date = date($start),
w.week_type = 'academic', w.week_cycle = $cycle,
w.node_storage_path = $path
WITH w MATCH (t:AcademicTerm {uuid_string: $t_id})
MERGE (t)-[:ACADEMIC_TERM_HAS_ACADEMIC_WEEK]->(w)
""",
id=w_id, num=str(wn), start=wstart.isoformat(),
cycle="A" if wn % 2 == 1 else "B",
path=f"timetable/{w_id}", t_id=t_id,
)
if teacher_uuid and teacher_tt_id:
s.run("""
MERGE (tt:TeacherTimetable {uuid_string: $id})
SET tt.teacher_timetable_id = $id, tt.start_date = date($start),
tt.end_date = date($end), tt.node_storage_path = $path
WITH tt MATCH (teacher:Teacher {uuid_string: $t_uuid})
MERGE (teacher)-[:HAS_TIMETABLE]->(tt)
WITH tt MATCH (st:SchoolTimetable {uuid_string: $st_id})
MERGE (tt)-[:TEACHER_TIMETABLE_FOR]->(st)
""",
id=teacher_tt_id, start=body.year_start, end=body.year_end,
path=f"timetable/{teacher_tt_id}",
t_uuid=teacher_uuid, st_id=school_tt_id,
)
# ─── Endpoints ────────────────────────────────────────────────────────────────
@router.get("/status")
async def get_timetable_status(credentials: dict = Depends(SupabaseBearer())) -> Dict[str, Any]:
"""Check whether the teacher has a timetable set up."""
user_id = credentials.get("sub", "")
user_email = credentials.get("email", "")
_, institute_db, teacher_uuid = _resolve_institute(user_id, user_email)
if not institute_db:
return {"status": "no_school"}
try:
with driver_tools.get_session(database=institute_db) as s:
has_tt = (
s.run(
"MATCH (t:Teacher {uuid_string: $u})-[:HAS_TIMETABLE]->(tt:TeacherTimetable) "
"RETURN tt.uuid_string AS id LIMIT 1",
u=teacher_uuid,
).single()
if teacher_uuid
else None
)
has_school_tt = s.run(
"MATCH (st:SchoolTimetable) RETURN st.uuid_string AS id LIMIT 1"
).single()
return {
"status": "ok",
"has_teacher_timetable": has_tt is not None,
"timetable_id": has_tt["id"] if has_tt else None,
"has_school_timetable": has_school_tt is not None,
"institute_db": institute_db,
}
except Exception as e:
return {"status": "error", "message": str(e)}
@router.post("/setup")
async def setup_timetable(
body: TimetableSetupRequest,
credentials: dict = Depends(SupabaseBearer()),
) -> Dict[str, Any]:
"""
Create school timetable structure.
Writes to Supabase first (source of truth), then builds Neo4j from Supabase.
Accepts optional days[] for AcademicDay nodes; auto-generates MonFri if omitted.
Accepts optional term_breaks[] and week_cycles[] overrides.
"""
user_id = credentials.get("sub", "")
user_email = credentials.get("email", "")
institute_id, institute_db, teacher_uuid = _resolve_institute(user_id, user_email)
if not institute_db:
return {"status": "error", "message": "Institute database not found"}
year_start = _iso_date(body.year_start)
year_end = _iso_date(body.year_end)
year_label = f"{year_start.year}-{year_end.year}"
school_tt_id = f"school_tt_{year_start.year}_{year_end.year}"
ay_id = f"academic_year_{year_start.year}_{year_end.year}"
teacher_tt_id = f"teacher_tt_{teacher_uuid}_{year_start.year}" if teacher_uuid else None
days_by_date: Dict[str, DayInput] = {}
if body.days:
for d in body.days:
days_by_date[d.date] = d
week_cycles_map: Optional[Dict[Tuple[int, int], str]] = None
if body.week_cycles:
week_cycles_map = {(wc.term_number, wc.week_number): wc.cycle for wc in body.week_cycles}
# ── 1. Supabase writes ────────────────────────────────────────────────────
stt_sb_id: Optional[str] = None
if not institute_id:
logger.warning("No Supabase institute_id — skipping Supabase writes, falling back to direct Neo4j")
else:
try:
sb = _sb()
stt_sb_id = _sb_upsert_timetable(
sb, institute_id, year_start, year_end, year_label, school_tt_id, body.periods
)
ay_sb_id = _sb_upsert_academic_year(sb, stt_sb_id, institute_id, year_label, ay_id)
_sb_upsert_terms_weeks_days(
sb, ay_sb_id, institute_id, body.terms, year_start, days_by_date, week_cycles_map
)
if body.term_breaks:
_sb_upsert_term_breaks(sb, institute_id, stt_sb_id, body.term_breaks)
if teacher_uuid and teacher_tt_id:
_sb_upsert_teacher_timetable(
sb, user_id, institute_id, stt_sb_id, year_start, year_end, teacher_tt_id
)
logger.info(f"Supabase writes complete for institute {institute_id}")
except Exception as e:
logger.error(f"Supabase write failed: {e}")
return {"status": "error", "message": f"Supabase write failed: {e}"}
# ── 2. Build Neo4j ────────────────────────────────────────────────────────
if institute_id and stt_sb_id:
try:
counts = _build_neo4j_from_supabase(institute_id, institute_db)
logger.info(f"Neo4j build complete from Supabase: {counts}")
except Exception as e:
logger.error(f"Neo4j build from Supabase failed: {e}")
return {"status": "error", "message": f"Neo4j build failed: {e}"}
else:
try:
_write_neo4j_direct(
institute_db, school_tt_id, ay_id, body,
year_start, year_label, teacher_uuid, teacher_tt_id,
)
except Exception as e:
logger.error(f"Neo4j direct write failed: {e}")
return {"status": "error", "message": str(e)}
return {
"status": "ok",
"timetable_id": teacher_tt_id,
"school_timetable_id": school_tt_id,
"institute_db": institute_db,
}
@router.post("/rebuild-neo4j")
async def rebuild_neo4j(credentials: dict = Depends(SupabaseBearer())) -> Dict[str, Any]:
"""Rebuild all Neo4j timetable nodes from Supabase source of truth."""
user_id = credentials.get("sub", "")
user_email = credentials.get("email", "")
institute_id, institute_db, _ = _resolve_institute(user_id, user_email)
if not institute_id or not institute_db:
return {"status": "error", "message": "Could not resolve institute"}
try:
counts = _build_neo4j_from_supabase(institute_id, institute_db)
return {"status": "ok", "rebuilt": counts, "institute_db": institute_db}
except Exception as e:
logger.error(f"Neo4j rebuild failed: {e}")
return {"status": "error", "message": str(e)}
@router.post("/materialize-periods")
async def materialize_periods(credentials: dict = Depends(SupabaseBearer())) -> Dict[str, Any]:
"""
Materialize academic_periods rows from academic_days × periods_template.
Creates one row per period per academic day (day_type='Academic'),
respecting excluded_period_codes per day. Safe to re-run (upserts).
"""
user_id = credentials.get("sub", "")
user_email = credentials.get("email", "")
institute_id, _, _ = _resolve_institute(user_id, user_email)
if not institute_id:
return {"status": "error", "message": "Could not resolve institute"}
sb = _sb()
stt_rows = (
sb.supabase.table("school_timetables")
.select("id,periods_template")
.eq("institute_id", institute_id)
.limit(1)
.execute()
.data or []
)
if not stt_rows:
return {"status": "error", "message": "No school timetable found for this institute"}
stt = stt_rows[0]
periods_template: List[Dict] = stt.get("periods_template") or []
if not periods_template:
return {"status": "error", "message": "No periods_template defined on school timetable"}
days = (
sb.supabase.table("academic_days")
.select("id,neo4j_node_id,day_type,excluded_period_codes")
.eq("institute_id", institute_id)
.eq("day_type", "Academic")
.execute()
.data or []
)
created = 0
skipped = 0
errors = 0
for day in days:
excluded = set(day.get("excluded_period_codes") or [])
for p in periods_template:
if p.get("code") in excluded:
skipped += 1
continue
p_neo4j_id = f"{day['neo4j_node_id']}_{p['code']}"
try:
sb.supabase.table("academic_periods").upsert(
{
"academic_day_id": day["id"],
"institute_id": institute_id,
"period_code": p["code"],
"period_name": p["name"],
"period_type": p.get("period_type", "lesson"),
"start_time": p["start_time"],
"end_time": p["end_time"],
"neo4j_node_id": p_neo4j_id,
},
on_conflict="academic_day_id,period_code",
).execute()
created += 1
except Exception as e:
logger.error(f"Period upsert failed for day {day['id']} period {p['code']}: {e}")
errors += 1
logger.info(f"Materialized {created} periods, skipped {skipped}, errors {errors} for institute {institute_id}")
return {
"status": "ok",
"created": created,
"skipped": skipped,
"errors": errors,
"academic_days": len(days),
}
@router.get("/slots")
async def get_timetable_slots(credentials: dict = Depends(SupabaseBearer())) -> Dict[str, Any]:
user_id = credentials.get("sub", "")
user_email = credentials.get("email", "")
institute_id, institute_db, _ = _resolve_institute(user_id, user_email)
if not institute_db:
return {"status": "no_school", "slots": [], "periods": []}
try:
sb = _sb()
tt_rows = (
sb.supabase.table("teacher_timetables")
.select("id,neo4j_node_id")
.eq("profile_id", user_id)
.limit(1)
.execute()
.data or []
)
if not tt_rows:
return {"status": "empty", "slots": [], "periods": []}
tt_id = tt_rows[0]["id"]
tt_neo4j_id = tt_rows[0].get("neo4j_node_id", "")
slot_rows = (
sb.supabase.table("teacher_timetable_slots")
.select("day_of_week,period_code,subject_class,start_time,end_time,week_cycle")
.eq("teacher_timetable_id", tt_id)
.execute()
.data or []
)
st_rows = (
sb.supabase.table("school_timetables")
.select("periods_template")
.eq("institute_id", institute_id)
.limit(1)
.execute()
.data or []
)
periods = (st_rows[0].get("periods_template") or []) if st_rows else []
return {
"status": "ok",
"timetable_id": tt_neo4j_id or str(tt_id),
"slots": slot_rows,
"periods": periods,
"institute_db": institute_db,
}
except Exception as e:
logger.error(f"Get timetable slots failed: {e}")
return {"status": "error", "slots": [], "periods": []}
@router.post("/slots")
async def save_timetable_slots(
body: SlotsRequest,
credentials: dict = Depends(SupabaseBearer()),
) -> Dict[str, Any]:
user_id = credentials.get("sub", "")
user_email = credentials.get("email", "")
institute_id, institute_db, _ = _resolve_institute(user_id, user_email)
if not institute_db:
return {"status": "error", "message": "Teacher not linked to school"}
# ── 1. Supabase write (primary — fail hard if this fails) ────────────────
sb = _sb()
tt_sb = (
sb.supabase.table("teacher_timetables")
.select("id,neo4j_node_id")
.eq("profile_id", user_id)
.limit(1)
.execute()
)
tt_sb_row = (tt_sb.data or [{}])[0]
tt_sb_id = tt_sb_row.get("id")
if not tt_sb_id:
return {"status": "error", "message": "No teacher timetable found — run /timetable/init first"}
# Resolve class names → class UUIDs (best-effort; None if not found)
subject_names = list({s.subject_class.strip() for s in body.slots if s.subject_class.strip()})
class_name_map: Dict[str, str] = {}
if subject_names and institute_id:
try:
classes = (
sb.supabase.table("classes")
.select("id,name,class_code")
.eq("institute_id", institute_id)
.eq("is_active", True)
.execute()
.data or []
)
for c in classes:
if c.get("name"):
class_name_map[c["name"]] = c["id"]
if c.get("class_code"):
class_name_map[c["class_code"]] = c["id"]
except Exception as e:
logger.warning(f"Class name resolution failed (non-fatal): {e}")
# Full replace: delete existing then insert
sb.supabase.table("teacher_timetable_slots").delete().eq(
"teacher_timetable_id", tt_sb_id
).execute()
slot_rows = [
{
"teacher_timetable_id": tt_sb_id,
"profile_id": user_id,
"institute_id": institute_id,
"day_of_week": slot.day_of_week,
"period_code": slot.period_code,
"subject_class": slot.subject_class.strip(),
"start_time": slot.start_time,
"end_time": slot.end_time,
"week_cycle": slot.week_cycle,
"class_id": class_name_map.get(slot.subject_class.strip()),
"neo4j_node_id": f"slot_{body.timetable_id}_{slot.day_of_week}_{slot.period_code}_{slot.week_cycle}",
}
for slot in body.slots
if slot.subject_class.strip()
]
if slot_rows:
sb.supabase.table("teacher_timetable_slots").insert(slot_rows).execute()
logger.info(f"Saved {len(slot_rows)} slots to Supabase for timetable {tt_sb_id}")
# ── 2. Neo4j write ────────────────────────────────────────────────────────
try:
with driver_tools.get_session(database=institute_db) as s:
s.run(
"MATCH (:TeacherTimetable {uuid_string: $id})-[:HAS_TIMETABLE_SLOT]->(sl) DETACH DELETE sl",
id=body.timetable_id,
)
created = 0
for slot in body.slots:
if not slot.subject_class.strip():
continue
slot_id = f"slot_{body.timetable_id}_{slot.day_of_week}_{slot.period_code}_{slot.week_cycle}"
s.run("""
MERGE (sl:TimetableSlot {uuid_string: $id})
SET sl.day_of_week = $day, sl.period_code = $code,
sl.subject_class = $cls, sl.start_time = $start,
sl.end_time = $end, sl.week_cycle = $wc,
sl.node_storage_path = $path
WITH sl MATCH (tt:TeacherTimetable {uuid_string: $tt_id})
MERGE (tt)-[:HAS_TIMETABLE_SLOT]->(sl)
""",
id=slot_id, day=slot.day_of_week, code=slot.period_code,
cls=slot.subject_class, start=slot.start_time, end=slot.end_time,
wc=slot.week_cycle, path=f"timetable/slots/{slot_id}",
tt_id=body.timetable_id,
)
created += 1
return {"status": "ok", "created": created}
except Exception as e:
logger.error(f"Save timetable slots failed: {e}")
return {"status": "error", "message": str(e)}
@router.post("/init")
async def init_teacher_timetable(credentials: dict = Depends(SupabaseBearer())) -> Dict[str, Any]:
"""Create a TeacherTimetable for the current teacher from the existing school calendar."""
user_id = credentials.get("sub", "")
user_email = credentials.get("email", "")
institute_id, institute_db, teacher_uuid = _resolve_institute(user_id, user_email)
if not institute_db:
return {"status": "error", "message": "Teacher not linked to a school"}
try:
with driver_tools.get_session(database=institute_db) as s:
st_rec = s.run(
"MATCH (st:SchoolTimetable) "
"RETURN st.uuid_string AS id, "
" toString(st.start_date) AS start, toString(st.end_date) AS end "
"LIMIT 1"
).single()
if not st_rec:
return {"status": "error", "message": "No school calendar set up yet — contact your school admin"}
school_tt_id = st_rec["id"]
start_str = str(st_rec["start"])
end_str = str(st_rec["end"])
year_str = start_str[:4]
teacher_tt_id = f"teacher_tt_{teacher_uuid}_{year_str}"
s.run("""
MERGE (tt:TeacherTimetable {uuid_string: $id})
SET tt.teacher_timetable_id = $id, tt.start_date = date($start),
tt.end_date = date($end), tt.node_storage_path = $path
WITH tt MATCH (teacher:Teacher {uuid_string: $t_uuid})
MERGE (teacher)-[:HAS_TIMETABLE]->(tt)
WITH tt MATCH (st:SchoolTimetable {uuid_string: $st_id})
MERGE (tt)-[:TEACHER_TIMETABLE_FOR]->(st)
""",
id=teacher_tt_id, start=start_str, end=end_str,
path=f"timetable/{teacher_tt_id}",
t_uuid=teacher_uuid, st_id=school_tt_id,
)
# Supabase teacher_timetable record (best-effort)
if institute_id:
try:
sb = _sb()
stt_sb = (
sb.supabase.table("school_timetables")
.select("id")
.eq("institute_id", institute_id)
.limit(1)
.execute()
)
stt_sb_id = (stt_sb.data or [{}])[0].get("id")
if stt_sb_id:
sb.supabase.table("teacher_timetables").upsert(
{
"profile_id": user_id,
"institute_id": institute_id,
"school_timetable_id": stt_sb_id,
"start_date": start_str,
"end_date": end_str,
"neo4j_node_id": teacher_tt_id,
},
on_conflict="profile_id,school_timetable_id",
).execute()
except Exception as e:
logger.warning(f"Supabase teacher_timetable upsert (non-fatal): {e}")
return {"status": "ok", "timetable_id": teacher_tt_id}
except Exception as e:
logger.error(f"Teacher timetable init failed: {e}")
return {"status": "error", "message": str(e)}
@router.post("/sync-lessons")
async def sync_taught_lessons(credentials: dict = Depends(SupabaseBearer())) -> Dict[str, Any]:
"""
Sync TaughtLesson Neo4j nodes from Supabase for the caller's school.
Also rebuilds TeacherTimetable and TimetableSlot nodes.
Safe to re-run (all MERGEs).
"""
user_id = credentials.get("sub", "")
user_email = credentials.get("email", "")
institute_id, institute_db, _ = _resolve_institute(user_id, user_email)
if not institute_db:
return {"status": "error", "message": "No school found"}
try:
sb = _sb()
tt_counts = _sync_teacher_timetables_to_neo4j(institute_id, institute_db, sb)
lesson_count = _sync_taught_lessons_to_neo4j(institute_id, institute_db, sb)
return {
"status": "ok",
"teacher_timetables": tt_counts["teacher_timetables"],
"slots": tt_counts["slots"],
"taught_lessons": lesson_count,
}
except Exception as e:
logger.error(f"sync_taught_lessons failed: {e}")
return {"status": "error", "message": str(e)}