api/routers/exam/templates.py
CC Worker c58df6715c feat(exam): template source PDF at create + GET /templates/{id}/source-pdf (S4-8.1)
Recovered from cc-worker WIP that was left uncommitted in the dev-centre clone
(card t_0055b89b). Multipart source_pdf upload at create -> source_file_id;
source-pdf download endpoint resolves from exam_id (catalogue) or source_file_id.
NOT yet human-reviewed/merged; preserving + verifying so it isn't clobbered.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-06 22:29:32 +00:00

510 lines
20 KiB
Python

"""Template CRUD for the exam-marker (/api/exam/templates...) — card S4-5.
All access is as-the-user (RLS-enforced; spec E1 fix) via ExamContext. Ownership is also
checked explicitly before mutating (E2: never trust a client-supplied id as authorization) —
defence in depth on top of RLS. A row the caller cannot see under RLS reads back as absent,
so cross-institute access surfaces as 404, never a data leak (IDOR-safe).
Hybrid persistence (R5.2): PUT /templates/{id} is a full-replace of the canvas children
(questions + response areas + boundaries); PATCH /questions/{qid} is the granular mark-scheme
edit path. Client-supplied UUIDs are preserved so Supabase ids stay aligned with the Neo4j
join keys (spec §2).
"""
from __future__ import annotations
import os
import uuid
from typing import Any, Dict, List, Optional, Tuple
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Request, UploadFile
from fastapi.responses import Response
from modules.database.services.exam_projection import project_template, project_template_safe
from modules.database.supabase.utils.client import SupabaseServiceRoleClient
from modules.database.supabase.utils.storage import StorageAdmin
from modules.logger_tool import initialise_logger
from routers.exam.dependencies import ExamContext, get_exam_context, lookup_exam_code
from routers.exam.schemas import (
CreateTemplateRequest,
PatchQuestionRequest,
TemplateReplaceRequest,
)
# Module logger. The LOG_LEVEL and LOG_PATH environment variables are handed to
# initialise_logger. NOTE(review): the meaning of the trailing "default" and True
# arguments is defined by initialise_logger and is not visible in this file.
logger = initialise_logger(__name__, os.getenv("LOG_LEVEL"), os.getenv("LOG_PATH"), "default", True)
# Router that the template and question endpoints below are registered on.
router = APIRouter()
# Name of the per-user file cabinet that uploaded template source PDFs are filed under.
SOURCE_CABINET_NAME = "Exam Marker Template Sources"
# Storage bucket used for an uploaded source PDF when no institute id is available.
SOURCE_BUCKET_FALLBACK = "cc.users"
# ─── helpers ─────────────────────────────────────────────────────────────────
def _rows(result: Any) -> List[Dict[str, Any]]:
    """Return the rows carried by a query result as a list.

    Reads ``result.data``. A missing or falsy payload yields ``[]``, a list
    payload is returned as-is, and any other truthy payload is wrapped in a
    one-element list.
    """
    payload = getattr(result, "data", None)
    if payload and isinstance(payload, list):
        return payload
    return [payload] if payload else []
def _first(result: Any) -> Optional[Dict[str, Any]]:
    """Return the first row of a query result, or ``None`` when it has no rows."""
    for row in _rows(result):
        return row
    return None
def _fetch_template_or_404(ctx: ExamContext, template_id: str) -> Dict[str, Any]:
    """Return the template row with this id as seen through the caller's client.

    The lookup goes through ``ctx.supabase`` (RLS-scoped), so a template the
    caller cannot see reads back as absent.

    Raises:
        HTTPException: 404 when no row comes back (missing or not visible).
    """
    lookup = ctx.supabase.table("exam_templates").select("*").eq("id", template_id).limit(1)
    template = _first(lookup.execute())
    if template:
        return template
    raise HTTPException(status_code=404, detail="Template not found")
