api/modules/database/services/exam_projection.py
CC Worker 9c1aee28e2
Some checks failed
api-ci-deploy / test-build-deploy (push) Has been cancelled
feat(exam): persist S4-9 region kinds + Part geometry; keep metadata out of graph
Backend follow-on to migration 73:
- schemas: ResponseAreaPayload.kind extended to response|context|question_number|
  mark_area|reference|furniture + context_type; QuestionPayload gains bounds+page.
- PUT serialization persists Part bounds/page and region context_type.
- Neo4j projection only emits Region nodes for response/context regions; the
  metadata kinds (question_number/mark_area/reference/furniture) are physical-layer
  only and stay out of cc.public.exams.
- Unit test: new kinds + Part geometry + context_type round-trip.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-06 21:14:20 +00:00

169 lines
8.3 KiB
Python

"""Project a saved exam template into the cc.public.exams Neo4j graph (card S4-7).
Supabase is source of truth for the operational template (geometry, marks); cc.public.exams is
the co-primary knowledge graph (spec §2/S2). On template save the structural skeleton —
ExamPaper → Question/Part → Region, plus Part-[:ASSESSES]->SpecPoint — is projected here.
Ownership model (R3.5.1): the graph is written by the API SERVICE ROLE only (no client writes),
so this task reads the template via service role and writes Neo4j with the system driver. It is
the sanctioned service-role path (documented in the ADR), distinct from the as-user request path.
Join keys (never regenerated):
    exam_questions.id           -> Question.uuid_string (container) | Part.uuid_string (leaf)
    exam_response_areas.id      -> Region.uuid_string
    eb_exams.exam_code          -> ExamPaper.exam_code
    eb_specifications.spec_code -> Specification.spec_code
Projection is a full re-sync per exam_code (delete this paper's Question/Part/Region, recreate),
matching the PUT full-replace semantics — idempotent and safe to re-run.
"""
import os
import uuid
from typing import Any, Dict, List, Optional
from modules.database.supabase.utils.client import SupabaseServiceRoleClient
from modules.database.tools.neo4j_driver_tools import get_session
from modules.logger_tool import initialise_logger
# Module-level logger, configured from the LOG_LEVEL / LOG_PATH environment variables.
logger = initialise_logger(
    __name__,
    os.environ.get("LOG_LEVEL"),
    os.environ.get("LOG_PATH"),
    "default",
    True,
)

# Both values below MUST match run/initialization/init_exam_graph.py: the shared database name
# and the namespace from which deterministic uuid5 identifiers are derived.
EXAM_DB = "cc.public.exams"
NS = uuid.UUID("00000000-0000-0000-0000-00000000e8a1")
def _uid(*parts: str) -> str:
    """Return a deterministic UUIDv5 string for ``parts``.

    The parts are joined with ``:`` and hashed under the module namespace ``NS``, so the same
    inputs always yield the same identifier.
    """
    key = ":".join(parts)
    derived = uuid.uuid5(NS, key)
    return str(derived)
def _rows(result: Any) -> List[Dict[str, Any]]:
    """Normalise the ``data`` attribute of a query result into a list of rows.

    A missing or falsy ``data`` yields a fresh empty list, a list is returned as-is, and any
    other truthy value is wrapped in a single-element list.
    """
    payload = getattr(result, "data", None)
    if isinstance(payload, list):
        return payload if payload else []
    return [payload] if payload else []
def _load_template_rows(sb: Any, template_id: str) -> tuple:
    """Fetch the template row plus its question and region rows; raise ValueError if absent."""
    found = sb.table("exam_templates").select("*").eq("id", template_id).limit(1).execute().data
    template = (found or [None])[0]
    if not template:
        raise ValueError(f"template {template_id} not found")
    questions = _rows(
        sb.table("exam_questions").select("*").eq("template_id", template_id).order("order").execute()
    )
    regions = _rows(sb.table("exam_response_areas").select("*").eq("template_id", template_id).execute())
    return template, questions, regions


def _resolve_paper_identity(sb: Any, template: Dict[str, Any], template_id: str) -> tuple:
    """Resolve the paper's (exam_code, spec_code, paper_meta) from the template and eb_exams."""
    exam_code = template.get("exam_code")
    spec_code: Optional[str] = None
    paper_meta: Dict[str, Any] = {}
    if template.get("exam_id"):
        # Catalogue paper: the eb_exams row supplies spec metadata and a fallback exam_code.
        found = (
            sb.table("eb_exams").select("exam_code, spec_code, paper_code, tier, session")
            .eq("id", template["exam_id"]).limit(1).execute().data
        )
        eb = (found or [None])[0]
        if eb:
            exam_code = exam_code or eb.get("exam_code")
            spec_code = eb.get("spec_code")
            paper_meta = eb
    if not exam_code:
        # Ad-hoc upload (no exam_code): a stable synthetic code keeps the graph key unique.
        exam_code = f"tpl:{template_id}"
    return exam_code, spec_code, paper_meta


