api/modules/database/services/exam_projection.py
CC Worker 77bb0766ff feat(exam): Neo4j projection on template save + neo4j-sync (S4-7)
modules/database/services/exam_projection.py projects a saved template into
cc.public.exams: ExamPaper -> Question/Part -> Region + Part-[:ASSESSES]->
SpecPoint, joined by shared UUIDs (exam_questions.id, exam_response_areas.id,
exam_code, spec_code). Full re-sync per exam_code (idempotent). Reads via
service role + writes via system Neo4j driver (R3.5.1 documented graph-writer).

Wiring (R3.5.4/R5.3):
- PUT /templates/{id} enqueues project_template_safe via BackgroundTasks
  (swallows failures so a graph hiccup never fails the canvas save).
- POST /templates/{id}/neo4j-sync — manual trigger, as-user auth + owner check,
  runs synchronously and returns projection counts.

Unit tests: projection scheduled on PUT; neo4j-sync owner/403/404.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-06 19:02:18 +00:00

164 lines
7.9 KiB
Python

"""Project a saved exam template into the cc.public.exams Neo4j graph (card S4-7).
Supabase is source of truth for the operational template (geometry, marks); cc.public.exams is
the co-primary knowledge graph (spec §2/S2). On template save the structural skeleton —
ExamPaper → Question/Part → Region, plus Part-[:ASSESSES]->SpecPoint — is projected here.
Ownership model (R3.5.1): the graph is written by the API SERVICE ROLE only (no client writes),
so this task reads the template via service role and writes Neo4j with the system driver. It is
the sanctioned service-role path (documented in the ADR), distinct from the as-user request path.
Join keys (never regenerated):
exam_questions.id -> Question.uuid_string (container) | Part.uuid_string (leaf)
exam_response_areas.id -> Region.uuid_string
eb_exams.exam_code -> ExamPaper.exam_code
eb_specifications.spec_code -> Specification.spec_code
Projection is a full re-sync per exam_code (delete this paper's Question/Part/Region, recreate),
matching the PUT full-replace semantics — idempotent and safe to re-run.
"""
import os
import uuid
from typing import Any, Dict, List, Optional, Tuple

from modules.database.supabase.utils.client import SupabaseServiceRoleClient
from modules.database.tools.neo4j_driver_tools import get_session
from modules.logger_tool import initialise_logger
# Module logger; level and output path come from the LOG_LEVEL / LOG_PATH environment variables.
logger = initialise_logger(__name__, os.getenv("LOG_LEVEL"), os.getenv("LOG_PATH"), "default", True)
# MUST match run/initialization/init_exam_graph.py (shared DB name + deterministic uuid namespace).
# Neo4j database that holds the exam graph; also used as the prefix of every node_storage_path.
EXAM_DB = "cc.public.exams"
# uuid5 namespace for _uid(). Changing it changes every _uid()-derived key (e.g. the ExamPaper
# uuid_string), so projections would stop lining up with nodes created by the init script.
NS = uuid.UUID("00000000-0000-0000-0000-00000000e8a1")
def _uid(*parts: str) -> str:
    """Derive a deterministic UUID string from *parts* under the module namespace ``NS``.

    The parts are joined with ``":"`` and hashed with uuid5, so identical inputs always yield
    the identical id and re-projection reuses the same graph keys.
    """
    name = ":".join(parts)
    derived = uuid.uuid5(NS, name)
    return str(derived)
def _rows(result: Any) -> List[Dict[str, Any]]:
data = getattr(result, "data", None)
if not data:
return []
return data if isinstance(data, list) else [data]
def _node(var: str, label: Optional[str], param: str) -> str:
    """Build a Cypher node pattern matching ``uuid_string`` = ``$param``.

    The pattern is label-qualified when the label is known, so Neo4j can use a label/property
    index if one exists; an unlabeled pattern has to consider every node in the database.
    """
    qualifier = f":{label}" if label else ""
    return f"({var}{qualifier} {{uuid_string:${param}}})"


def _load_template(
    sb: Any, template_id: str
) -> Tuple[Dict[str, Any], List[Dict[str, Any]], List[Dict[str, Any]]]:
    """Fetch the template row, its questions (ordered by ``order``) and its response areas.

    Raises:
        ValueError: if no ``exam_templates`` row has this id.
    """
    template = (sb.table("exam_templates").select("*").eq("id", template_id).limit(1).execute().data or [None])[0]
    if not template:
        raise ValueError(f"template {template_id} not found")
    questions = _rows(sb.table("exam_questions").select("*").eq("template_id", template_id).order("order").execute())
    regions = _rows(sb.table("exam_response_areas").select("*").eq("template_id", template_id).execute())
    return template, questions, regions