def _require_owner(ctx: ExamContext, template: Dict[str, Any]) -> None:
    """Allow a write only when the caller is the template's owning teacher (R2.4).

    Raises:
        HTTPException: 403 when ``template["teacher_id"]`` is not ``ctx.user_id``.
    """
    owner_id = template.get("teacher_id")
    if owner_id == ctx.user_id:
        return
    raise HTTPException(status_code=403, detail="Only the template owner can modify it")
def _require_source_visibility_or_404(ctx: ExamContext, template: Dict[str, Any]) -> None:
    """Gate source-PDF reads to the owner, within one of the caller's institutes.

    Both failures answer 404 with the same detail, so a non-owner or a caller
    from another institute cannot learn that the template exists.

    Raises:
        HTTPException: 404 when the caller is not the owner, or the template's
            institute is not in ``ctx.institute_ids``.
    """
    caller_is_owner = template.get("teacher_id") == ctx.user_id
    # The institute check only runs for the owner, as the ownership check comes first.
    if not caller_is_owner or template.get("institute_id") not in ctx.institute_ids:
        raise HTTPException(status_code=404, detail="Template not found")
def _template_has_recorded_marks(ctx: ExamContext, template_id: str) -> bool:
    """Report whether any mark entry exists for a marking batch of this template.

    Used as the data-loss guard for the destructive full-replace: a ``True``
    result means a structural PUT is unsafe.
    """
    batch_query = ctx.supabase.table("marking_batches").select("id").eq("template_id", template_id)
    batch_ids = [batch["id"] for batch in _rows(batch_query.execute())]
    if batch_ids:
        # One matching row is enough to answer the question, hence limit(1).
        mark_probe = ctx.supabase.table("mark_entries").select("id").in_("batch_id", batch_ids).limit(1)
        return bool(_rows(mark_probe.execute()))
    return False
def _parse_storage_loc(storage_loc: str) -> Tuple[str, str]:
    """Split a ``"<bucket>/<path>"`` storage location at its first slash.

    Returns:
        ``(bucket, path)``; the path keeps any further slashes.

    Raises:
        ValueError: when the value is empty/``None``, has no slash, or has an
            empty bucket or path part.
    """
    head, _, tail = (storage_loc or "").partition("/")
    # Without a "/" partition() leaves the tail empty, so this also covers a missing separator.
    if head and tail:
        return head, tail
    raise ValueError(f"Invalid storage_loc: {storage_loc!r}")
def _lookup_exam_storage_loc(exam_id: str) -> Optional[str]:
    """Look up the ``storage_loc`` of a catalogue exam in ``eb_exams``.

    The query runs on the service-role client. This is best-effort: any
    failure is logged as a warning and reported as ``None``, the same result
    as an exam that has no row.
    """
    try:
        client = SupabaseServiceRoleClient().supabase
        exam = _first(
            client.table("eb_exams").select("storage_loc").eq("id", exam_id).limit(1).execute()
        )
        if exam:
            return exam.get("storage_loc")
        return None
    except Exception as exc:
        logger.warning(f"storage_loc lookup failed for exam_id={exam_id}: {exc}")
        return None