def _reset_paper_children(s: Any, exam_code: str) -> None:
    """Delete this paper's Region/Question/Part nodes; the ExamPaper node itself is kept."""
    s.run("MATCH (r:Region {exam_code:$ec}) DETACH DELETE r", ec=exam_code).consume()
    s.run("MATCH (n {exam_code:$ec}) WHERE n:Question OR n:Part DETACH DELETE n", ec=exam_code).consume()


def _upsert_paper(
    s: Any,
    paper_uid: str,
    exam_code: str,
    spec_code: Optional[str],
    template: Dict[str, Any],
    paper_meta: Dict[str, Any],
    counts: Dict[str, Any],
) -> None:
    """Upsert the ExamPaper node and, when spec_code is known, link it to its Specification."""
    s.run(
        "MERGE (p:ExamPaper {uuid_string:$uid}) "
        "SET p.exam_code=$ec, p.spec_code=$sc, p.title=$title, p.page_count=$pc, "
        "    p.paper_code=$paper_code, p.tier=$tier, p.session=$session, p.node_storage_path=$nsp",
        uid=paper_uid, ec=exam_code, sc=spec_code, title=template.get("title"),
        pc=template.get("page_count"), paper_code=paper_meta.get("paper_code"),
        tier=paper_meta.get("tier"), session=paper_meta.get("session"),
        nsp=f"{EXAM_DB}/ExamPaper/{exam_code}",
    ).consume()
    if spec_code:
        # The Specification is seeded separately; a zero count means it was not found.
        linked = s.run(
            "MATCH (sp:Specification {spec_code:$sc}), (p:ExamPaper {exam_code:$ec}) "
            "MERGE (sp)-[:HAS_PAPER]->(p) RETURN count(*) AS n",
            sc=spec_code, ec=exam_code,
        ).single()
        counts["spec_linked"] = bool(linked and linked["n"])


def _write_question_nodes(
    s: Any, questions: List[Dict[str, Any]], exam_code: str, counts: Dict[str, Any]
) -> None:
    """Pass 1: create every Question (container) / Part (leaf) node before any edge is linked."""
    for q in questions:
        node_label = "Question" if q.get("is_container") else "Part"
        mark_scheme = q.get("mark_scheme")
        s.run(
            # node_label is one of two fixed literals, so interpolating it into Cypher is safe.
            f"MERGE (n:{node_label} {{uuid_string:$uid}}) "
            "SET n.exam_code=$ec, n.label=$label, n.order=$order, n.max_marks=$mm, "
            "    n.answer_type=$at, n.mark_scheme_type=$mst, n.spec_ref=$sref, "
            "    n.node_storage_path=$nsp",
            uid=q["id"], ec=exam_code, label=q.get("label"), order=q.get("order") or 0,
            mm=q.get("max_marks") or 0, at=q.get("answer_type"),
            mst=mark_scheme.get("type") if isinstance(mark_scheme, dict) else None,
            sref=q.get("spec_ref"), nsp=f"{EXAM_DB}/{node_label}/{q['id']}",
        ).consume()
        counts["parts" if node_label == "Part" else "questions"] += 1


def _write_question_edges(
    s: Any,
    questions: List[Dict[str, Any]],
    exam_code: str,
    spec_code: Optional[str],
    counts: Dict[str, Any],
) -> None:
    """Pass 2: add HAS_PART / HAS_QUESTION structure edges and Part-[:ASSESSES]->SpecPoint."""
    # SpecPoints are seeded per spec_code; match within this paper's spec when known.
    # The query text does not depend on the question, so build it once.
    assesses_query = (
        "MATCH (n {uuid_string:$uid}), (sp:SpecPoint {ref:$ref}) "
        + ("WHERE sp.spec_code=$sc " if spec_code else "")
        + "MERGE (n)-[:ASSESSES]->(sp) RETURN count(*) AS n"
    )
    for q in questions:
        if q.get("parent_id"):
            s.run(
                "MATCH (parent {uuid_string:$pid}), (n {uuid_string:$uid}) MERGE (parent)-[:HAS_PART]->(n)",
                pid=q["parent_id"], uid=q["id"],
            ).consume()
        else:
            # No parent: the node hangs directly off the paper.
            s.run(
                "MATCH (p:ExamPaper {exam_code:$ec}), (n {uuid_string:$uid}) MERGE (p)-[:HAS_QUESTION]->(n)",
                ec=exam_code, uid=q["id"],
            ).consume()
        if q.get("spec_ref"):
            matched = s.run(assesses_query, uid=q["id"], ref=q["spec_ref"], sc=spec_code).single()
            counts["assesses"] += (matched["n"] if matched else 0)


