"""Project a saved exam template into the cc.public.exams Neo4j graph (card S4-7). Supabase is source of truth for the operational template (geometry, marks); cc.public.exams is the co-primary knowledge graph (spec §2/S2). On template save the structural skeleton — ExamPaper → Question/Part → Region, plus Part-[:ASSESSES]->SpecPoint — is projected here. Ownership model (R3.5.1): the graph is written by the API SERVICE ROLE only (no client writes), so this task reads the template via service role and writes Neo4j with the system driver. It is the sanctioned service-role path (documented in the ADR), distinct from the as-user request path. Join keys (never regenerated): exam_questions.id -> Question.uuid_string (container) | Part.uuid_string (leaf) exam_response_areas.id -> Region.uuid_string eb_exams.exam_code -> ExamPaper.exam_code eb_specifications.spec_code -> Specification.spec_code Projection is a full re-sync per exam_code (delete this paper's Question/Part/Region, recreate), matching the PUT full-replace semantics — idempotent and safe to re-run. """ import os import uuid from typing import Any, Dict, List, Optional from modules.database.supabase.utils.client import SupabaseServiceRoleClient from modules.database.tools.neo4j_driver_tools import get_session from modules.logger_tool import initialise_logger logger = initialise_logger(__name__, os.getenv("LOG_LEVEL"), os.getenv("LOG_PATH"), "default", True) # MUST match run/initialization/init_exam_graph.py (shared DB name + deterministic uuid namespace). EXAM_DB = "cc.public.exams" NS = uuid.UUID("00000000-0000-0000-0000-00000000e8a1") def _uid(*parts: str) -> str: return str(uuid.uuid5(NS, ":".join(parts))) def _rows(result: Any) -> List[Dict[str, Any]]: data = getattr(result, "data", None) if not data: return [] return data if isinstance(data, list) else [data] def project_template(template_id: str) -> Dict[str, Any]: """Read the template (service role) and (re)project its structure into cc.public.exams. Returns a counts dict. Raises on hard failure (caller decides whether to swallow — a BackgroundTask logs and drops; the manual /neo4j-sync endpoint surfaces the error). """ sb = SupabaseServiceRoleClient().supabase template = (sb.table("exam_templates").select("*").eq("id", template_id).limit(1).execute().data or [None])[0] if not template: raise ValueError(f"template {template_id} not found") questions = _rows(sb.table("exam_questions").select("*").eq("template_id", template_id).order("order").execute()) regions = _rows(sb.table("exam_response_areas").select("*").eq("template_id", template_id).execute()) # Resolve the paper's exam_code + spec metadata. Catalogue paper → from eb_exams; ad-hoc upload # (no exam_code) → a stable synthetic code so the paper still has a unique graph key. exam_code = template.get("exam_code") spec_code = None paper_meta: Dict[str, Any] = {} if template.get("exam_id"): eb = (sb.table("eb_exams").select("exam_code, spec_code, paper_code, tier, session") .eq("id", template["exam_id"]).limit(1).execute().data or [None])[0] if eb: exam_code = exam_code or eb.get("exam_code") spec_code = eb.get("spec_code") paper_meta = eb if not exam_code: exam_code = f"tpl:{template_id}" paper_uid = _uid("ExamPaper", exam_code) counts = {"exam_code": exam_code, "questions": 0, "parts": 0, "regions": 0, "assesses": 0, "spec_linked": False} with get_session(database=EXAM_DB) as s: # 1. Clean this paper's existing children (full re-sync), keep the ExamPaper node itself. s.run("MATCH (r:Region {exam_code:$ec}) DETACH DELETE r", ec=exam_code).consume() s.run("MATCH (n {exam_code:$ec}) WHERE n:Question OR n:Part DETACH DELETE n", ec=exam_code).consume() # 2. ExamPaper node. s.run( "MERGE (p:ExamPaper {uuid_string:$uid}) " "SET p.exam_code=$ec, p.spec_code=$sc, p.title=$title, p.page_count=$pc, " " p.paper_code=$paper_code, p.tier=$tier, p.session=$session, p.node_storage_path=$nsp", uid=paper_uid, ec=exam_code, sc=spec_code, title=template.get("title"), pc=template.get("page_count"), paper_code=paper_meta.get("paper_code"), tier=paper_meta.get("tier"), session=paper_meta.get("session"), nsp=f"{EXAM_DB}/ExamPaper/{exam_code}", ).consume() # 3. Link to its Specification (seeded separately) when known. if spec_code: r = s.run( "MATCH (sp:Specification {spec_code:$sc}), (p:ExamPaper {exam_code:$ec}) " "MERGE (sp)-[:HAS_PAPER]->(p) RETURN count(*) AS n", sc=spec_code, ec=exam_code, ).single() counts["spec_linked"] = bool(r and r["n"]) # 4. Question/Part nodes — pass 1: create all nodes (so parents exist before linking). for q in questions: label = "Question" if q.get("is_container") else "Part" s.run( f"MERGE (n:{label} {{uuid_string:$uid}}) " "SET n.exam_code=$ec, n.label=$label, n.order=$order, n.max_marks=$mm, " " n.answer_type=$at, n.mark_scheme_type=$mst, n.spec_ref=$sref, " " n.node_storage_path=$nsp", uid=q["id"], ec=exam_code, label=q.get("label"), order=q.get("order") or 0, mm=q.get("max_marks") or 0, at=q.get("answer_type"), mst=(q.get("mark_scheme") or {}).get("type") if isinstance(q.get("mark_scheme"), dict) else None, sref=q.get("spec_ref"), nsp=f"{EXAM_DB}/{label}/{q['id']}", ).consume() counts["parts" if label == "Part" else "questions"] += 1 # 5. Structural + ASSESSES edges — pass 2. for q in questions: if q.get("parent_id"): s.run( "MATCH (parent {uuid_string:$pid}), (n {uuid_string:$uid}) MERGE (parent)-[:HAS_PART]->(n)", pid=q["parent_id"], uid=q["id"], ).consume() else: s.run( "MATCH (p:ExamPaper {exam_code:$ec}), (n {uuid_string:$uid}) MERGE (p)-[:HAS_QUESTION]->(n)", ec=exam_code, uid=q["id"], ).consume() if q.get("spec_ref"): # SpecPoints are seeded per spec_code; match within this paper's spec when known. r = s.run( "MATCH (n {uuid_string:$uid}), (sp:SpecPoint {ref:$ref}) " + ("WHERE sp.spec_code=$sc " if spec_code else "") + "MERGE (n)-[:ASSESSES]->(sp) RETURN count(*) AS n", uid=q["id"], ref=q["spec_ref"], sc=spec_code, ).single() counts["assesses"] += (r["n"] if r else 0) # 6. Region nodes + HAS_REGION edges. # Only response/context regions are part of the knowledge graph (RegionNode.kind). The other # S4-9 kinds (question_number, mark_area, reference, furniture) are physical-layer metadata # about the paper, not curriculum structure — they stay in Supabase, out of cc.public.exams. for rg in regions: if rg.get("kind") not in ("response", "context"): continue s.run( "MERGE (r:Region {uuid_string:$uid}) " "SET r.exam_code=$ec, r.page=$page, r.kind=$kind, r.response_form=$rf, r.node_storage_path=$nsp", uid=rg["id"], ec=exam_code, page=rg.get("page"), kind=rg.get("kind"), rf=rg.get("response_form"), nsp=f"{EXAM_DB}/Region/{rg['id']}", ).consume() s.run( "MATCH (q {uuid_string:$qid}), (r:Region {uuid_string:$uid}) MERGE (q)-[:HAS_REGION]->(r)", qid=rg["question_id"], uid=rg["id"], ).consume() counts["regions"] += 1 logger.info(f"Projected template {template_id} → cc.public.exams: {counts}") return counts def project_template_safe(template_id: str) -> None: """BackgroundTask wrapper: never raises (a failed projection must not break the HTTP save).""" try: project_template(template_id) except Exception as exc: logger.error(f"Background Neo4j projection failed for template {template_id}: {exc}")