async def _parse_create_template_request(request: Request) -> Tuple[CreateTemplateRequest, Optional[UploadFile]]:
    """Parse a create-template request sent as multipart form data or as JSON.

    A multipart request may carry the template fields as form values plus an
    optional ``source_pdf`` file part; empty form values are treated as not
    supplied. Any other content type is read as a JSON body.

    Returns:
        The validated request model and the uploaded file (``None`` for JSON
        requests and for multipart requests without a ``source_pdf`` part).

    Raises:
        HTTPException: 400 when ``source_pdf`` is not a file part, when both
            ``source_file_id`` and ``source_pdf`` are supplied, or when the
            JSON body cannot be parsed or is not an object; 422 when the
            fields fail model validation.
    """
    content_type = request.headers.get("content-type", "")
    upload: Optional[UploadFile] = None
    if "multipart/form-data" in content_type:
        form = await request.form()
        data: Dict[str, Any] = {}
        for key in ("title", "subject", "exam_id", "exam_code", "source_file_id", "page_count", "institute_id"):
            value = form.get(key)
            if value is not None and value != "":
                data[key] = value
        upload = form.get("source_pdf")
        # A plain text form value has no read(); only a real file part is acceptable here.
        if upload is not None and not hasattr(upload, "read"):
            raise HTTPException(status_code=400, detail="source_pdf must be a file upload")
        if upload is not None and data.get("source_file_id"):
            raise HTTPException(status_code=400, detail="Use either source_file_id or source_pdf, not both")
    else:
        try:
            data = await request.json()
        except Exception as exc:
            raise HTTPException(status_code=400, detail=f"Invalid request body: {exc}") from exc
        # ``**data`` below needs a mapping; a JSON list/string/number would raise TypeError (a 500).
        if not isinstance(data, dict):
            raise HTTPException(status_code=400, detail="Invalid request body: expected a JSON object")
    try:
        # The model is built by hand, so its validation error is not converted by the framework
        # and would surface as a 500. pydantic's ValidationError subclasses ValueError.
        parsed = CreateTemplateRequest(**data)
    except ValueError as exc:
        raise HTTPException(status_code=422, detail=f"Invalid template fields: {exc}") from exc
    return parsed, upload
def _safe_source_filename(filename: Optional[str]) -> str:
    """Reduce a client-supplied filename to a single safe path segment."""
    # Treat backslashes as separators too, so Windows-style client paths are stripped on POSIX hosts.
    leaf = os.path.basename((filename or "").replace("\\", "/"))
    # basename() returns "" for a trailing slash and passes "." / ".." through;
    # none of those may end up as the last segment of a storage key.
    return leaf if leaf not in ("", ".", "..") else "template.pdf"


def _get_or_create_source_cabinet(service: SupabaseServiceRoleClient, user_id: Any) -> str:
    """Return the id of the user's template-source cabinet, creating it on first use."""
    existing = _first(
        service.supabase.table("file_cabinets")
        .select("id")
        .eq("user_id", user_id)
        .eq("name", SOURCE_CABINET_NAME)
        .limit(1)
        .execute()
    )
    if existing:
        return existing["id"]
    created = _first(
        service.supabase.table("file_cabinets")
        .insert({"user_id": user_id, "name": SOURCE_CABINET_NAME})
        .execute()
    )
    if not created:
        raise HTTPException(status_code=500, detail="Failed to create upload cabinet")
    return created["id"]


async def _upload_template_source_file(
    ctx: ExamContext,
    institute_id: str,
    upload: UploadFile,
) -> str:
    """Store an uploaded template source PDF and return the id of its ``files`` row.

    The bytes are written to the institute's private bucket (or to
    ``SOURCE_BUCKET_FALLBACK`` when ``institute_id`` is falsy) under
    ``exam-marker/<cabinet_id>/<file_id>/<name>``, and a ``files`` row pointing
    at that object is inserted. Both writes use service-role clients.

    Args:
        ctx: Caller context; ``ctx.user_id`` owns the cabinet and the file row.
        institute_id: Institute whose private bucket receives the file.
        upload: The uploaded file part.

    Returns:
        The generated file id (a UUID string).

    Raises:
        HTTPException: 400 when the declared content type is not
            ``application/pdf`` or the upload is empty; 500 when the cabinet,
            the storage object or the file record cannot be created.
    """
    # Check the declared type first so a non-PDF is rejected without buffering its body.
    if upload.content_type and upload.content_type != "application/pdf":
        raise HTTPException(status_code=400, detail="Uploaded file must be a PDF")
    file_bytes = await upload.read()
    if not file_bytes:
        raise HTTPException(status_code=400, detail="Uploaded PDF is empty")

    service = SupabaseServiceRoleClient()
    storage = StorageAdmin()
    cabinet_id = _get_or_create_source_cabinet(service, ctx.user_id)

    file_id = str(uuid.uuid4())
    safe_name = _safe_source_filename(upload.filename)
    bucket = f"cc.institutes.{institute_id}.private" if institute_id else SOURCE_BUCKET_FALLBACK
    storage_path = f"exam-marker/{cabinet_id}/{file_id}/{safe_name}"
    try:
        storage.upload_file(bucket, storage_path, file_bytes, "application/pdf", upsert=True)
    except Exception as exc:
        logger.error(f"Template source upload failed for user {ctx.user_id}: {exc}")
        raise HTTPException(status_code=500, detail=f"Storage upload failed: {exc}") from exc

    inserted = _first(
        service.supabase.table("files").insert(
            {
                "id": file_id,
                "cabinet_id": cabinet_id,
                "name": safe_name,
                "path": storage_path,
                "bucket": bucket,
                "mime_type": "application/pdf",
                "uploaded_by": ctx.user_id,
                "size_bytes": len(file_bytes),
                "source": "classroomcopilot-web",
                "is_directory": False,
                "relative_path": safe_name,
                "processing_status": "uploaded",
            }
        ).execute()
    )
    if not inserted:
        # NOTE(review): the object uploaded above stays in storage without a files row here;
        # remove it once a delete helper on StorageAdmin is confirmed.
        raise HTTPException(status_code=500, detail="Failed to create file record")
    return file_id
