From 77bb0766ffe9416ed4327ee1307c85b3c7eb61e9 Mon Sep 17 00:00:00 2001 From: CC Worker Date: Sat, 6 Jun 2026 19:02:18 +0000 Subject: [PATCH] feat(exam): Neo4j projection on template save + neo4j-sync (S4-7) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit modules/database/services/exam_projection.py projects a saved template into cc.public.exams: ExamPaper -> Question/Part -> Region + Part-[:ASSESSES]-> SpecPoint, joined by shared UUIDs (exam_questions.id, exam_response_areas.id, exam_code, spec_code). Full re-sync per exam_code (idempotent). Reads via service role + writes via system Neo4j driver (R3.5.1 documented graph-writer). Wiring (R3.5.4/R5.3): - PUT /templates/{id} enqueues project_template_safe via BackgroundTasks (swallows failures so a graph hiccup never fails the canvas save). - POST /templates/{id}/neo4j-sync — manual trigger, as-user auth + owner check, runs synchronously and returns projection counts. Unit tests: projection scheduled on PUT; neo4j-sync owner/403/404. Co-Authored-By: Claude Opus 4.8 --- modules/database/services/exam_projection.py | 163 +++++++++++++++++++ routers/exam/templates.py | 26 ++- tests/test_exam_templates.py | 39 +++++ 3 files changed, 227 insertions(+), 1 deletion(-) create mode 100644 modules/database/services/exam_projection.py diff --git a/modules/database/services/exam_projection.py b/modules/database/services/exam_projection.py new file mode 100644 index 0000000..2f5d969 --- /dev/null +++ b/modules/database/services/exam_projection.py @@ -0,0 +1,163 @@ +"""Project a saved exam template into the cc.public.exams Neo4j graph (card S4-7). + +Supabase is source of truth for the operational template (geometry, marks); cc.public.exams is +the co-primary knowledge graph (spec §2/S2). On template save the structural skeleton — +ExamPaper → Question/Part → Region, plus Part-[:ASSESSES]->SpecPoint — is projected here. + +Ownership model (R3.5.1): the graph is written by the API SERVICE ROLE only (no client writes), +so this task reads the template via service role and writes Neo4j with the system driver. It is +the sanctioned service-role path (documented in the ADR), distinct from the as-user request path. + +Join keys (never regenerated): + exam_questions.id -> Question.uuid_string (container) | Part.uuid_string (leaf) + exam_response_areas.id -> Region.uuid_string + eb_exams.exam_code -> ExamPaper.exam_code + eb_specifications.spec_code -> Specification.spec_code + +Projection is a full re-sync per exam_code (delete this paper's Question/Part/Region, recreate), +matching the PUT full-replace semantics — idempotent and safe to re-run. +""" +import os +import uuid +from typing import Any, Dict, List, Optional + +from modules.database.supabase.utils.client import SupabaseServiceRoleClient +from modules.database.tools.neo4j_driver_tools import get_session +from modules.logger_tool import initialise_logger + +logger = initialise_logger(__name__, os.getenv("LOG_LEVEL"), os.getenv("LOG_PATH"), "default", True) + +# MUST match run/initialization/init_exam_graph.py (shared DB name + deterministic uuid namespace). +EXAM_DB = "cc.public.exams" +NS = uuid.UUID("00000000-0000-0000-0000-00000000e8a1") + + +def _uid(*parts: str) -> str: + return str(uuid.uuid5(NS, ":".join(parts))) + + +def _rows(result: Any) -> List[Dict[str, Any]]: + data = getattr(result, "data", None) + if not data: + return [] + return data if isinstance(data, list) else [data] + + +def project_template(template_id: str) -> Dict[str, Any]: + """Read the template (service role) and (re)project its structure into cc.public.exams. + + Returns a counts dict. Raises on hard failure (caller decides whether to swallow — a + BackgroundTask logs and drops; the manual /neo4j-sync endpoint surfaces the error). + """ + sb = SupabaseServiceRoleClient().supabase + template = (sb.table("exam_templates").select("*").eq("id", template_id).limit(1).execute().data or [None])[0] + if not template: + raise ValueError(f"template {template_id} not found") + + questions = _rows(sb.table("exam_questions").select("*").eq("template_id", template_id).order("order").execute()) + regions = _rows(sb.table("exam_response_areas").select("*").eq("template_id", template_id).execute()) + + # Resolve the paper's exam_code + spec metadata. Catalogue paper → from eb_exams; ad-hoc upload + # (no exam_code) → a stable synthetic code so the paper still has a unique graph key. + exam_code = template.get("exam_code") + spec_code = None + paper_meta: Dict[str, Any] = {} + if template.get("exam_id"): + eb = (sb.table("eb_exams").select("exam_code, spec_code, paper_code, tier, session") + .eq("id", template["exam_id"]).limit(1).execute().data or [None])[0] + if eb: + exam_code = exam_code or eb.get("exam_code") + spec_code = eb.get("spec_code") + paper_meta = eb + if not exam_code: + exam_code = f"tpl:{template_id}" + + paper_uid = _uid("ExamPaper", exam_code) + counts = {"exam_code": exam_code, "questions": 0, "parts": 0, "regions": 0, "assesses": 0, "spec_linked": False} + + with get_session(database=EXAM_DB) as s: + # 1. Clean this paper's existing children (full re-sync), keep the ExamPaper node itself. + s.run("MATCH (r:Region {exam_code:$ec}) DETACH DELETE r", ec=exam_code).consume() + s.run("MATCH (n {exam_code:$ec}) WHERE n:Question OR n:Part DETACH DELETE n", ec=exam_code).consume() + + # 2. ExamPaper node. + s.run( + "MERGE (p:ExamPaper {uuid_string:$uid}) " + "SET p.exam_code=$ec, p.spec_code=$sc, p.title=$title, p.page_count=$pc, " + " p.paper_code=$paper_code, p.tier=$tier, p.session=$session, p.node_storage_path=$nsp", + uid=paper_uid, ec=exam_code, sc=spec_code, title=template.get("title"), + pc=template.get("page_count"), paper_code=paper_meta.get("paper_code"), + tier=paper_meta.get("tier"), session=paper_meta.get("session"), + nsp=f"{EXAM_DB}/ExamPaper/{exam_code}", + ).consume() + + # 3. Link to its Specification (seeded separately) when known. + if spec_code: + r = s.run( + "MATCH (sp:Specification {spec_code:$sc}), (p:ExamPaper {exam_code:$ec}) " + "MERGE (sp)-[:HAS_PAPER]->(p) RETURN count(*) AS n", + sc=spec_code, ec=exam_code, + ).single() + counts["spec_linked"] = bool(r and r["n"]) + + # 4. Question/Part nodes — pass 1: create all nodes (so parents exist before linking). + for q in questions: + label = "Question" if q.get("is_container") else "Part" + s.run( + f"MERGE (n:{label} {{uuid_string:$uid}}) " + "SET n.exam_code=$ec, n.label=$label, n.order=$order, n.max_marks=$mm, " + " n.answer_type=$at, n.mark_scheme_type=$mst, n.spec_ref=$sref, " + " n.node_storage_path=$nsp", + uid=q["id"], ec=exam_code, label=q.get("label"), order=q.get("order") or 0, + mm=q.get("max_marks") or 0, at=q.get("answer_type"), + mst=(q.get("mark_scheme") or {}).get("type") if isinstance(q.get("mark_scheme"), dict) else None, + sref=q.get("spec_ref"), nsp=f"{EXAM_DB}/{label}/{q['id']}", + ).consume() + counts["parts" if label == "Part" else "questions"] += 1 + + # 5. Structural + ASSESSES edges — pass 2. + for q in questions: + if q.get("parent_id"): + s.run( + "MATCH (parent {uuid_string:$pid}), (n {uuid_string:$uid}) MERGE (parent)-[:HAS_PART]->(n)", + pid=q["parent_id"], uid=q["id"], + ).consume() + else: + s.run( + "MATCH (p:ExamPaper {exam_code:$ec}), (n {uuid_string:$uid}) MERGE (p)-[:HAS_QUESTION]->(n)", + ec=exam_code, uid=q["id"], + ).consume() + if q.get("spec_ref"): + # SpecPoints are seeded per spec_code; match within this paper's spec when known. + r = s.run( + "MATCH (n {uuid_string:$uid}), (sp:SpecPoint {ref:$ref}) " + + ("WHERE sp.spec_code=$sc " if spec_code else "") + + "MERGE (n)-[:ASSESSES]->(sp) RETURN count(*) AS n", + uid=q["id"], ref=q["spec_ref"], sc=spec_code, + ).single() + counts["assesses"] += (r["n"] if r else 0) + + # 6. Region nodes + HAS_REGION edges. + for rg in regions: + s.run( + "MERGE (r:Region {uuid_string:$uid}) " + "SET r.exam_code=$ec, r.page=$page, r.kind=$kind, r.response_form=$rf, r.node_storage_path=$nsp", + uid=rg["id"], ec=exam_code, page=rg.get("page"), kind=rg.get("kind"), + rf=rg.get("response_form"), nsp=f"{EXAM_DB}/Region/{rg['id']}", + ).consume() + s.run( + "MATCH (q {uuid_string:$qid}), (r:Region {uuid_string:$uid}) MERGE (q)-[:HAS_REGION]->(r)", + qid=rg["question_id"], uid=rg["id"], + ).consume() + counts["regions"] += 1 + + logger.info(f"Projected template {template_id} → cc.public.exams: {counts}") + return counts + + +def project_template_safe(template_id: str) -> None: + """BackgroundTask wrapper: never raises (a failed projection must not break the HTTP save).""" + try: + project_template(template_id) + except Exception as exc: + logger.error(f"Background Neo4j projection failed for template {template_id}: {exc}") diff --git a/routers/exam/templates.py b/routers/exam/templates.py index 6c58657..d4ebfee 100644 --- a/routers/exam/templates.py +++ b/routers/exam/templates.py @@ -15,8 +15,9 @@ from __future__ import annotations import os from typing import Any, Dict, List, Optional -from fastapi import APIRouter, Depends, HTTPException +from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException +from modules.database.services.exam_projection import project_template, project_template_safe from modules.logger_tool import initialise_logger from routers.exam.dependencies import ExamContext, get_exam_context, lookup_exam_code from routers.exam.schemas import ( @@ -137,6 +138,7 @@ async def get_template( async def replace_template( template_id: str, body: TemplateReplaceRequest, + background_tasks: BackgroundTasks, ctx: ExamContext = Depends(get_exam_context), ) -> Dict[str, Any]: """Full-replace canvas save (R5.2). Replaces questions/response_areas/boundaries wholesale. @@ -223,6 +225,10 @@ async def replace_template( f"Exam template {template_id} replaced: {len(body.questions)} questions, " f"{len(body.response_areas)} regions, {len(body.boundaries)} boundaries" ) + # R3.5.4: a successful save enqueues a graph projection into cc.public.exams. BackgroundTasks + # is acceptable for Sprint 4 (durability via a real queue is a later step); failures are + # swallowed so the canvas save itself never fails on a graph hiccup. + background_tasks.add_task(project_template_safe, template_id) return await get_template(template_id, ctx) @@ -238,6 +244,24 @@ async def archive_template( return {"status": "archived", "id": template_id} +@router.post("/templates/{template_id}/neo4j-sync") +async def neo4j_sync( + template_id: str, + ctx: ExamContext = Depends(get_exam_context), +) -> Dict[str, Any]: + """Manual graph-projection trigger (R5.3) for dev/backfill — runs synchronously and returns + counts. Auth/ownership is checked as-the-user; the projection itself uses service role + (R3.5.1, the documented graph-writer path).""" + template = _fetch_template_or_404(ctx, template_id) + _require_owner(ctx, template) + try: + counts = project_template(template_id) + except Exception as exc: + logger.error(f"Manual neo4j-sync failed for template {template_id}: {exc}") + raise HTTPException(status_code=502, detail=f"Projection failed: {exc}") + return {"status": "ok", "projection": counts} + + # ─── questions (granular edit path, R5.2) ──────────────────────────────────── @router.patch("/questions/{question_id}") diff --git a/tests/test_exam_templates.py b/tests/test_exam_templates.py index ea4fa95..96bfb60 100644 --- a/tests/test_exam_templates.py +++ b/tests/test_exam_templates.py @@ -5,9 +5,11 @@ ExamContext dependency is overridden with an in-memory fake, so these tests exer router's auth/ownership/institute logic without a live Supabase — the as-user RLS itself is verified separately against .94 (see the evidence note). """ +import pytest from fastapi import FastAPI from fastapi.testclient import TestClient +import routers.exam.templates as templates_mod from routers.exam.templates import router from routers.exam.dependencies import ExamContext, get_exam_context @@ -18,6 +20,15 @@ INST_A = "10000000-0000-0000-0000-000000000001" INST_B = "10000000-0000-0000-0000-000000000002" +@pytest.fixture(autouse=True) +def _stub_projection(monkeypatch): + """Record projection scheduling and never touch Neo4j/service-role in unit tests.""" + calls = [] + monkeypatch.setattr(templates_mod, "project_template_safe", lambda tid: calls.append(tid)) + monkeypatch.setattr(templates_mod, "project_template", lambda tid: {"exam_code": "X", "questions": 1}) + return calls + + # ─── in-memory fake supabase ───────────────────────────────────────────────── class FakeResult: @@ -262,3 +273,31 @@ def test_patch_question_empty_body_is_400(): store = {"exam_questions": [{"id": "q1", "template_id": "t1", "label": "01"}]} client, _ = make_client(store=store) assert client.patch("/api/exam/questions/q1", json={}).status_code == 400 + + +# ─── Neo4j projection (S4-7) ───────────────────────────────────────────────── + +def test_put_schedules_projection(_stub_projection): + store = {"exam_templates": [{"id": "t1", "title": "p", "status": "draft", "institute_id": INST_A, "teacher_id": TEACHER}]} + client, _ = make_client(store=store) + client.put("/api/exam/templates/t1", json={"questions": []}) + assert _stub_projection == ["t1"] # projection enqueued for the saved template + + +def test_neo4j_sync_owner_runs(): + store = {"exam_templates": [{"id": "t1", "title": "p", "status": "draft", "institute_id": INST_A, "teacher_id": TEACHER}]} + client, _ = make_client(store=store) + r = client.post("/api/exam/templates/t1/neo4j-sync") + assert r.status_code == 200 + assert r.json()["projection"]["exam_code"] == "X" + + +def test_neo4j_sync_non_owner_403(): + store = {"exam_templates": [{"id": "t1", "title": "p", "status": "draft", "institute_id": INST_A, "teacher_id": OTHER_TEACHER}]} + client, _ = make_client(user_id=TEACHER, institute_ids=(INST_A,), store=store) + assert client.post("/api/exam/templates/t1/neo4j-sync").status_code == 403 + + +def test_neo4j_sync_404(): + client, _ = make_client(store={"exam_templates": []}) + assert client.post("/api/exam/templates/does-not-exist/neo4j-sync").status_code == 404