feat(exam): Neo4j projection on template save + neo4j-sync (S4-7)

modules/database/services/exam_projection.py projects a saved template into
cc.public.exams: ExamPaper -> Question/Part -> Region + Part-[:ASSESSES]->
SpecPoint, joined by shared UUIDs (exam_questions.id, exam_response_areas.id,
exam_code, spec_code). Full re-sync per exam_code (idempotent). Reads via
service role + writes via system Neo4j driver (R3.5.1 documented graph-writer).

Wiring (R3.5.4/R5.3):
- PUT /templates/{id} enqueues project_template_safe via BackgroundTasks
  (swallows failures so a graph hiccup never fails the canvas save).
- POST /templates/{id}/neo4j-sync — manual trigger, as-user auth + owner check,
  runs synchronously and returns projection counts.

Unit tests: projection scheduled on PUT; neo4j-sync owner/403/404.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
CC Worker 2026-06-06 19:02:18 +00:00
parent 98be55ab57
commit 77bb0766ff
3 changed files with 227 additions and 1 deletions

View File

@ -0,0 +1,163 @@
"""Project a saved exam template into the cc.public.exams Neo4j graph (card S4-7).
Supabase is source of truth for the operational template (geometry, marks); cc.public.exams is
the co-primary knowledge graph (spec §2/S2). On template save the structural skeleton
ExamPaper Question/Part Region, plus Part-[:ASSESSES]->SpecPoint is projected here.
Ownership model (R3.5.1): the graph is written by the API SERVICE ROLE only (no client writes),
so this task reads the template via service role and writes Neo4j with the system driver. It is
the sanctioned service-role path (documented in the ADR), distinct from the as-user request path.
Join keys (never regenerated):
exam_questions.id -> Question.uuid_string (container) | Part.uuid_string (leaf)
exam_response_areas.id -> Region.uuid_string
eb_exams.exam_code -> ExamPaper.exam_code
eb_specifications.spec_code -> Specification.spec_code
Projection is a full re-sync per exam_code (delete this paper's Question/Part/Region, recreate),
matching the PUT full-replace semantics idempotent and safe to re-run.
"""
import os
import uuid
from typing import Any, Dict, List, Optional
from modules.database.supabase.utils.client import SupabaseServiceRoleClient
from modules.database.tools.neo4j_driver_tools import get_session
from modules.logger_tool import initialise_logger
logger = initialise_logger(__name__, os.getenv("LOG_LEVEL"), os.getenv("LOG_PATH"), "default", True)
# MUST match run/initialization/init_exam_graph.py (shared DB name + deterministic uuid namespace).
EXAM_DB = "cc.public.exams"
NS = uuid.UUID("00000000-0000-0000-0000-00000000e8a1")
def _uid(*parts: str) -> str:
return str(uuid.uuid5(NS, ":".join(parts)))
def _rows(result: Any) -> List[Dict[str, Any]]:
data = getattr(result, "data", None)
if not data:
return []
return data if isinstance(data, list) else [data]
def project_template(template_id: str) -> Dict[str, Any]:
"""Read the template (service role) and (re)project its structure into cc.public.exams.
Returns a counts dict. Raises on hard failure (caller decides whether to swallow a
BackgroundTask logs and drops; the manual /neo4j-sync endpoint surfaces the error).
"""
sb = SupabaseServiceRoleClient().supabase
template = (sb.table("exam_templates").select("*").eq("id", template_id).limit(1).execute().data or [None])[0]
if not template:
raise ValueError(f"template {template_id} not found")
questions = _rows(sb.table("exam_questions").select("*").eq("template_id", template_id).order("order").execute())
regions = _rows(sb.table("exam_response_areas").select("*").eq("template_id", template_id).execute())
# Resolve the paper's exam_code + spec metadata. Catalogue paper → from eb_exams; ad-hoc upload
# (no exam_code) → a stable synthetic code so the paper still has a unique graph key.
exam_code = template.get("exam_code")
spec_code = None
paper_meta: Dict[str, Any] = {}
if template.get("exam_id"):
eb = (sb.table("eb_exams").select("exam_code, spec_code, paper_code, tier, session")
.eq("id", template["exam_id"]).limit(1).execute().data or [None])[0]
if eb:
exam_code = exam_code or eb.get("exam_code")
spec_code = eb.get("spec_code")
paper_meta = eb
if not exam_code:
exam_code = f"tpl:{template_id}"
paper_uid = _uid("ExamPaper", exam_code)
counts = {"exam_code": exam_code, "questions": 0, "parts": 0, "regions": 0, "assesses": 0, "spec_linked": False}
with get_session(database=EXAM_DB) as s:
# 1. Clean this paper's existing children (full re-sync), keep the ExamPaper node itself.
s.run("MATCH (r:Region {exam_code:$ec}) DETACH DELETE r", ec=exam_code).consume()
s.run("MATCH (n {exam_code:$ec}) WHERE n:Question OR n:Part DETACH DELETE n", ec=exam_code).consume()
# 2. ExamPaper node.
s.run(
"MERGE (p:ExamPaper {uuid_string:$uid}) "
"SET p.exam_code=$ec, p.spec_code=$sc, p.title=$title, p.page_count=$pc, "
" p.paper_code=$paper_code, p.tier=$tier, p.session=$session, p.node_storage_path=$nsp",
uid=paper_uid, ec=exam_code, sc=spec_code, title=template.get("title"),
pc=template.get("page_count"), paper_code=paper_meta.get("paper_code"),
tier=paper_meta.get("tier"), session=paper_meta.get("session"),
nsp=f"{EXAM_DB}/ExamPaper/{exam_code}",
).consume()
# 3. Link to its Specification (seeded separately) when known.
if spec_code:
r = s.run(
"MATCH (sp:Specification {spec_code:$sc}), (p:ExamPaper {exam_code:$ec}) "
"MERGE (sp)-[:HAS_PAPER]->(p) RETURN count(*) AS n",
sc=spec_code, ec=exam_code,
).single()
counts["spec_linked"] = bool(r and r["n"])
# 4. Question/Part nodes — pass 1: create all nodes (so parents exist before linking).
for q in questions:
label = "Question" if q.get("is_container") else "Part"
s.run(
f"MERGE (n:{label} {{uuid_string:$uid}}) "
"SET n.exam_code=$ec, n.label=$label, n.order=$order, n.max_marks=$mm, "
" n.answer_type=$at, n.mark_scheme_type=$mst, n.spec_ref=$sref, "
" n.node_storage_path=$nsp",
uid=q["id"], ec=exam_code, label=q.get("label"), order=q.get("order") or 0,
mm=q.get("max_marks") or 0, at=q.get("answer_type"),
mst=(q.get("mark_scheme") or {}).get("type") if isinstance(q.get("mark_scheme"), dict) else None,
sref=q.get("spec_ref"), nsp=f"{EXAM_DB}/{label}/{q['id']}",
).consume()
counts["parts" if label == "Part" else "questions"] += 1
# 5. Structural + ASSESSES edges — pass 2.
for q in questions:
if q.get("parent_id"):
s.run(
"MATCH (parent {uuid_string:$pid}), (n {uuid_string:$uid}) MERGE (parent)-[:HAS_PART]->(n)",
pid=q["parent_id"], uid=q["id"],
).consume()
else:
s.run(
"MATCH (p:ExamPaper {exam_code:$ec}), (n {uuid_string:$uid}) MERGE (p)-[:HAS_QUESTION]->(n)",
ec=exam_code, uid=q["id"],
).consume()
if q.get("spec_ref"):
# SpecPoints are seeded per spec_code; match within this paper's spec when known.
r = s.run(
"MATCH (n {uuid_string:$uid}), (sp:SpecPoint {ref:$ref}) "
+ ("WHERE sp.spec_code=$sc " if spec_code else "")
+ "MERGE (n)-[:ASSESSES]->(sp) RETURN count(*) AS n",
uid=q["id"], ref=q["spec_ref"], sc=spec_code,
).single()
counts["assesses"] += (r["n"] if r else 0)
# 6. Region nodes + HAS_REGION edges.
for rg in regions:
s.run(
"MERGE (r:Region {uuid_string:$uid}) "
"SET r.exam_code=$ec, r.page=$page, r.kind=$kind, r.response_form=$rf, r.node_storage_path=$nsp",
uid=rg["id"], ec=exam_code, page=rg.get("page"), kind=rg.get("kind"),
rf=rg.get("response_form"), nsp=f"{EXAM_DB}/Region/{rg['id']}",
).consume()
s.run(
"MATCH (q {uuid_string:$qid}), (r:Region {uuid_string:$uid}) MERGE (q)-[:HAS_REGION]->(r)",
qid=rg["question_id"], uid=rg["id"],
).consume()
counts["regions"] += 1
logger.info(f"Projected template {template_id} → cc.public.exams: {counts}")
return counts
def project_template_safe(template_id: str) -> None:
"""BackgroundTask wrapper: never raises (a failed projection must not break the HTTP save)."""
try:
project_template(template_id)
except Exception as exc:
logger.error(f"Background Neo4j projection failed for template {template_id}: {exc}")