# ─── templates ───────────────────────────────────────────────────────────────
@router.post("/templates")
async def create_template(
    request: Request,
    ctx: ExamContext = Depends(get_exam_context),
) -> Dict[str, Any]:
    """Create a draft exam template owned by the caller.

    The request is JSON or multipart (see ``_parse_create_template_request``).
    A template takes its source PDF from exactly one place: a catalogue paper
    (``exam_id``), an existing file (``source_file_id``), or a PDF uploaded
    with the request (``source_pdf``, stored here and referenced through
    ``source_file_id``).

    Returns:
        The inserted ``exam_templates`` row.

    Raises:
        HTTPException: 400 when ``exam_id`` is combined with ``source_file_id``
            or with an uploaded ``source_pdf``; 500 when the insert returns no
            row. Errors from request parsing and the upload propagate.
    """
    body, upload = await _parse_create_template_request(request)
    institute_id = ctx.resolve_institute(body.institute_id)
    if body.exam_id and body.source_file_id:
        raise HTTPException(status_code=400, detail="Use either exam_id or source_file_id, not both")
    # An upload becomes source_file_id below, so it is subject to the same exclusivity rule.
    # Without this check the row would carry both sources and the uploaded file would never
    # be read, because the source-pdf endpoint resolves exam_id first.
    if body.exam_id and upload is not None:
        raise HTTPException(status_code=400, detail="Use either exam_id or source_pdf, not both")

    exam_code = body.exam_code
    if body.exam_id and not exam_code:
        exam_code = lookup_exam_code(body.exam_id)

    source_file_id = body.source_file_id
    if upload is not None:
        source_file_id = await _upload_template_source_file(ctx, institute_id, upload)

    row = {
        "title": body.title,
        "subject": body.subject,
        "exam_id": body.exam_id,
        "exam_code": exam_code,
        "source_file_id": source_file_id,
        "page_count": body.page_count,
        "institute_id": institute_id,
        "teacher_id": ctx.user_id,
        "status": "draft",
    }
    # Leave unset columns out of the insert payload.
    row = {k: v for k, v in row.items() if v is not None}
    res = ctx.supabase.table("exam_templates").insert(row).execute()
    created = _first(res)
    if not created:
        raise HTTPException(status_code=500, detail="Failed to create template")
    logger.info(f"Exam template created: {created.get('id')} by {ctx.user_id}")
    return created