def _resolve_paper(
    sb: Any, template: Dict[str, Any], template_id: str
) -> Tuple[str, Optional[str], Dict[str, Any]]:
    """Resolve the paper's graph key (exam_code), its spec_code and its eb_exams metadata.

    Catalogue paper → values come from eb_exams (an exam_code already on the template wins).
    Ad-hoc upload (no exam_code anywhere) → the stable synthetic key ``tpl:<template_id>``, so
    the paper still has a unique graph key.
    """
    exam_code = template.get("exam_code")
    spec_code: Optional[str] = None
    paper_meta: Dict[str, Any] = {}
    if template.get("exam_id"):
        eb = (sb.table("eb_exams").select("exam_code, spec_code, paper_code, tier, session")
              .eq("id", template["exam_id"]).limit(1).execute().data or [None])[0]
        if eb:
            exam_code = exam_code or eb.get("exam_code")
            spec_code = eb.get("spec_code")
            paper_meta = eb
    return exam_code or f"tpl:{template_id}", spec_code, paper_meta


def _write_projection(
    tx: Any,
    template: Dict[str, Any],
    questions: List[Dict[str, Any]],
    regions: List[Dict[str, Any]],
    exam_code: str,
    spec_code: Optional[str],
    paper_meta: Dict[str, Any],
) -> Dict[str, Any]:
    """Run the full re-sync Cypher for one paper on transaction *tx* and return the counts."""
    counts: Dict[str, Any] = {
        "exam_code": exam_code, "questions": 0, "parts": 0, "regions": 0, "assesses": 0, "spec_linked": False,
    }
    paper_uid = _uid("ExamPaper", exam_code)
    # Container rows become Question nodes, leaves become Part nodes. Remembering the label per id
    # lets every later MATCH be label-qualified. Labels are fixed literals, so the f-strings
    # below are injection-safe.
    label_by_id = {q["id"]: ("Question" if q.get("is_container") else "Part") for q in questions}

    # 1. Clean this paper's existing children (full re-sync), keep the ExamPaper node itself.
    for child_label in ("Region", "Question", "Part"):
        tx.run(f"MATCH (n:{child_label} {{exam_code:$ec}}) DETACH DELETE n", ec=exam_code).consume()

    # 2. ExamPaper node, keyed by the deterministic uuid derived from exam_code.
    tx.run(
        "MERGE (p:ExamPaper {uuid_string:$uid}) "
        "SET p.exam_code=$ec, p.spec_code=$sc, p.title=$title, p.page_count=$pc, "
        "    p.paper_code=$paper_code, p.tier=$tier, p.session=$session, p.node_storage_path=$nsp",
        uid=paper_uid, ec=exam_code, sc=spec_code, title=template.get("title"),
        pc=template.get("page_count"), paper_code=paper_meta.get("paper_code"),
        tier=paper_meta.get("tier"), session=paper_meta.get("session"),
        nsp=f"{EXAM_DB}/ExamPaper/{exam_code}",
    ).consume()

    # 3. Link to its Specification (seeded separately) when known.
    if spec_code:
        r = tx.run(
            "MATCH (sp:Specification {spec_code:$sc}), (p:ExamPaper {uuid_string:$uid}) "
            "MERGE (sp)-[:HAS_PAPER]->(p) RETURN count(*) AS n",
            sc=spec_code, uid=paper_uid,
        ).single()
        counts["spec_linked"] = bool(r and r["n"])

    # 4. Question/Part nodes — pass 1: create all nodes (so parents exist before linking).
    for q in questions:
        label = label_by_id[q["id"]]
        mark_scheme = q.get("mark_scheme")
        tx.run(
            f"MERGE (n:{label} {{uuid_string:$uid}}) "
            "SET n.exam_code=$ec, n.label=$label, n.order=$order, n.max_marks=$mm, "
            "    n.answer_type=$at, n.mark_scheme_type=$mst, n.spec_ref=$sref, "
            "    n.node_storage_path=$nsp",
            uid=q["id"], ec=exam_code, label=q.get("label"), order=q.get("order") or 0,
            mm=q.get("max_marks") or 0, at=q.get("answer_type"),
            mst=mark_scheme.get("type") if isinstance(mark_scheme, dict) else None,
            sref=q.get("spec_ref"), nsp=f"{EXAM_DB}/{label}/{q['id']}",
        ).consume()
        counts["parts" if label == "Part" else "questions"] += 1

    # 5. Structural + ASSESSES edges — pass 2.
    spec_filter = "WHERE sp.spec_code=$sc " if spec_code else ""
    for q in questions:
        node = _node("n", label_by_id[q["id"]], "uid")
        if q.get("parent_id"):
            # A parent id not among this template's questions gets the unlabeled fallback pattern.
            parent = _node("parent", label_by_id.get(q["parent_id"]), "pid")
            tx.run(
                f"MATCH {parent}, {node} MERGE (parent)-[:HAS_PART]->(n)",
                pid=q["parent_id"], uid=q["id"],
            ).consume()
        else:
            tx.run(
                f"MATCH (p:ExamPaper {{uuid_string:$puid}}), {node} MERGE (p)-[:HAS_QUESTION]->(n)",
                puid=paper_uid, uid=q["id"],
            ).consume()
        if q.get("spec_ref"):
            # SpecPoints are seeded per spec_code; match within this paper's spec when known.
            r = tx.run(
                f"MATCH {node}, (sp:SpecPoint {{ref:$ref}}) {spec_filter}"
                "MERGE (n)-[:ASSESSES]->(sp) RETURN count(*) AS n",
                uid=q["id"], ref=q["spec_ref"], sc=spec_code,
            ).single()
            counts["assesses"] += r["n"] if r else 0

    # 6. Region nodes + HAS_REGION edges.
    for rg in regions:
        tx.run(
            "MERGE (r:Region {uuid_string:$uid}) "
            "SET r.exam_code=$ec, r.page=$page, r.kind=$kind, r.response_form=$rf, r.node_storage_path=$nsp",
            uid=rg["id"], ec=exam_code, page=rg.get("page"), kind=rg.get("kind"),
            rf=rg.get("response_form"), nsp=f"{EXAM_DB}/Region/{rg['id']}",
        ).consume()
        qid = rg.get("question_id")
        if qid:
            # With no owning question a MATCH on uuid_string=null matches nothing, so skip the round trip.
            owner = _node("q", label_by_id.get(qid), "qid")
            tx.run(
                f"MATCH {owner}, (r:Region {{uuid_string:$uid}}) MERGE (q)-[:HAS_REGION]->(r)",
                qid=qid, uid=rg["id"],
            ).consume()
        counts["regions"] += 1
    return counts