View File

@ -15,8 +15,9 @@ from __future__ import annotations
import os
from typing import Any, Dict, List, Optional
from fastapi import APIRouter, Depends, HTTPException
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException
from modules.database.services.exam_projection import project_template, project_template_safe
from modules.logger_tool import initialise_logger
from routers.exam.dependencies import ExamContext, get_exam_context, lookup_exam_code
from routers.exam.schemas import (
@ -137,6 +138,7 @@ async def get_template(
async def replace_template(
template_id: str,
body: TemplateReplaceRequest,
background_tasks: BackgroundTasks,
ctx: ExamContext = Depends(get_exam_context),
) -> Dict[str, Any]:
"""Full-replace canvas save (R5.2). Replaces questions/response_areas/boundaries wholesale.
@ -223,6 +225,10 @@ async def replace_template(
f"Exam template {template_id} replaced: {len(body.questions)} questions, "
f"{len(body.response_areas)} regions, {len(body.boundaries)} boundaries"
)
# R3.5.4: a successful save enqueues a graph projection into cc.public.exams. BackgroundTasks
# is acceptable for Sprint 4 (durability via a real queue is a later step); failures are
# swallowed so the canvas save itself never fails on a graph hiccup.
background_tasks.add_task(project_template_safe, template_id)
return await get_template(template_id, ctx)
@ -238,6 +244,24 @@ async def archive_template(
return {"status": "archived", "id": template_id}
@router.post("/templates/{template_id}/neo4j-sync")
async def neo4j_sync(
template_id: str,
ctx: ExamContext = Depends(get_exam_context),
) -> Dict[str, Any]:
"""Manual graph-projection trigger (R5.3) for dev/backfill — runs synchronously and returns
counts. Auth/ownership is checked as-the-user; the projection itself uses service role
(R3.5.1, the documented graph-writer path)."""
template = _fetch_template_or_404(ctx, template_id)
_require_owner(ctx, template)
try:
counts = project_template(template_id)
except Exception as exc:
logger.error(f"Manual neo4j-sync failed for template {template_id}: {exc}")
raise HTTPException(status_code=502, detail=f"Projection failed: {exc}")
return {"status": "ok", "projection": counts}
# ─── questions (granular edit path, R5.2) ────────────────────────────────────
@router.patch("/questions/{question_id}")

View File

@ -5,9 +5,11 @@ ExamContext dependency is overridden with an in-memory fake, so these tests exer
router's auth/ownership/institute logic without a live Supabase — the as-user RLS itself is
verified separately against .94 (see the evidence note).
"""
import pytest
from fastapi import FastAPI
from fastapi.testclient import TestClient
import routers.exam.templates as templates_mod
from routers.exam.templates import router
from routers.exam.dependencies import ExamContext, get_exam_context
@ -18,6 +20,15 @@ INST_A = "10000000-0000-0000-0000-000000000001"
INST_B = "10000000-0000-0000-0000-000000000002"
@pytest.fixture(autouse=True)
def _stub_projection(monkeypatch):
"""Record projection scheduling and never touch Neo4j/service-role in unit tests."""
calls = []
monkeypatch.setattr(templates_mod, "project_template_safe", lambda tid: calls.append(tid))
monkeypatch.setattr(templates_mod, "project_template", lambda tid: {"exam_code": "X", "questions": 1})
return calls
# ─── in-memory fake supabase ─────────────────────────────────────────────────
class FakeResult:
@ -262,3 +273,31 @@ def test_patch_question_empty_body_is_400():
store = {"exam_questions": [{"id": "q1", "template_id": "t1", "label": "01"}]}
client, _ = make_client(store=store)
assert client.patch("/api/exam/questions/q1", json={}).status_code == 400
# ─── Neo4j projection (S4-7) ─────────────────────────────────────────────────
def test_put_schedules_projection(_stub_projection):
store = {"exam_templates": [{"id": "t1", "title": "p", "status": "draft", "institute_id": INST_A, "teacher_id": TEACHER}]}
client, _ = make_client(store=store)
client.put("/api/exam/templates/t1", json={"questions": []})
assert _stub_projection == ["t1"] # projection enqueued for the saved template
def test_neo4j_sync_owner_runs():
store = {"exam_templates": [{"id": "t1", "title": "p", "status": "draft", "institute_id": INST_A, "teacher_id": TEACHER}]}
client, _ = make_client(store=store)
r = client.post("/api/exam/templates/t1/neo4j-sync")
assert r.status_code == 200
assert r.json()["projection"]["exam_code"] == "X"
def test_neo4j_sync_non_owner_403():
store = {"exam_templates": [{"id": "t1", "title": "p", "status": "draft", "institute_id": INST_A, "teacher_id": OTHER_TEACHER}]}
client, _ = make_client(user_id=TEACHER, institute_ids=(INST_A,), store=store)
assert client.post("/api/exam/templates/t1/neo4j-sync").status_code == 403
def test_neo4j_sync_404():
client, _ = make_client(store={"exam_templates": []})
assert client.post("/api/exam/templates/does-not-exist/neo4j-sync").status_code == 404