# The catalogue is read with the service-role client, so the handler itself must require an
# authenticated exam context; the dependency is attached here because its value is not needed.
# NOTE(review): redundant but harmless if the router is already mounted behind authentication.
@router.get("/catalogue", dependencies=[Depends(get_exam_context)])
async def list_catalogue_papers() -> Dict[str, Any]:
    """Lightweight exam-board paper catalogue for the create dialog.

    Returns:
        ``{"papers": [...]}`` holding the ``eb_exams`` rows whose ``type_code``
        is ``"QP"``, ordered by ``exam_code``.

    Raises:
        HTTPException: 502 when the catalogue query fails.
    """
    try:
        sb = SupabaseServiceRoleClient().supabase
        res = (
            sb.table("eb_exams")
            .select("id, exam_code, spec_code, paper_code, tier, session, type_code, storage_loc")
            .eq("type_code", "QP")
            .order("exam_code")
            .execute()
        )
    except Exception as exc:
        logger.error(f"Catalogue paper lookup failed: {exc}")
        raise HTTPException(status_code=502, detail=f"Could not load catalogue papers: {exc}") from exc
    return {"papers": _rows(res)}
@router.get("/templates")
async def list_templates(
    include_archived: bool = False,
    institute_id: Optional[str] = None,
    ctx: ExamContext = Depends(get_exam_context),
) -> Dict[str, Any]:
    """List the templates visible to the caller, most recently updated first.

    Args:
        include_archived: When false (the default), rows whose status is
            ``"archived"`` are left out.
        institute_id: Optional filter to a single institute.

    Returns:
        ``{"templates": [...]}``.
    """
    # The caller's client is already RLS-scoped to their institutes; the filters only narrow that.
    listing = ctx.supabase.table("exam_templates").select("*")
    if institute_id:
        listing = listing.eq("institute_id", institute_id)
    if not include_archived:
        listing = listing.neq("status", "archived")
    newest_first = listing.order("updated_at", desc=True)
    return {"templates": _rows(newest_first.execute())}
@router.get("/templates/{template_id}")
async def get_template(
    template_id: str,
    ctx: ExamContext = Depends(get_exam_context),
) -> Dict[str, Any]:
    """Return a template together with its questions, response areas and boundaries.

    Returns:
        The template row extended with ``questions`` (sorted by ``order``),
        ``response_areas`` and ``boundaries``.

    Raises:
        HTTPException: 404 when the template is missing or not visible.
    """
    template = _fetch_template_or_404(ctx, template_id)

    def _children_of(table_name: str) -> Any:
        # Shared base query: every row of a child table that belongs to this template.
        return ctx.supabase.table(table_name).select("*").eq("template_id", template_id)

    detail = dict(template)
    detail["questions"] = _rows(_children_of("exam_questions").order("order").execute())
    detail["response_areas"] = _rows(_children_of("exam_response_areas").execute())
    detail["boundaries"] = _rows(_children_of("exam_boundaries").execute())
    return detail
@router.get("/templates/{template_id}/source-pdf")
async def get_template_source_pdf(
    template_id: str,
    ctx: ExamContext = Depends(get_exam_context),
) -> Response:
    """Stream the source PDF a template was built from.

    The storage location comes from the catalogue exam when the template has an
    ``exam_id``, otherwise from the ``files`` row named by ``source_file_id``.
    Every failure to resolve or download the source answers with the same 404.

    Raises:
        HTTPException: 404 when the template is missing or not visible to the
            caller, has no resolvable source, or the download fails.
    """

    def _source_missing() -> HTTPException:
        return HTTPException(status_code=404, detail="Template source not found")

    template = _fetch_template_or_404(ctx, template_id)
    _require_source_visibility_or_404(ctx, template)

    exam_id = template.get("exam_id")
    file_id = template.get("source_file_id")
    if exam_id:
        # Catalogue paper: the location is stored on the exam as "<bucket>/<path>".
        storage_loc = _lookup_exam_storage_loc(exam_id)
        if not storage_loc:
            raise _source_missing()
        try:
            bucket, path = _parse_storage_loc(storage_loc)
        except ValueError:
            raise _source_missing()
    elif file_id:
        # Uploaded file: the row is read through the caller's own client.
        file_row = _first(
            ctx.supabase.table("files")
            .select("bucket, path, mime_type, name")
            .eq("id", file_id)
            .limit(1)
            .execute()
        )
        bucket = (file_row or {}).get("bucket")
        path = (file_row or {}).get("path")
        if not bucket or not path:
            raise _source_missing()
    else:
        raise _source_missing()

    try:
        content = StorageAdmin().download_file(bucket, path)
    except Exception as exc:
        logger.warning(f"Template source download failed for template {template_id}: {exc}")
        raise _source_missing()
    return Response(content=content, media_type="application/pdf")
