Some checks failed
api-ci-deploy / test-build-deploy (push) Has been cancelled
Backend follow-on to migration 73: - schemas: ResponseAreaPayload.kind extended to response|context|question_number| mark_area|reference|furniture + context_type; QuestionPayload gains bounds+page. - PUT serialization persists Part bounds/page and region context_type. - Neo4j projection only emits Region nodes for response/context regions; the metadata kinds (question_number/mark_area/reference/furniture) are physical-layer only and stay out of cc.public.exams. - Unit test: new kinds + Part geometry + context_type round-trip. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
313 lines
13 KiB
Python
313 lines
13 KiB
Python
"""Template CRUD for the exam-marker (/api/exam/templates...) — card S4-5.
|
|
|
|
All access is as-the-user (RLS-enforced; spec E1 fix) via ExamContext. Ownership is also
|
|
checked explicitly before mutating (E2: never trust a client-supplied id as authorization) —
|
|
defence in depth on top of RLS. A row the caller cannot see under RLS reads back as absent,
|
|
so cross-institute access surfaces as 404, never a data leak (IDOR-safe).
|
|
|
|
Hybrid persistence (R5.2): PUT /templates/{id} is a full-replace of the canvas children
|
|
(questions + response areas + boundaries); PATCH /questions/{qid} is the granular mark-scheme
|
|
edit path. Client-supplied UUIDs are preserved so Supabase ids stay aligned with the Neo4j
|
|
join keys (spec §2).
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
from typing import Any, Dict, List, Optional
|
|
|
|
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException
|
|
|
|
from modules.database.services.exam_projection import project_template, project_template_safe
|
|
from modules.logger_tool import initialise_logger
|
|
from routers.exam.dependencies import ExamContext, get_exam_context, lookup_exam_code
|
|
from routers.exam.schemas import (
|
|
CreateTemplateRequest,
|
|
PatchQuestionRequest,
|
|
TemplateReplaceRequest,
|
|
)
|
|
|
|
# Module-level logger; its level and path arguments are read from the LOG_LEVEL and
# LOG_PATH environment variables (either may be unset, in which case None is passed).
logger = initialise_logger(
    __name__,
    os.getenv("LOG_LEVEL"),
    os.getenv("LOG_PATH"),
    "default",
    True,
)

# Router that the endpoint decorators in this module register their routes on.
router = APIRouter()
|
|
|
|
|
|
# ─── helpers ─────────────────────────────────────────────────────────────────
|
|
|
|
def _rows(result: Any) -> List[Dict[str, Any]]:
|
|
data = getattr(result, "data", None)
|
|
if not data:
|
|
return []
|
|
return data if isinstance(data, list) else [data]
|
|
|
|
|
|
def _first(result: Any) -> Optional[Dict[str, Any]]:
    """Return the first row of a query result, or ``None`` when it carries no rows."""
    return next(iter(_rows(result)), None)
|
|
|
|
|
|
def _fetch_template_or_404(ctx: ExamContext, template_id: str) -> Dict[str, Any]:
    """Load the ``exam_templates`` row with id *template_id* through ``ctx.supabase``.

    The read is RLS-scoped, so a template the caller cannot see is indistinguishable
    from one that does not exist.

    Raises:
        HTTPException: 404 when the query returns no row (missing or forbidden).
    """
    query = (
        ctx.supabase.table("exam_templates")
        .select("*")
        .eq("id", template_id)
        .limit(1)
    )
    template = _first(query.execute())
    if template:
        return template
    raise HTTPException(status_code=404, detail="Template not found")
|
|
|
|
|
|
def _require_owner(ctx: ExamContext, template: Dict[str, Any]) -> None:
    """Reject the request unless the caller is the template's owning teacher (R2.4).

    RLS enforces the same rule on writes; checking up front means a colleague who can
    *read* the template receives an explicit 403 rather than a silent no-op.

    Raises:
        HTTPException: 403 when ``template["teacher_id"]`` differs from ``ctx.user_id``.
    """
    owner_id = template.get("teacher_id")
    if owner_id != ctx.user_id:
        raise HTTPException(
            status_code=403,
            detail="Only the template owner can modify it",
        )
|
|
|
|
|
|
def _template_has_recorded_marks(ctx: ExamContext, template_id: str) -> bool:
    """Report whether any ``mark_entries`` row exists for a batch of this template.

    A ``True`` result means a destructive full-replace (PUT) of the template is unsafe.
    Two queries are issued: the template's ``marking_batches`` ids, then a single-row
    probe of ``mark_entries`` restricted to those batches.
    """
    batch_query = ctx.supabase.table("marking_batches").select("id").eq("template_id", template_id)
    batch_ids = [batch["id"] for batch in _rows(batch_query.execute())]
    if not batch_ids:
        # No batches at all, so no marks can have been recorded.
        return False

    # limit(1): only existence matters, not how many entries there are.
    probe = ctx.supabase.table("mark_entries").select("id").in_("batch_id", batch_ids).limit(1)
    return len(_rows(probe.execute())) > 0
|
|
|
|
|
|
# ─── templates ───────────────────────────────────────────────────────────────
|
|
|
|
@router.post("/templates")
async def create_template(
    body: CreateTemplateRequest,
    ctx: ExamContext = Depends(get_exam_context),
) -> Dict[str, Any]:
    """Insert a new ``exam_templates`` row in ``draft`` status owned by the caller.

    The institute is resolved via ``ctx.resolve_institute``. When the request carries an
    ``exam_id`` but no ``exam_code``, the code is looked up with ``lookup_exam_code``.
    Fields whose value is ``None`` are left out of the insert payload.

    Returns:
        The created row as returned by the insert.

    Raises:
        HTTPException: 500 when the insert returns no row.
    """
    institute_id = ctx.resolve_institute(body.institute_id)

    exam_code = body.exam_code
    if body.exam_id and not exam_code:
        exam_code = lookup_exam_code(body.exam_id)

    candidate_fields = {
        "title": body.title,
        "subject": body.subject,
        "exam_id": body.exam_id,
        "exam_code": exam_code,
        "source_file_id": body.source_file_id,
        "page_count": body.page_count,
        "institute_id": institute_id,
        "teacher_id": ctx.user_id,
        "status": "draft",
    }
    payload = {
        column: value
        for column, value in candidate_fields.items()
        if value is not None
    }

    created = _first(ctx.supabase.table("exam_templates").insert(payload).execute())
    if created:
        logger.info(f"Exam template created: {created.get('id')} by {ctx.user_id}")
        return created
    raise HTTPException(status_code=500, detail="Failed to create template")
|
|
|
|
|
|
@router.get("/templates")
async def list_templates(
    include_archived: bool = False,
    institute_id: Optional[str] = None,
    ctx: ExamContext = Depends(get_exam_context),
) -> Dict[str, Any]:
    """List the exam templates visible to the caller, most recently updated first.

    Args:
        include_archived: When false (default), rows with ``status == "archived"`` are
            filtered out.
        institute_id: Optional filter; RLS already scopes to the caller's institutes and
            this only narrows within that scope.

    Returns:
        ``{"templates": [...]}`` with the matching rows.
    """
    query = ctx.supabase.table("exam_templates").select("*")
    if institute_id:
        query = query.eq("institute_id", institute_id)
    if not include_archived:
        query = query.neq("status", "archived")

    ordered = query.order("updated_at", desc=True)
    return {"templates": _rows(ordered.execute())}
|
|
|
|
|
|
@router.get("/templates/{template_id}")
async def get_template(
    template_id: str,
    ctx: ExamContext = Depends(get_exam_context),
) -> Dict[str, Any]:
    """Return one template together with its canvas children.

    The result is the template row with three extra keys: ``questions`` (sorted by the
    ``order`` column), ``response_areas`` and ``boundaries``.

    Raises:
        HTTPException: 404 when the template is missing or not visible to the caller.
    """
    payload = dict(_fetch_template_or_404(ctx, template_id))

    def children_of(table_name: str) -> Any:
        # Base query for child rows of this template in the given table.
        return ctx.supabase.table(table_name).select("*").eq("template_id", template_id)

    payload["questions"] = _rows(children_of("exam_questions").order("order").execute())
    payload["response_areas"] = _rows(children_of("exam_response_areas").execute())
    payload["boundaries"] = _rows(children_of("exam_boundaries").execute())
    return payload
|
|
|
|
|
|
# Payload attributes copied onto each insert row, per child table. ``template_id`` is always
# set from the URL and ``id`` is handled separately (only sent when the client supplied one).
_QUESTION_COLUMNS = (
    "parent_id",
    "label",
    "order",
    "max_marks",
    "answer_type",
    "mcq_options",
    "mark_scheme",
    "is_container",
    "spec_ref",
    "bounds",  # drawn Part box (73); null for derived main questions
    "page",
)
_RESPONSE_AREA_COLUMNS = (
    "question_id",
    "page",
    "bounds",
    "kind",
    "response_form",
    "context_type",  # 73: optional Context differentiation
    "source",
    "confirmed",
    "confidence",
)
_BOUNDARY_COLUMNS = (
    "question_id",
    "label",
    "page_index",
    "y",
    "bounds",
    "source",
    "confirmed",
)


def _child_rows(template_id: str, items: List[Any], columns: tuple) -> List[Dict[str, Any]]:
    """Serialise payload items into insert rows for one child table.

    Each row carries ``template_id`` plus the listed *columns* read from the item, and the
    item's ``id`` when it is truthy (client-supplied UUIDs are preserved). Keys whose value
    is ``None`` are dropped from the row.
    """
    rows = []
    for item in items:
        row = {"template_id": template_id}
        for column in columns:
            row[column] = getattr(item, column)
        if item.id:
            row["id"] = item.id
        rows.append({key: value for key, value in row.items() if value is not None})
    return rows


@router.put("/templates/{template_id}")
async def replace_template(
    template_id: str,
    body: TemplateReplaceRequest,
    background_tasks: BackgroundTasks,
    ctx: ExamContext = Depends(get_exam_context),
) -> Dict[str, Any]:
    """Full-replace canvas save (R5.2). Replaces questions/response_areas/boundaries wholesale.

    Note: the delete-then-insert spans several PostgREST calls and is therefore not atomic;
    acceptable for the small (~20-question) payloads this carries. A transactional RPC is a
    later hardening step if concurrent canvas saves become a concern.

    Returns:
        The refreshed template with its children, as produced by ``get_template``.

    Raises:
        HTTPException: 404 when the template is missing or not visible, 403 when the caller
            is not the owner, 409 when marks have already been recorded against it.
    """
    template = _fetch_template_or_404(ctx, template_id)
    _require_owner(ctx, template)

    # Data-loss guard: the wholesale question delete below cascades to mark_entries
    # (mark_entries.question_id → exam_questions ON DELETE CASCADE). Refuse a structural
    # full-replace once any marks have been recorded against this template's batches, so
    # re-saving the setup canvas mid-marking can't silently wipe a teacher's marking work.
    # (Mark-scheme tweaks use PATCH /questions/{id}, which is unaffected.)
    if _template_has_recorded_marks(ctx, template_id):
        raise HTTPException(
            status_code=409,
            detail="Template has recorded marks; structural full-replace is blocked. "
            "Edit questions individually via PATCH /questions/{id}.",
        )

    # Optional template-level metadata update alongside the canvas.
    if body.meta:
        updates = {k: v for k, v in body.meta.dict().items() if v is not None}
        if updates:
            ctx.supabase.table("exam_templates").update(updates).eq("id", template_id).execute()

    sb = ctx.supabase
    # Clear existing children. Order matters: response_areas/boundaries reference questions, so
    # remove them first (we delete by template_id rather than rely on cascade for predictability).
    for table in ("exam_response_areas", "exam_boundaries", "exam_questions"):
        sb.table(table).delete().eq("template_id", template_id).execute()

    # Re-insert, preserving client-supplied UUIDs (Neo4j join keys, spec §2). Questions go
    # first because the other two tables reference them; empty collections are skipped.
    inserts = (
        ("exam_questions", body.questions, _QUESTION_COLUMNS),
        ("exam_response_areas", body.response_areas, _RESPONSE_AREA_COLUMNS),
        ("exam_boundaries", body.boundaries, _BOUNDARY_COLUMNS),
    )
    for table, items, columns in inserts:
        if items:
            sb.table(table).insert(_child_rows(template_id, items, columns)).execute()

    # `or []` keeps the log line safe if a collection is None: the guards above already
    # tolerate that, and a logging error must not fail a save that has been written.
    logger.info(
        f"Exam template {template_id} replaced: {len(body.questions or [])} questions, "
        f"{len(body.response_areas or [])} regions, {len(body.boundaries or [])} boundaries"
    )
    # R3.5.4: a successful save enqueues a graph projection into cc.public.exams. BackgroundTasks
    # is acceptable for Sprint 4 (durability via a real queue is a later step); failures are
    # swallowed so the canvas save itself never fails on a graph hiccup.
    background_tasks.add_task(project_template_safe, template_id)
    return await get_template(template_id, ctx)
|
|
|
|
|
|
@router.delete("/templates/{template_id}")
async def archive_template(
    template_id: str,
    ctx: ExamContext = Depends(get_exam_context),
) -> Dict[str, Any]:
    """Soft-delete a template by setting its status to ``archived`` (R5.2).

    The row is never hard-deleted, so a teacher's work is kept.

    Raises:
        HTTPException: 404 when the template is missing or not visible, 403 when the
            caller is not its owner.
    """
    _require_owner(ctx, _fetch_template_or_404(ctx, template_id))

    archived_status = "archived"
    update_query = ctx.supabase.table("exam_templates").update({"status": archived_status})
    update_query.eq("id", template_id).execute()
    return {"status": archived_status, "id": template_id}
|
|
|
|
|
|
@router.post("/templates/{template_id}/neo4j-sync")
async def neo4j_sync(
    template_id: str,
    ctx: ExamContext = Depends(get_exam_context),
) -> Dict[str, Any]:
    """Manual graph-projection trigger (R5.3) for dev/backfill — runs synchronously and returns
    counts. Auth/ownership is checked as-the-user; the projection itself uses service role
    (R3.5.1, the documented graph-writer path).

    Returns:
        ``{"status": "ok", "projection": <value returned by project_template>}``.

    Raises:
        HTTPException: 404 when the template is missing or not visible, 403 when the caller
            is not its owner, 502 when the projection raises.
    """
    template = _fetch_template_or_404(ctx, template_id)
    _require_owner(ctx, template)
    try:
        counts = project_template(template_id)
    except Exception as exc:
        # Endpoint boundary: any projection failure is logged and surfaced as a 502.
        # `from exc` records the original error as the explicit cause of the HTTP error.
        logger.error(f"Manual neo4j-sync failed for template {template_id}: {exc}")
        raise HTTPException(status_code=502, detail=f"Projection failed: {exc}") from exc
    return {"status": "ok", "projection": counts}
|
|
|
|
|
|
# ─── questions (granular edit path, R5.2) ────────────────────────────────────
|
|
|
|
@router.patch("/questions/{question_id}")
async def patch_question(
    question_id: str,
    body: PatchQuestionRequest,
    ctx: ExamContext = Depends(get_exam_context),
) -> Dict[str, Any]:
    """Apply a partial update to one ``exam_questions`` row and return the updated row.

    Only request fields whose value is not ``None`` are written.

    Raises:
        HTTPException: 400 when the request carries no non-``None`` field, 404 when the
            update returns no row.
    """
    changes = {
        field: value
        for field, value in body.dict().items()
        if value is not None
    }
    if not changes:
        raise HTTPException(status_code=400, detail="No fields to update")

    # RLS (exam_questions_all) enforces that the question belongs to a template owned by the
    # caller; an out-of-scope id updates zero rows → 404, so no explicit pre-fetch is needed.
    result = ctx.supabase.table("exam_questions").update(changes).eq("id", question_id).execute()
    row = _first(result)
    if row:
        return row
    raise HTTPException(status_code=404, detail="Question not found")
|