def project_template(template_id: str) -> Dict[str, Any]:
    """Read the template (service role) and (re)project its structure into cc.public.exams.

    The write is a full re-sync of this paper's Question/Part/Region nodes. All statements run
    in ONE explicit transaction, so a failure part-way through rolls back and leaves the previous
    projection intact instead of a half-deleted paper.

    Args:
        template_id: ``exam_templates.id`` of the template to project.

    Returns:
        A counts dict with keys ``exam_code``, ``questions``, ``parts``, ``regions``,
        ``assesses`` and ``spec_linked``.

    Raises:
        ValueError: if the template does not exist.
        Exception: Supabase/Neo4j errors propagate (the caller decides whether to swallow — a
            BackgroundTask logs and drops; the manual /neo4j-sync endpoint surfaces the error).
    """
    sb = SupabaseServiceRoleClient().supabase
    template, questions, regions = _load_template(sb, template_id)
    exam_code, spec_code, paper_meta = _resolve_paper(sb, template, template_id)
    with get_session(database=EXAM_DB) as s:
        # Explicit transaction: the delete and the re-create commit or roll back together.
        # Previously each s.run was its own auto-commit transaction.
        with s.begin_transaction() as tx:
            counts = _write_projection(tx, template, questions, regions, exam_code, spec_code, paper_meta)
            tx.commit()
    logger.info(f"Projected template {template_id} → cc.public.exams: {counts}")
    return counts
def project_template_safe(template_id: str) -> None:
    """BackgroundTask wrapper: never raises an ``Exception`` (a failed projection must not break the HTTP save).

    Any ``Exception`` from ``project_template`` is logged and dropped. The manual /neo4j-sync
    endpoint is the path that surfaces projection errors to a caller.
    """
    try:
        project_template(template_id)
    except Exception as exc:
        # Deliberate best-effort swallow. logger.exception also records the exception type and
        # traceback, which str(exc) alone loses (a KeyError would log as just "'question_id'").
        logger.exception(f"Background Neo4j projection failed for template {template_id}: {exc}")