@router.put("/templates/{template_id}")
async def replace_template(
    template_id: str,
    body: TemplateReplaceRequest,
    background_tasks: BackgroundTasks,
    ctx: ExamContext = Depends(get_exam_context),
) -> Dict[str, Any]:
    """Save the whole canvas by replacing every child row of a template (R5.2).

    Questions, response areas and boundaries are deleted and re-inserted from
    the request. The delete-then-insert spans several PostgREST calls, so it is
    not atomic; that is accepted for the small (~20-question) payloads involved,
    with a transactional RPC as a later hardening step if concurrent canvas
    saves become a concern.

    Returns:
        The template with its children, as returned by ``get_template``.

    Raises:
        HTTPException: 404 when the template is missing or not visible, 403
            when the caller is not its owner, 409 when marks have already been
            recorded against it.
    """

    def _as_row(columns: Dict[str, Any], client_id: Any) -> Dict[str, Any]:
        # Carry the client-supplied id only when one was sent, then drop unset columns.
        row = dict(columns)
        if client_id:
            row["id"] = client_id
        return {name: value for name, value in row.items() if value is not None}

    owned = _fetch_template_or_404(ctx, template_id)
    _require_owner(ctx, owned)

    # Data-loss guard. Deleting the questions below cascades to mark_entries
    # (mark_entries.question_id -> exam_questions ON DELETE CASCADE), so a structural
    # replace is refused once any mark exists for this template's batches; otherwise
    # re-saving the setup canvas mid-marking would silently wipe marking work.
    # Mark-scheme tweaks go through PATCH /questions/{id} and are unaffected.
    if _template_has_recorded_marks(ctx, template_id):
        raise HTTPException(
            status_code=409,
            detail="Template has recorded marks; structural full-replace is blocked. "
            "Edit questions individually via PATCH /questions/{id}.",
        )

    # Template-level metadata may be updated in the same save.
    if body.meta:
        meta_changes = {field: value for field, value in body.meta.dict().items() if value is not None}
        if meta_changes:
            ctx.supabase.table("exam_templates").update(meta_changes).eq("id", template_id).execute()

    client = ctx.supabase

    # Remove the existing children. Response areas and boundaries reference questions,
    # so they go first; deleting by template_id instead of relying on cascade keeps
    # the outcome predictable.
    for child_table in ("exam_response_areas", "exam_boundaries", "exam_questions"):
        client.table(child_table).delete().eq("template_id", template_id).execute()

    # Re-insert with client-supplied UUIDs preserved (Neo4j join keys, spec §2).
    if body.questions:
        question_rows = [
            _as_row(
                {
                    "template_id": template_id,
                    "parent_id": question.parent_id,
                    "label": question.label,
                    "order": question.order,
                    "max_marks": question.max_marks,
                    "answer_type": question.answer_type,
                    "mcq_options": question.mcq_options,
                    "mark_scheme": question.mark_scheme,
                    "is_container": question.is_container,
                    "spec_ref": question.spec_ref,
                    # Drawn Part box (73); null for derived main questions.
                    "bounds": question.bounds,
                    "page": question.page,
                },
                question.id,
            )
            for question in body.questions
        ]
        client.table("exam_questions").insert(question_rows).execute()

    if body.response_areas:
        area_rows = [
            _as_row(
                {
                    "template_id": template_id,
                    "question_id": area.question_id,
                    "page": area.page,
                    "bounds": area.bounds,
                    "kind": area.kind,
                    "response_form": area.response_form,
                    # 73: optional Context differentiation.
                    "context_type": area.context_type,
                    "source": area.source,
                    "confirmed": area.confirmed,
                    "confidence": area.confidence,
                },
                area.id,
            )
            for area in body.response_areas
        ]
        client.table("exam_response_areas").insert(area_rows).execute()

    if body.boundaries:
        boundary_rows = [
            _as_row(
                {
                    "template_id": template_id,
                    "question_id": boundary.question_id,
                    "label": boundary.label,
                    "page_index": boundary.page_index,
                    "y": boundary.y,
                    "bounds": boundary.bounds,
                    "source": boundary.source,
                    "confirmed": boundary.confirmed,
                },
                boundary.id,
            )
            for boundary in body.boundaries
        ]
        client.table("exam_boundaries").insert(boundary_rows).execute()

    logger.info(
        f"Exam template {template_id} replaced: {len(body.questions)} questions, "
        f"{len(body.response_areas)} regions, {len(body.boundaries)} boundaries"
    )

    # R3.5.4: a successful save enqueues a graph projection into cc.public.exams.
    # BackgroundTasks is acceptable for Sprint 4 (a real queue is a later step); failures
    # are swallowed so the canvas save itself never fails on a graph hiccup.
    background_tasks.add_task(project_template_safe, template_id)
    return await get_template(template_id, ctx)