def _write_regions(
    s: Any, regions: List[Dict[str, Any]], exam_code: str, counts: Dict[str, Any]
) -> None:
    """Create Region nodes (response/context kinds only) and their HAS_REGION edges."""
    # Only response/context regions are part of the knowledge graph (RegionNode.kind). The other
    # S4-9 kinds (question_number, mark_area, reference, furniture) are physical-layer metadata
    # about the paper, not curriculum structure — they stay in Supabase, out of cc.public.exams.
    for rg in regions:
        if rg.get("kind") not in ("response", "context"):
            continue
        s.run(
            "MERGE (r:Region {uuid_string:$uid}) "
            "SET r.exam_code=$ec, r.page=$page, r.kind=$kind, r.response_form=$rf, r.node_storage_path=$nsp",
            uid=rg["id"], ec=exam_code, page=rg.get("page"), kind=rg.get("kind"),
            rf=rg.get("response_form"), nsp=f"{EXAM_DB}/Region/{rg['id']}",
        ).consume()
        question_id = rg.get("question_id")
        if question_id is not None:
            # A null question id can never match a node, so skip the round trip in that case.
            s.run(
                "MATCH (q {uuid_string:$qid}), (r:Region {uuid_string:$uid}) MERGE (q)-[:HAS_REGION]->(r)",
                qid=question_id, uid=rg["id"],
            ).consume()
        counts["regions"] += 1


def project_template(template_id: str) -> Dict[str, Any]:
    """Read the template (service role) and (re)project its structure into cc.public.exams.

    The projection is a full re-sync for the paper's exam_code: existing Question/Part/Region
    nodes carrying that exam_code are deleted and recreated from the current Supabase rows.

    Args:
        template_id: ``exam_templates.id`` of the template to project.

    Returns:
        A counts dict with keys ``exam_code``, ``questions``, ``parts``, ``regions``,
        ``assesses`` and ``spec_linked``.

    Raises:
        ValueError: if no template row exists for ``template_id``.
        Exception: any Supabase or Neo4j failure propagates; the caller decides whether to
            swallow it (a BackgroundTask logs and drops; the manual /neo4j-sync endpoint
            surfaces the error).
    """
    sb = SupabaseServiceRoleClient().supabase
    template, questions, regions = _load_template_rows(sb, template_id)
    exam_code, spec_code, paper_meta = _resolve_paper_identity(sb, template, template_id)

    # Derived before the session opens so a bad exam_code fails before anything is deleted.
    paper_uid = _uid("ExamPaper", exam_code)
    counts: Dict[str, Any] = {
        "exam_code": exam_code, "questions": 0, "parts": 0, "regions": 0, "assesses": 0, "spec_linked": False,
    }

    # NOTE(review): every statement below is run and consumed individually, so a failure part-way
    # presumably leaves the paper partially projected until the next re-run — confirm against
    # get_session's transaction behaviour.
    with get_session(database=EXAM_DB) as s:
        # 1. Clean this paper's existing children (full re-sync), keep the ExamPaper node itself.
        _reset_paper_children(s, exam_code)
        # 2-3. ExamPaper node, linked to its Specification when known.
        _upsert_paper(s, paper_uid, exam_code, spec_code, template, paper_meta, counts)
        # 4. Question/Part nodes — pass 1 (so parents exist before linking).
        _write_question_nodes(s, questions, exam_code, counts)
        # 5. Structural + ASSESSES edges — pass 2.
        _write_question_edges(s, questions, exam_code, spec_code, counts)
        # 6. Region nodes + HAS_REGION edges.
        _write_regions(s, regions, exam_code, counts)

    logger.info(f"Projected template {template_id} → cc.public.exams: {counts}")
    return counts
def project_template_safe(template_id: str) -> None:
    """BackgroundTask wrapper: never raises (a failed projection must not break the HTTP save).

    Any exception from ``project_template`` is logged at error level and then dropped.

    Args:
        template_id: id of the template to project.
    """
    try:
        project_template(template_id)
    except Exception as exc:
        # This log entry is the only trace of a background failure, so attach the traceback
        # (exc_info) rather than just the exception message.
        logger.error(
            f"Background Neo4j projection failed for template {template_id}: {exc}",
            exc_info=True,
        )