@router.delete("/templates/{template_id}")
async def archive_template(
    template_id: str,
    ctx: ExamContext = Depends(get_exam_context),
) -> Dict[str, Any]:
    """Archive a template instead of deleting it (soft delete, R5.2).

    Only the status column changes to ``"archived"``; the teacher's work is
    never hard-deleted.

    Raises:
        HTTPException: 404 when the template is missing or not visible, 403
            when the caller is not its owner.
    """
    owned = _fetch_template_or_404(ctx, template_id)
    _require_owner(ctx, owned)
    archived_state = {"status": "archived"}
    ctx.supabase.table("exam_templates").update(archived_state).eq("id", template_id).execute()
    return {"status": "archived", "id": template_id}
@router.post("/templates/{template_id}/neo4j-sync")
async def neo4j_sync(
    template_id: str,
    ctx: ExamContext = Depends(get_exam_context),
) -> Dict[str, Any]:
    """Run the graph projection for one template on demand (R5.3, dev/backfill).

    Visibility and ownership are checked as the caller; the projection itself
    runs on the service role (R3.5.1, the documented graph-writer path). The
    call is synchronous and its counts are returned.

    Raises:
        HTTPException: 404/403 from the visibility and ownership checks, 502
            when the projection raises.
    """
    owned = _fetch_template_or_404(ctx, template_id)
    _require_owner(ctx, owned)
    try:
        projection = project_template(template_id)
    except Exception as exc:
        logger.error(f"Manual neo4j-sync failed for template {template_id}: {exc}")
        raise HTTPException(status_code=502, detail=f"Projection failed: {exc}")
    else:
        return {"status": "ok", "projection": projection}
# ─── questions (granular edit path, R5.2) ────────────────────────────────────
@router.patch("/questions/{question_id}")
async def patch_question(
    question_id: str,
    body: PatchQuestionRequest,
    ctx: ExamContext = Depends(get_exam_context),
) -> Dict[str, Any]:
    """Apply a partial update to one question (the granular edit path, R5.2).

    Only fields whose value is not ``None`` are written.

    Returns:
        The updated question row.

    Raises:
        HTTPException: 400 when the body carries no value to write, 404 when
            the update matches no row.
    """
    changes: Dict[str, Any] = {}
    for field, value in body.dict().items():
        if value is not None:
            changes[field] = value
    if not changes:
        raise HTTPException(status_code=400, detail="No fields to update")

    # RLS (exam_questions_all) limits the update to questions of templates the caller owns,
    # so an out-of-scope id touches zero rows and reads back as 404 without a pre-fetch.
    outcome = ctx.supabase.table("exam_questions").update(changes).eq("id", question_id).execute()
    question = _first(outcome)
    if question:
        return question
    raise HTTPException(status_code=404, detail="Question not found")