api/routers/exam/templates.py
kcar c69451fba2
Some checks failed
api-ci-deploy / test-build-deploy (push) Has been cancelled
[verified] add upload size and MIME guards
(cherry picked from commit f5e05376f637f55b73e474cac8199529682ca398)
2026-06-08 01:18:39 +00:00

959 lines
42 KiB
Python

"""Template CRUD for the exam-marker (/api/exam/templates...) — card S4-5.
All access is as-the-user (RLS-enforced; spec E1 fix) via ExamContext. Ownership is also
checked explicitly before mutating (E2: never trust a client-supplied id as authorization) —
defence in depth on top of RLS. A row the caller cannot see under RLS reads back as absent,
so cross-institute access surfaces as 404, never a data leak (IDOR-safe).
Hybrid persistence (R5.2): PUT /templates/{id} is a full-replace of the canvas children
(questions + response areas + boundaries); PATCH /questions/{qid} is the granular mark-scheme
edit path. Client-supplied UUIDs are preserved so Supabase ids stay aligned with the Neo4j
join keys (spec §2).
"""
from __future__ import annotations
import json
import os
import tempfile
import time
import uuid
from typing import Any, Dict, List, Optional, Tuple
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Request, UploadFile
from fastapi.responses import JSONResponse, Response
from api.services.docling import AutoMapError, auto_map
from api.services.docling import extract as docling_extract
from api.services.docling.regions import detect_response_regions_from_pdf
from modules.database.services.exam_projection import project_template, project_template_safe
from modules.database.supabase.utils.client import SupabaseServiceRoleClient
from modules.database.supabase.utils.storage import StorageAdmin
from modules.upload_validation import read_pdf_upload_bytes
from modules.logger_tool import initialise_logger
from routers.exam.dependencies import ExamContext, get_exam_context, lookup_exam_code
from routers.exam.schemas import (
CreateTemplateRequest,
PatchQuestionRequest,
TemplateReplaceRequest,
UpdateTemplateMetaRequest,
)
# Module logger; level and output path are read from the LOG_LEVEL / LOG_PATH environment variables.
logger = initialise_logger(__name__, os.getenv("LOG_LEVEL"), os.getenv("LOG_PATH"), "default", True)
# Router that every endpoint in this module is registered on.
router = APIRouter()
# Name of the per-user file cabinet that receives uploaded template source PDFs.
SOURCE_CABINET_NAME = "Exam Marker Template Sources"
# Storage bucket that uploaded template source PDFs are written to.
SOURCE_BUCKET_FALLBACK = "cc.users"
# Prefix of the Redis keys holding auto-map job status (see _job_key).
AUTO_MAP_JOB_PREFIX = "exam:auto-map"
# In-process copy of every auto-map job status, keyed by job id; read when Redis has no entry.
_AUTO_MAP_JOB_STATUS: Dict[str, Dict[str, Any]] = {}
# ─── helpers ─────────────────────────────────────────────────────────────────
def _rows(result: Any) -> List[Dict[str, Any]]:
    """Normalise the ``data`` attribute of a query result to a list of rows.

    A missing or falsy ``data`` gives an empty list; a single non-list payload is wrapped in a
    one-element list.
    """
    payload = getattr(result, "data", None)
    if not payload:
        return []
    if isinstance(payload, list):
        return payload
    return [payload]
def _first(result: Any) -> Optional[Dict[str, Any]]:
    """Return the first row of a query result, or ``None`` when it holds no rows."""
    for row in _rows(result):
        return row
    return None
def _fetch_template_or_404(ctx: ExamContext, template_id: str) -> Dict[str, Any]:
    """Read one ``exam_templates`` row through the caller's own client.

    Raises:
        HTTPException: 404 when the query returns no row (missing, or hidden from the caller).
    """
    query = ctx.supabase.table("exam_templates").select("*").eq("id", template_id).limit(1)
    template = _first(query.execute())
    if template:
        return template
    raise HTTPException(status_code=404, detail="Template not found")
def _require_owner(ctx: ExamContext, template: Dict[str, Any]) -> None:
    """Allow writes only for the teacher recorded as the template's owner (R2.4).

    Raises:
        HTTPException: 403 when ``template["teacher_id"]`` differs from the caller's user id.
    """
    caller_is_owner = template.get("teacher_id") == ctx.user_id
    if caller_is_owner:
        return
    raise HTTPException(status_code=403, detail="Only the template owner can modify it")
def _require_source_visibility_or_404(ctx: ExamContext, template: Dict[str, Any]) -> None:
    """Institute boundary check for source-PDF access.

    RLS already gates template visibility; this second check prevents cross-institute PDF
    leakage by requiring the template's institute to be one of the caller's.

    Raises:
        HTTPException: 404 when the template's ``institute_id`` is not in ``ctx.institute_ids``.
    """
    if template.get("institute_id") in ctx.institute_ids:
        return
    raise HTTPException(status_code=404, detail="Template not found")
def _template_has_recorded_marks(ctx: ExamContext, template_id: str) -> bool:
    """Report whether any ``mark_entries`` row exists for a marking batch of this template.

    Callers use a ``True`` result to refuse destructive structural rewrites.
    """
    batch_query = ctx.supabase.table("marking_batches").select("id").eq("template_id", template_id)
    batch_ids = [batch["id"] for batch in _rows(batch_query.execute())]
    if not batch_ids:
        # No batches means no marks can exist; skip the second query.
        return False
    # One matching row is enough to answer the question, hence limit(1).
    mark_query = ctx.supabase.table("mark_entries").select("id").in_("batch_id", batch_ids).limit(1)
    return len(_rows(mark_query.execute())) > 0
def _model_fields_set(model: Any) -> set[str]:
    """Return the names of the fields the client explicitly supplied.

    Reads ``model_fields_set`` when the model has it, otherwise ``__fields_set__``, so the router
    works with both Pydantic v1 and v2 models. A model with neither yields an empty set.
    """
    try:
        provided = model.model_fields_set
    except AttributeError:
        provided = getattr(model, "__fields_set__", ())
    return set(provided)
def _model_dump(model: Any) -> Dict[str, Any]:
    """Serialise a Pydantic model to a dict via ``model_dump()`` if present, else ``dict()``."""
    try:
        dump = model.model_dump
    except AttributeError:
        dump = model.dict
    return dump()
def _template_meta_updates(body: UpdateTemplateMetaRequest, *, include_explicit_nulls: bool = False) -> Dict[str, Any]:
    """Build the column updates carried by a metadata request.

    By default every field whose value is ``None`` is dropped. With ``include_explicit_nulls``
    the result instead holds exactly the fields the client sent, so an explicit ``null`` is kept
    and an omitted field is left out.
    """
    dumped = _model_dump(body)
    if not include_explicit_nulls:
        return {name: value for name, value in dumped.items() if value is not None}
    return {name: dumped[name] for name in _model_fields_set(body) if name in dumped}
def _parse_storage_loc(storage_loc: str) -> Tuple[str, str]:
    """Split ``"<bucket>/<path>"`` at the first slash into ``(bucket, path)``.

    Raises:
        ValueError: when the bucket, the slash or the path is missing.
    """
    pieces = (storage_loc or "").partition("/")
    if not all(pieces):
        raise ValueError(f"Invalid storage_loc: {storage_loc!r}")
    return pieces[0], pieces[2]
def _lookup_exam_storage_loc(exam_id: str) -> Optional[str]:
    """Look up ``eb_exams.storage_loc`` for an exam with the service-role client.

    Best-effort: any failure is logged as a warning and reported as ``None``, the same value
    returned when no exam row matches.
    """
    try:
        service = SupabaseServiceRoleClient().supabase
        query = service.table("eb_exams").select("storage_loc").eq("id", exam_id).limit(1)
        exam_row = _first(query.execute())
        if not exam_row:
            return None
        return exam_row.get("storage_loc")
    except Exception as exc:
        logger.warning(f"storage_loc lookup failed for exam_id={exam_id}: {exc}")
        return None
async def _parse_create_template_request(request: Request) -> tuple[CreateTemplateRequest, Optional[UploadFile]]:
    """Parse the create-template body from either multipart form data or JSON.

    Multipart requests may carry the source PDF as the ``source_pdf`` part; JSON requests never
    carry an upload.

    Returns:
        ``(body, upload)`` where ``upload`` is ``None`` unless a ``source_pdf`` file part was sent.

    Raises:
        HTTPException: 400 for an unreadable body, a non-object JSON body, a non-file
            ``source_pdf`` or both ``source_file_id`` and ``source_pdf``; 422 when the fields do
            not validate against ``CreateTemplateRequest``.
    """
    upload: Optional[UploadFile] = None
    content_type = request.headers.get("content-type", "")
    if "multipart/form-data" in content_type:
        form = await request.form()
        payload: Dict[str, Any] = {}
        for key in ("title", "subject", "exam_id", "exam_code", "source_file_id", "page_count", "institute_id"):
            value = form.get(key)
            # Blank form inputs are treated as "not provided".
            if value is not None and value != "":
                payload[key] = value
        upload = form.get("source_pdf")
        if upload is not None and not hasattr(upload, "read"):
            raise HTTPException(status_code=400, detail="source_pdf must be a file upload")
        if upload is not None and payload.get("source_file_id"):
            raise HTTPException(status_code=400, detail="Use either source_file_id or source_pdf, not both")
    else:
        try:
            payload = await request.json()
        except Exception as exc:
            raise HTTPException(status_code=400, detail=f"Invalid request body: {exc}") from exc
        if not isinstance(payload, dict):
            # A JSON array/scalar cannot be expanded into model fields.
            raise HTTPException(status_code=400, detail="Invalid request body: expected a JSON object")
    # The model is built by hand here, so FastAPI's automatic 422 handling does not apply:
    # translate validation failures explicitly instead of letting them surface as HTTP 500.
    # Pydantic's ValidationError derives from ValueError in both v1 and v2.
    try:
        body = CreateTemplateRequest(**payload)
    except (TypeError, ValueError) as exc:
        raise HTTPException(status_code=422, detail=f"Invalid template request: {exc}") from exc
    return body, upload
async def _upload_template_source_file(
    ctx: ExamContext,
    institute_id: str,
    upload: UploadFile,
) -> str:
    """Store an uploaded source PDF and register it in the ``files`` table.

    The upload is read through ``read_pdf_upload_bytes``, placed in the caller's
    ``SOURCE_CABINET_NAME`` cabinet (created on first use) and written to the
    ``SOURCE_BUCKET_FALLBACK`` bucket. Cabinet and file rows are written with the service-role
    client.

    Returns:
        The id of the newly created ``files`` row.

    Raises:
        HTTPException: 500 when the cabinet cannot be created, the storage upload fails or the
            file record is not returned by the insert.
    """
    file_bytes = await read_pdf_upload_bytes(upload)
    service = SupabaseServiceRoleClient()
    storage = StorageAdmin()
    cabinet_name = SOURCE_CABINET_NAME
    existing = _first(
        service.supabase.table("file_cabinets")
        .select("id")
        .eq("user_id", ctx.user_id)
        .eq("name", cabinet_name)
        .limit(1)
        .execute()
    )
    if existing:
        cabinet_id = existing["id"]
    else:
        created_cabinet = _first(
            service.supabase.table("file_cabinets")
            .insert({"user_id": ctx.user_id, "name": cabinet_name})
            .execute()
        )
        if not created_cabinet:
            raise HTTPException(status_code=500, detail="Failed to create upload cabinet")
        cabinet_id = created_cabinet["id"]
    file_id = str(uuid.uuid4())
    # basename() yields "" for names ending in a slash; fall back so the storage path and the
    # file record never end up with an empty file name.
    safe_name = os.path.basename(upload.filename or "") or "template.pdf"
    # Use the shared users bucket (exists on all envs). Per-institute private buckets
    # (cc.institutes.<id>.private) are a future multi-tenant provisioning concern and are NOT
    # created on dev .94 — using one here failed with "Bucket not found". The institute is already
    # namespaced in the storage path + enforced by RLS on the files row.
    bucket = SOURCE_BUCKET_FALLBACK
    storage_path = f"exam-marker/{institute_id or 'noinst'}/{cabinet_id}/{file_id}/{safe_name}"
    try:
        storage.upload_file(bucket, storage_path, file_bytes, "application/pdf", upsert=True)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"Storage upload failed: {exc}") from exc
    inserted = _first(
        service.supabase.table("files").insert(
            {
                "id": file_id,
                "cabinet_id": cabinet_id,
                "name": safe_name,
                "path": storage_path,
                "bucket": bucket,
                "mime_type": "application/pdf",
                "uploaded_by": ctx.user_id,
                "size_bytes": len(file_bytes),
                "source": "classroomcopilot-web",
                "is_directory": False,
                "relative_path": safe_name,
                "processing_status": "uploaded",
            }
        ).execute()
    )
    if not inserted:
        raise HTTPException(status_code=500, detail="Failed to create file record")
    return file_id
def _job_key(job_id: str) -> str:
    """Build the Redis key for a job: ``AUTO_MAP_JOB_PREFIX``, a colon, then the job id."""
    return "{}:{}".format(AUTO_MAP_JOB_PREFIX, job_id)
def _redis_client() -> Any:
    """Return a Redis client that answered a ping, or ``None`` when Redis is unusable.

    The connection comes from ``LOCAL_REDIS_URL`` / ``REDIS_URL`` when either is set, otherwise
    from the ``REDIS_HOST`` / ``REDIS_PORT`` / ``REDIS_DB_DEV`` / ``REDIS_DB`` / ``REDIS_PASSWORD``
    variables. Every failure (package not importable, bad settings, no server) yields ``None``;
    callers treat Redis as optional.
    """
    try:
        import redis
    except Exception:
        return None
    try:
        redis_url = os.getenv("LOCAL_REDIS_URL") or os.getenv("REDIS_URL")
        if not redis_url:
            connection = redis.Redis(
                host=os.getenv("REDIS_HOST", "localhost"),
                port=int(os.getenv("REDIS_PORT", "6379")),
                db=int(os.getenv("REDIS_DB_DEV", os.getenv("REDIS_DB", "0"))),
                password=os.getenv("REDIS_PASSWORD") or None,
                decode_responses=True,
                socket_timeout=2,
            )
        else:
            connection = redis.Redis.from_url(redis_url, decode_responses=True, socket_timeout=2)
        # Constructing the client does not prove the server is reachable; ping does.
        connection.ping()
    except Exception:
        return None
    return connection
def _set_auto_map_status(job_id: str, payload: Dict[str, Any]) -> None:
    """Record the latest status of an auto-map job.

    The status (``job_id``, ``updated_at`` and the payload, whose keys win on a clash) is always
    stored in the in-process map and, when Redis is available, also written there with a TTL
    taken from ``EXAM_AUTO_MAP_JOB_TTL`` (default 3600 seconds). A Redis write failure is only
    logged.
    """
    record = {"job_id": job_id, "updated_at": int(time.time()), **payload}
    _AUTO_MAP_JOB_STATUS[job_id] = record
    redis_conn = _redis_client()
    if redis_conn is None:
        return
    try:
        key = _job_key(job_id)
        ttl_seconds = int(os.getenv("EXAM_AUTO_MAP_JOB_TTL", "3600"))
        redis_conn.setex(key, ttl_seconds, json.dumps(record))
    except Exception as exc:
        logger.warning(f"auto-map redis status write failed for {job_id}: {exc}")
def _get_auto_map_status(job_id: str) -> Optional[Dict[str, Any]]:
    """Fetch the stored status of an auto-map job.

    Redis is consulted first; when it is unavailable, has no entry or the read fails (logged),
    the in-process map is used. Returns ``None`` for an unknown job.
    """
    redis_conn = _redis_client()
    if redis_conn is not None:
        try:
            cached = redis_conn.get(_job_key(job_id))
            if cached:
                return json.loads(cached)
        except Exception as exc:
            logger.warning(f"auto-map redis status read failed for {job_id}: {exc}")
    return _AUTO_MAP_JOB_STATUS.get(job_id)
def _resolve_template_source(ctx: ExamContext, template: Dict[str, Any]) -> Tuple[str, str, bytes]:
    """Locate and download the source PDF of a template.

    A template with an ``exam_id`` resolves through the exam's ``storage_loc``; otherwise a
    ``source_file_id`` resolves through the ``files`` row. Both lookups and the download use
    service-role access, so callers must have authorised the request beforehand.

    Returns:
        ``(bucket, path, pdf_bytes)``.

    Raises:
        HTTPException: 404 for every failure mode (no source, bad location, download error).
    """
    source_bucket: Optional[str] = None
    source_path: Optional[str] = None
    if template.get("exam_id"):
        location = _lookup_exam_storage_loc(template["exam_id"])
        if not location:
            raise HTTPException(status_code=404, detail="Template source not found")
        try:
            source_bucket, source_path = _parse_storage_loc(location)
        except ValueError:
            raise HTTPException(status_code=404, detail="Template source not found")
    elif template.get("source_file_id"):
        # Same scoped service-role exception as source-pdf: owner gate has already passed.
        files_query = (
            SupabaseServiceRoleClient().supabase.table("files")
            .select("bucket, path, mime_type, name")
            .eq("id", template["source_file_id"])
            .limit(1)
        )
        file_row = _first(files_query.execute())
        if not file_row or not file_row.get("bucket") or not file_row.get("path"):
            raise HTTPException(status_code=404, detail="Template source not found")
        source_bucket = file_row["bucket"]
        source_path = file_row["path"]
    else:
        raise HTTPException(status_code=404, detail="Template source not found")
    try:
        return source_bucket, source_path, StorageAdmin().download_file(source_bucket, source_path)
    except Exception as exc:
        logger.warning(f"Template source download failed for template {template.get('id')}: {exc}")
        raise HTTPException(status_code=404, detail="Template source not found")
def _pdf_has_text_layer(pdf_bytes: bytes) -> bool:
    """Report whether ``docling_extract.has_text_layer`` finds a text layer in the PDF.

    The bytes are spooled to a temporary file because the detector is given a path; the file is
    removed afterwards whatever the outcome.
    """
    with tempfile.NamedTemporaryFile(prefix="cc-auto-map-detect-", suffix=".pdf", delete=False) as handle:
        handle.write(pdf_bytes)
        pdf_path = handle.name
    try:
        detected = docling_extract.has_text_layer(pdf_path)
        return bool(detected)
    finally:
        try:
            os.unlink(pdf_path)
        except OSError:
            # Cleanup is best-effort.
            pass
def _pdf_page_geometry(pdf_bytes: bytes) -> List[Dict[str, float]]:
    """Read per-page box geometry from a PDF with PyMuPDF (``fitz``).

    Each entry holds the MediaBox/CropBox origins, the page size in points, the rendered size and
    ``page_top`` — the sum of the rendered heights of all preceding pages. Any failure, including
    ``fitz`` not being importable, is logged and yields an empty list so that callers fall back
    to A4 geometry.
    """
    with tempfile.NamedTemporaryFile(prefix="cc-auto-map-geom-", suffix=".pdf", delete=False) as handle:
        handle.write(pdf_bytes)
        pdf_path = handle.name
    try:
        import fitz
        document = fitz.open(pdf_path)
        geometry: List[Dict[str, float]] = []
        running_top = 0.0
        try:
            for page in document:
                media_box = page.mediabox
                crop_box = page.cropbox
                width = float(crop_box.width or page.rect.width or 595.0)
                height = float(crop_box.height or page.rect.height or 842.0)
                entry = {
                    "media_x0": float(media_box.x0),
                    "crop_x0": float(crop_box.x0),
                    "crop_y0": float(crop_box.y0),
                    # The point size falls back to 1 (not A4), unlike the rendered size above.
                    "page_pt_w": float(crop_box.width or page.rect.width or 1),
                    "page_pt_h": float(crop_box.height or page.rect.height or 1),
                    "rendered_w": width,
                    "rendered_h": height,
                    "page_top": running_top,
                }
                geometry.append(entry)
                running_top += height
        finally:
            document.close()
        return geometry
    except Exception as exc:
        logger.warning(f"PDF geometry read failed; falling back to A4 page geometry: {exc}")
        return []
    finally:
        try:
            os.unlink(pdf_path)
        except OSError:
            # Cleanup is best-effort.
            pass
def _page_geom(pages: List[Dict[str, float]], page_number: int) -> Dict[str, float]:
    """Return the geometry of a 1-based page.

    When ``page_number`` is outside ``pages`` the result is a 595 x 842 point page with zero box
    origins, stacked below ``page_number - 1`` pages of the same height.
    """
    if page_number < 1 or page_number > len(pages):
        return {
            "media_x0": 0.0, "crop_x0": 0.0, "crop_y0": 0.0,
            "page_pt_w": 595.0, "page_pt_h": 842.0,
            "rendered_w": 595.0, "rendered_h": 842.0,
            "page_top": (page_number - 1) * 842.0,
        }
    return pages[page_number - 1]
def _box_to_canvas(box: Optional[Dict[str, Any]], page_number: int, pages: List[Dict[str, float]]) -> Optional[Dict[str, float]]:
    """Convert a detected box on one page into an ``x``/``y``/``w``/``h`` canvas rectangle.

    Two input shapes are understood:

    * ``coord_origin == "TOPLEFT"`` with ``x``/``y``/``w``/``h``: values are scaled by 0.5 when
      ``unit == "px"`` (otherwise used as-is) and shifted down by the page's ``page_top``.
    * ``l``/``t``/``r``/``b``: treated as bottom-left-origin page coordinates, offset by the
      CropBox origin and flipped vertically.

    Returns ``None`` for an empty box or one with neither key set. All values are rounded to two
    decimals.
    """
    if not box:
        return None
    geom = _page_geom(pages, page_number)
    if box.get("coord_origin") == "TOPLEFT" and {"x", "y", "w", "h"}.issubset(box):
        factor = 1.0 if box.get("unit") != "px" else 0.5
        scaled_x = float(box["x"]) * factor
        scaled_y = float(box["y"]) * factor
        scaled_w = float(box["w"]) * factor
        scaled_h = float(box["h"]) * factor
        return {
            "x": round(scaled_x, 2),
            "y": round(geom["page_top"] + scaled_y, 2),
            "w": round(scaled_w, 2),
            "h": round(scaled_h, 2),
        }
    if not {"l", "t", "r", "b"}.issubset(box):
        return None
    left, top, right, bottom = (float(box[edge]) for edge in ("l", "t", "r", "b"))
    pt_w = geom["page_pt_w"]
    pt_h = geom["page_pt_h"]
    # Canvas pages are rendered from the PDF CropBox with page_left fixed at 0.
    # Docling boxes are in PDF user-space coordinates, so subtract the CropBox
    # origin instead of adding it; otherwise cropped PDFs shift right/down.
    canvas_x = (left - geom["crop_x0"]) / pt_w * geom["rendered_w"]
    canvas_y = geom["page_top"] + (pt_h - (top - geom["crop_y0"])) / pt_h * geom["rendered_h"]
    canvas_w = (right - left) / pt_w * geom["rendered_w"]
    canvas_h = (top - bottom) / pt_h * geom["rendered_h"]
    return {"x": round(canvas_x, 2), "y": round(canvas_y, 2), "w": round(canvas_w, 2), "h": round(canvas_h, 2)}
def _response_form_from_region_type(region_type: Any) -> Optional[str]:
    """Translate a detector region type into a response form.

    ``answer_lines``, ``answer_box`` and ``working_space`` map to ``lines``, ``answer-box`` and
    ``working``; those three target values map to themselves. Anything else gives ``None``.
    """
    key = str(region_type or "")
    if key in ("lines", "answer-box", "working"):
        return key
    aliases = {
        "answer_lines": "lines",
        "answer_box": "answer-box",
        "working_space": "working",
    }
    return aliases.get(key)
def _y_to_canvas(y_value: float, page_number: int, pages: List[Dict[str, float]]) -> float:
    """Map a bottom-left-origin page ``y`` to a canvas ``y`` (top-down, pages stacked).

    Uses the same vertical flip and CropBox offset as the l/t/r/b branch of ``_box_to_canvas``;
    the result is rounded to two decimals.
    """
    geom = _page_geom(pages, page_number)
    pt_h = geom["page_pt_h"]
    offset_in_page = (pt_h - (float(y_value) - geom["crop_y0"])) / pt_h * geom["rendered_h"]
    return round(geom["page_top"] + offset_in_page, 2)
def _ai_id(template_id: str, *parts: Any) -> str:
    """Derive a deterministic UUIDv5 for an auto-mapped row.

    The name hashed (URL namespace) is ``cc-auto-map/<template_id>/<part>/...``, so the same
    template and parts always produce the same id.
    """
    segments = ["cc-auto-map", template_id]
    segments.extend(str(part) for part in parts)
    return str(uuid.uuid5(uuid.NAMESPACE_URL, "/".join(segments)))
def _safe_confidence(value: Any = None) -> float:
    """Clamp a confidence value to ``[0.0, 1.0]``, defaulting to 0.75.

    Non-numeric values and NaN yield the default. NaN needs an explicit check: every comparison
    with NaN is false, so ``min(1.0, nan)`` returns 1.0 and a NaN would otherwise be reported as
    full confidence.
    """
    default = 0.75
    if not isinstance(value, (int, float)):
        return default
    number = float(value)
    if number != number:  # NaN is the only float that differs from itself
        return default
    return max(0.0, min(1.0, number))
def _margin_values(first_pass: Dict[str, Any], page_number: int) -> Dict[str, Optional[float]]:
    """Collect the margin value for each edge of one page.

    Entries of ``first_pass["margins"]`` are applied in order, later ones overwriting earlier
    ones: a ``document``-scoped entry applies to the left/right edges only, a ``page``-scoped
    entry applies when its ``page`` equals ``page_number``. Edges without an entry stay ``None``.
    """
    resolved: Dict[str, Optional[float]] = dict.fromkeys(("left", "right", "top", "bottom"))
    for margin in first_pass.get("margins") or []:
        edge = margin.get("edge")
        if edge not in resolved:
            continue
        scope = margin.get("scope")
        applies = (scope == "document" and edge in {"left", "right"}) or (
            scope == "page" and int(margin.get("page") or -1) == page_number
        )
        if applies:
            resolved[edge] = margin.get("value")
    return resolved
def _map_first_pass_to_rows(template_id: str, first_pass: Dict[str, Any], pdf_bytes: bytes, extra_regions: Optional[List[Dict[str, Any]]] = None) -> Dict[str, List[Dict[str, Any]]]:
    """Turn an auto-map first pass into rows for the template's child tables.

    Every row is tagged ``source="ai"``, ``confirmed=False`` and gets a deterministic id from
    ``_ai_id``. Per page this produces one layout row, a container question per main band (with
    start/end boundaries) and a part question per part band; furniture, figures, tables and the
    optional ``extra_regions`` become response areas owned by the first part on their page (or
    the first question overall).

    Compared with a naive mapping this guards three cases:

    * a part band with a blank ``question`` gets its parent inferred from the text before the
      first ``.`` of its label, keyed by that inferred label so unrelated parts never share one
      parent, and the inferred parent row is emitted when no main band created it;
    * a part label seen again (for example on a following page) is emitted only once, because
      its id is identical;
    * ``first_pass["meta"]`` may be missing or ``None``.

    Returns:
        ``{"questions", "response_areas", "boundaries", "layout"}``, each a list of row dicts.
    """
    pages_geom = _pdf_page_geometry(pdf_bytes)
    questions: List[Dict[str, Any]] = []
    response_areas: List[Dict[str, Any]] = []
    boundaries: List[Dict[str, Any]] = []
    layout: List[Dict[str, Any]] = []
    q_ids: Dict[str, str] = {}
    seen_question_ids: set[str] = set()
    first_part_by_page: Dict[int, str] = {}
    pages_obj = first_pass.get("pages") or {}
    page_keys = sorted(pages_obj, key=int)
    schema = (first_pass.get("meta") or {}).get("schema")

    for page_key in page_keys:
        page_number = int(page_key)
        page_index = page_number - 1
        page = pages_obj[page_key]
        margins = _margin_values(first_pass, page_number)
        layout.append({
            "id": _ai_id(template_id, "layout", page_number),
            "template_id": template_id,
            "page_index": page_index,
            "role": page.get("role"),
            "margin_left": margins["left"],
            "margin_right": margins["right"],
            "margin_top": margins["top"],
            "margin_bottom": margins["bottom"],
            "margins_enabled": bool(page.get("margins_enabled", True)),
            "source": "ai",
            "confirmed": False,
            "confidence": 0.8,
            "derivation": "docling-page-layout",
            "meta": {"role_source": page.get("role_source"), "schema": schema},
        })

        for band in page.get("main_bands") or []:
            label = str(band.get("question") or "").strip()
            if not label:
                continue
            qid = q_ids.setdefault(label, _ai_id(template_id, "question", label))
            band_confidence = _safe_confidence(band.get("confidence"))
            if qid not in seen_question_ids:
                seen_question_ids.add(qid)
                questions.append({
                    "id": qid,
                    "template_id": template_id,
                    "label": label,
                    "order": len(q_ids) - 1,
                    "max_marks": 0,
                    "is_container": True,
                    "source": "ai",
                    "confirmed": False,
                    "confidence": band_confidence,
                    "derivation": "docling-main-band",
                })
            # Boundary ids include the page so a band that continues on another page gets its own.
            for edge, yv in (("start", band.get("y_start")), ("end", band.get("y_end"))):
                if yv is not None:
                    boundaries.append({
                        "id": _ai_id(template_id, "boundary", label, edge, page_number),
                        "template_id": template_id,
                        "question_id": qid,
                        "label": f"{label}:{edge}",
                        "page_index": page_index,
                        "y": _y_to_canvas(float(yv), page_number, pages_geom),
                        "bounds": None,
                        "source": "ai",
                        "confirmed": False,
                        "confidence": band_confidence,
                        "derivation": "docling-main-band",
                    })

        for band in page.get("part_bands") or []:
            label = str(band.get("label") or "").strip()
            parent_label = str(band.get("question") or "").strip()
            if not label:
                continue
            # Key the parent by its effective label; keying by the raw (possibly blank) value
            # would funnel every blank-parent part into one shared "" entry.
            parent_key = parent_label or label.split(".")[0]
            parent_id = q_ids.setdefault(parent_key, _ai_id(template_id, "question", parent_key))
            if parent_key and parent_id not in seen_question_ids:
                seen_question_ids.add(parent_id)
                questions.append({
                    "id": parent_id,
                    "template_id": template_id,
                    "label": parent_key,
                    "order": len(q_ids) - 1,
                    "max_marks": 0,
                    "is_container": True,
                    "source": "ai",
                    "confirmed": False,
                    "confidence": 0.7,
                    "derivation": "docling-inferred-main-question",
                })
            pid = _ai_id(template_id, "part", label)
            first_part_by_page.setdefault(page_index, pid)
            if pid in seen_question_ids:
                # Same part seen before: the id is identical, so a second row would collide.
                continue
            bounds = None
            y1, y2 = band.get("y_start"), band.get("y_end")
            if margins["left"] is not None and margins["right"] is not None and y1 is not None and y2 is not None:
                top = max(float(y1), float(y2))
                bottom = min(float(y1), float(y2))
                bounds = _box_to_canvas(
                    {"l": margins["left"], "r": margins["right"], "t": top, "b": bottom, "coord_origin": "BOTTOMLEFT"},
                    page_number,
                    pages_geom,
                )
            bounds = bounds or _box_to_canvas(band.get("label_box"), page_number, pages_geom)
            seen_question_ids.add(pid)
            questions.append({
                "id": pid,
                "template_id": template_id,
                "parent_id": parent_id,
                "label": label,
                "order": len(questions),
                "max_marks": 0,
                "is_container": False,
                "bounds": bounds,
                "page": page_number,
                "source": "ai",
                "confirmed": False,
                "confidence": _safe_confidence(band.get("confidence")),
                "derivation": "docling-part-band-x-margins",
            })

    # NOTE(review): when no question was produced this id refers to no emitted question row;
    # confirm whether response areas should be dropped in that case.
    default_qid = questions[0]["id"] if questions else _ai_id(template_id, "question", "auto")
    area_sources = (
        ("furniture", "furniture", None, "docling-furniture"),
        ("figures", "context", "figure", "docling-context-figure"),
        ("tables", "context", "data_table", "docling-table"),
    )
    for page_key in page_keys:
        page_number = int(page_key)
        page_index = page_number - 1
        page = pages_obj[page_key]
        owner_qid = first_part_by_page.get(page_index, default_qid)
        for collection, kind, context_type, derivation in area_sources:
            for idx, item in enumerate(page.get(collection) or []):
                bounds = _box_to_canvas(item.get("box"), page_number, pages_geom)
                if not bounds:
                    continue
                row = {
                    "id": _ai_id(template_id, collection, page_number, idx),
                    "template_id": template_id,
                    "question_id": owner_qid,
                    "page": page_number,
                    "bounds": bounds,
                    "kind": kind,
                    "source": "ai",
                    "confirmed": False,
                    "confidence": 0.65,
                    "derivation": derivation,
                }
                if context_type:
                    row["context_type"] = context_type
                response_areas.append(row)

    for idx, region in enumerate(extra_regions or []):
        page_index = int(region.get("page_index", 0))
        bounds = _box_to_canvas(region.get("bbox") or {}, page_index + 1, pages_geom)
        if not bounds:
            continue
        response_form = _response_form_from_region_type(region.get("region_type"))
        if not response_form:
            continue
        response_areas.append({
            "id": _ai_id(template_id, "region", page_index, idx),
            "template_id": template_id,
            "question_id": first_part_by_page.get(page_index, default_qid),
            "page": page_index + 1,
            "bounds": bounds,
            "kind": "response",
            "response_form": response_form,
            "source": "ai",
            "confirmed": False,
            "confidence": _safe_confidence(region.get("confidence")),
            "derivation": region.get("detection_method") or "opencv-response-region",
        })

    return {"questions": questions, "response_areas": response_areas, "boundaries": boundaries, "layout": layout}
def _refresh_ai_rows(ctx: ExamContext, template_id: str, rows: Dict[str, List[Dict[str, Any]]]) -> None:
    """Replace the template's unconfirmed AI rows with a freshly generated set.

    Unconfirmed ``source="ai"`` rows are deleted (children before questions), then the new rows
    are inserted (questions first). Auto-map ids are deterministic, so a regenerated row can
    carry the id of a row that survived the delete — typically one the teacher has confirmed.
    Such rows are skipped rather than re-inserted, which keeps the surviving row untouched and
    avoids inserting a duplicate id.
    """
    sb = ctx.supabase
    for table in ("exam_response_areas", "exam_boundaries", "exam_template_layout", "exam_questions"):
        sb.table(table).delete().eq("template_id", template_id).eq("source", "ai").eq("confirmed", False).execute()
    for table, key in (
        ("exam_questions", "questions"),
        ("exam_response_areas", "response_areas"),
        ("exam_boundaries", "boundaries"),
        ("exam_template_layout", "layout"),
    ):
        payload = rows.get(key) or []
        if not payload:
            continue
        surviving_ids = {
            row.get("id")
            for row in _rows(sb.table(table).select("id").eq("template_id", template_id).execute())
        }
        fresh = [row for row in payload if row.get("id") not in surviving_ids]
        if fresh:
            sb.table(table).insert(fresh).execute()
def _run_auto_map_merge(ctx: ExamContext, template_id: str, pdf_bytes: bytes, source_label: str) -> Dict[str, List[Dict[str, Any]]]:
    """Run auto-map on a PDF and merge the result into the template.

    Response-region detection is optional: if it fails the merge continues without extra regions
    and the reason is logged. After the AI rows are refreshed, the template's ``exam_code`` and
    ``page_count`` are updated from the first pass — only when at least one of them is present,
    so no empty UPDATE is sent.

    Returns:
        The generated rows, as produced by ``_map_first_pass_to_rows``.
    """
    first_pass = auto_map(pdf_bytes, source_pdf=source_label)
    extra_regions: List[Dict[str, Any]] = []
    try:
        with tempfile.NamedTemporaryFile(prefix="cc-auto-map-regions-", suffix=".pdf", delete=False) as fh:
            fh.write(pdf_bytes)
            tmp = fh.name
        try:
            extra_regions = detect_response_regions_from_pdf(tmp)
        finally:
            try:
                os.unlink(tmp)
            except OSError:
                # Cleanup is best-effort.
                pass
    except Exception as exc:
        logger.info(f"auto-map response-region detection skipped for template {template_id}: {exc}")
    rows = _map_first_pass_to_rows(template_id, first_pass, pdf_bytes, extra_regions)
    _refresh_ai_rows(ctx, template_id, rows)
    meta = first_pass.get("meta") or {}
    candidates = {"exam_code": meta.get("paper_code"), "page_count": meta.get("n_pages")}
    updates = {column: value for column, value in candidates.items() if value is not None}
    if updates:
        ctx.supabase.table("exam_templates").update(updates).eq("id", template_id).execute()
    return rows
def _run_auto_map_job(job_id: str, ctx: ExamContext, template_id: str, pdf_bytes: bytes, source_label: str) -> None:
    """Background entry point for an auto-map run, tracked through the job status store.

    The job is marked ``running``, then ``completed`` with per-collection row counts, or
    ``failed`` with the error text. Exceptions are logged and never propagate.
    """
    _set_auto_map_status(job_id, {"status": "running", "template_id": template_id})
    try:
        merged = _run_auto_map_merge(ctx, template_id, pdf_bytes, source_label)
        counts = {collection: len(items) for collection, items in merged.items()}
        _set_auto_map_status(job_id, {"status": "completed", "template_id": template_id, "counts": counts})
    except Exception as exc:
        logger.exception(f"auto-map job failed for template {template_id}: {exc}")
        _set_auto_map_status(job_id, {"status": "failed", "template_id": template_id, "error": str(exc)})
# ─── templates ───────────────────────────────────────────────────────────────
@router.post("/templates")
async def create_template(
    request: Request,
    ctx: ExamContext = Depends(get_exam_context),
) -> Dict[str, Any]:
    """Create a draft template owned by the calling teacher.

    The source may be a catalogue exam (``exam_id``), an existing file (``source_file_id``) or an
    uploaded PDF (``source_pdf``) — at most one of them. A client-supplied ``source_file_id`` is
    accepted only if the caller can read that file through their own (RLS-scoped) client: the
    source is later downloaded with service-role access, so an unchecked id would let a caller
    read any stored file (E2: never trust a client-supplied id as authorization).

    Raises:
        HTTPException: 400 for conflicting sources, 404 when the referenced file is not visible
            to the caller, 500 when the insert returns no row.
    """
    body, upload = await _parse_create_template_request(request)
    institute_id = ctx.resolve_institute(body.institute_id)
    if body.exam_id and body.source_file_id:
        raise HTTPException(status_code=400, detail="Use either exam_id or source_file_id, not both")
    if body.exam_id and upload is not None:
        # Checked before the upload is stored so a rejected request leaves no orphaned file.
        raise HTTPException(status_code=400, detail="Use either exam_id or source_pdf, not both")
    exam_code = body.exam_code
    if body.exam_id and not exam_code:
        exam_code = lookup_exam_code(body.exam_id)
    source_file_id = body.source_file_id
    if source_file_id:
        visible_file = _first(
            ctx.supabase.table("files").select("id").eq("id", source_file_id).limit(1).execute()
        )
        if not visible_file:
            raise HTTPException(status_code=404, detail="Source file not found")
    if upload is not None:
        source_file_id = await _upload_template_source_file(ctx, institute_id, upload)
    row = {
        "title": body.title,
        "subject": body.subject,
        "exam_id": body.exam_id,
        "exam_code": exam_code,
        "source_file_id": source_file_id,
        "page_count": body.page_count,
        "institute_id": institute_id,
        "teacher_id": ctx.user_id,
        "status": "draft",
    }
    # Unset values are omitted from the insert payload.
    row = {k: v for k, v in row.items() if v is not None}
    res = ctx.supabase.table("exam_templates").insert(row).execute()
    created = _first(res)
    if not created:
        raise HTTPException(status_code=500, detail="Failed to create template")
    logger.info(f"Exam template created: {created.get('id')} by {ctx.user_id}")
    return created
@router.get("/catalogue")
async def list_catalogue_papers() -> Dict[str, Any]:
    """Lightweight exam-board paper catalogue for the create dialog.

    Lists the ``eb_exams`` rows of type ``QP`` ordered by exam code, read with the service-role
    client. Any failure is reported as HTTP 502.
    """
    # NOTE(review): unlike the other endpoints in this module this one declares no ExamContext
    # dependency — confirm authentication is enforced where the router is mounted.
    columns = "id, exam_code, spec_code, paper_code, tier, session, type_code, storage_loc"
    try:
        service = SupabaseServiceRoleClient().supabase
        query = service.table("eb_exams").select(columns).eq("type_code", "QP").order("exam_code")
        papers = _rows(query.execute())
    except Exception as exc:
        raise HTTPException(status_code=502, detail=f"Could not load catalogue papers: {exc}")
    return {"papers": papers}
@router.get("/templates")
async def list_templates(
    include_archived: bool = False,
    institute_id: Optional[str] = None,
    ctx: ExamContext = Depends(get_exam_context),
) -> Dict[str, Any]:
    """List the templates visible to the caller, most recently updated first.

    Archived templates are excluded unless ``include_archived`` is set; ``institute_id`` narrows
    the list to one institute.
    """
    # RLS already scopes to the caller's institutes; the optional filter narrows within that.
    query = ctx.supabase.table("exam_templates").select("*")
    if institute_id:
        query = query.eq("institute_id", institute_id)
    if not include_archived:
        query = query.neq("status", "archived")
    listing = query.order("updated_at", desc=True).execute()
    return {"templates": _rows(listing)}
@router.get("/templates/{template_id}")
async def get_template(
    template_id: str,
    ctx: ExamContext = Depends(get_exam_context),
) -> Dict[str, Any]:
    """Return a template together with its questions, response areas, boundaries and layout.

    Questions are ordered by ``order`` and layout rows by ``page_index``. A template the caller
    cannot see yields 404.
    """
    template = _fetch_template_or_404(ctx, template_id)

    def children(table: str, order_by: Optional[str] = None) -> List[Dict[str, Any]]:
        # All child tables are filtered by the same template id.
        query = ctx.supabase.table(table).select("*").eq("template_id", template_id)
        if order_by is not None:
            query = query.order(order_by)
        return _rows(query.execute())

    payload = dict(template)
    payload["questions"] = children("exam_questions", "order")
    payload["response_areas"] = children("exam_response_areas")
    payload["boundaries"] = children("exam_boundaries")
    payload["layout"] = children("exam_template_layout", "page_index")
    return payload
@router.get("/templates/{template_id}/source-pdf")
async def get_template_source_pdf(
    template_id: str,
    ctx: ExamContext = Depends(get_exam_context),
) -> Response:
    """Stream the template's source PDF.

    The template must be visible to the caller and belong to one of the caller's institutes;
    otherwise, or when the source cannot be resolved, the response is 404.
    """
    template = _fetch_template_or_404(ctx, template_id)
    _require_source_visibility_or_404(ctx, template)
    source = _resolve_template_source(ctx, template)
    # source is (bucket, path, bytes); only the bytes are returned to the client.
    return Response(content=source[2], media_type="application/pdf")
@router.post("/templates/{template_id}/auto-map", response_model=None)
async def auto_map_template(
    template_id: str,
    background_tasks: BackgroundTasks,
    ctx: ExamContext = Depends(get_exam_context),
) -> Dict[str, Any] | JSONResponse:
    """Auto-map the template's source PDF into AI-suggested structure (owner only).

    A PDF with a text layer is mapped within the request and the refreshed template is returned.
    Otherwise — including when text-layer detection itself fails — the run is queued as a
    background job and the response is 202 with a ``job_id`` to poll.

    Raises:
        HTTPException: 409 when marks are already recorded for the template, 422 for
            ``AutoMapError`` / ``ValueError`` from the mapper, 502 for any other mapping failure.
    """
    template = _fetch_template_or_404(ctx, template_id)
    _require_owner(ctx, template)
    _require_source_visibility_or_404(ctx, template)
    if _template_has_recorded_marks(ctx, template_id):
        raise HTTPException(status_code=409, detail="Template has recorded marks; auto-map structural refresh is blocked.")
    bucket, path, pdf_bytes = _resolve_template_source(ctx, template)
    source_label = f"{bucket}/{path}"
    try:
        has_text = _pdf_has_text_layer(pdf_bytes)
    except Exception as exc:
        logger.warning(f"auto-map text-layer detection failed for template {template_id}; falling back to OCR queue: {exc}")
        has_text = False
    if has_text:
        try:
            _run_auto_map_merge(ctx, template_id, pdf_bytes, source_label)
        except (AutoMapError, ValueError) as exc:
            raise HTTPException(status_code=422, detail=f"Auto-map failed: {exc}")
        except Exception as exc:
            logger.exception(f"auto-map failed for template {template_id}: {exc}")
            raise HTTPException(status_code=502, detail=f"Auto-map failed: {exc}")
        background_tasks.add_task(project_template_safe, template_id)
        return await get_template(template_id, ctx)
    # No usable text layer: hand the run to a background job and let the client poll its status.
    job_id = str(uuid.uuid4())
    _set_auto_map_status(job_id, {"status": "queued", "template_id": template_id})
    background_tasks.add_task(_run_auto_map_job, job_id, ctx, template_id, pdf_bytes, source_label)
    return JSONResponse(status_code=202, content={"status": "accepted", "job_id": job_id})
@router.get("/templates/{template_id}/auto-map/{job_id}/status")
async def auto_map_status(
    template_id: str,
    job_id: str,
    ctx: ExamContext = Depends(get_exam_context),
) -> Dict[str, Any]:
    """Return the status of an auto-map job (owner only).

    A job that is unknown, or that belongs to a different template, yields 404. Once the job is
    ``completed`` the response also carries the refreshed template under ``template``.
    """
    template = _fetch_template_or_404(ctx, template_id)
    _require_owner(ctx, template)
    stored = _get_auto_map_status(job_id)
    if not stored or stored.get("template_id") != template_id:
        raise HTTPException(status_code=404, detail="Auto-map job not found")
    # Copy so the stored status record is never mutated.
    response = dict(stored)
    if response.get("status") == "completed":
        response["template"] = await get_template(template_id, ctx)
    return response
# Columns copied verbatim from each request item when a template is fully replaced.
_QUESTION_FIELDS: Tuple[str, ...] = (
    "parent_id",
    "label",
    "order",
    "max_marks",
    "answer_type",
    "mcq_options",
    "mark_scheme",
    "is_container",
    "spec_ref",
    "bounds",  # drawn Part box (73); null for derived main questions
    "page",
    "source",
    "confirmed",
    "confidence",
    "derivation",
)
_RESPONSE_AREA_FIELDS: Tuple[str, ...] = (
    "question_id",
    "page",
    "bounds",
    "kind",
    "response_form",
    "context_type",  # 73: optional Context differentiation
    "source",
    "confirmed",
    "confidence",
    "mark_subtype",
    "derivation",
)
_BOUNDARY_FIELDS: Tuple[str, ...] = (
    "question_id",
    "label",
    "page_index",
    "y",
    "bounds",
    "source",
    "confirmed",
    "confidence",
    "derivation",
)
_LAYOUT_FIELDS: Tuple[str, ...] = (
    "page_index",
    "role",
    "margin_left",
    "margin_right",
    "margin_top",
    "margin_bottom",
    "margins_enabled",
    "source",
    "confirmed",
    "confidence",
    "derivation",
    "meta",
)


def _replacement_rows(template_id: str, items: Any, fields: Tuple[str, ...]) -> List[Dict[str, Any]]:
    """Build insert rows for one child table: the listed fields plus a client id, minus ``None`` values."""
    built: List[Dict[str, Any]] = []
    for item in items:
        row: Dict[str, Any] = {"template_id": template_id}
        for field in fields:
            row[field] = getattr(item, field)
        if item.id:
            row["id"] = item.id
        built.append({k: v for k, v in row.items() if v is not None})
    return built


@router.put("/templates/{template_id}")
async def replace_template(
    template_id: str,
    body: TemplateReplaceRequest,
    background_tasks: BackgroundTasks,
    ctx: ExamContext = Depends(get_exam_context),
) -> Dict[str, Any]:
    """Full-replace canvas save (R5.2). Replaces questions/response_areas/boundaries wholesale.

    Note: the delete-then-insert spans several PostgREST calls and is therefore not atomic;
    acceptable for the small (~20-question) payloads this carries. A transactional RPC is a
    later hardening step if concurrent canvas saves become a concern.

    Layout rows are replaced in the same way. Returns the refreshed template.

    Raises:
        HTTPException: 404/403 from the visibility and owner checks; 409 when marks have been
            recorded against the template.
    """
    template = _fetch_template_or_404(ctx, template_id)
    _require_owner(ctx, template)
    # Data-loss guard: the wholesale question delete below cascades to mark_entries
    # (mark_entries.question_id → exam_questions ON DELETE CASCADE). Refuse a structural
    # full-replace once any marks have been recorded against this template's batches, so
    # re-saving the setup canvas mid-marking can't silently wipe a teacher's marking work.
    # (Mark-scheme tweaks use PATCH /questions/{id}, which is unaffected.)
    if _template_has_recorded_marks(ctx, template_id):
        raise HTTPException(
            status_code=409,
            detail="Template has recorded marks; structural full-replace is blocked. "
            "Edit questions individually via PATCH /questions/{id}.",
        )
    # Optional template-level metadata update alongside the canvas.
    if body.meta:
        updates = _template_meta_updates(body.meta)
        if updates:
            ctx.supabase.table("exam_templates").update(updates).eq("id", template_id).execute()
    sb = ctx.supabase
    # Clear existing children. Order matters: response_areas/boundaries reference questions, so
    # remove them first (we delete by template_id rather than rely on cascade for predictability).
    for table in ("exam_response_areas", "exam_boundaries", "exam_template_layout", "exam_questions"):
        sb.table(table).delete().eq("template_id", template_id).execute()
    # Re-insert, preserving client-supplied UUIDs (Neo4j join keys, spec §2). Questions go first
    # because the other tables reference them.
    replacements = (
        ("exam_questions", body.questions, _QUESTION_FIELDS),
        ("exam_response_areas", body.response_areas, _RESPONSE_AREA_FIELDS),
        ("exam_boundaries", body.boundaries, _BOUNDARY_FIELDS),
        ("exam_template_layout", body.layout, _LAYOUT_FIELDS),
    )
    for table, items, fields in replacements:
        if items:
            sb.table(table).insert(_replacement_rows(template_id, items, fields)).execute()
    # `or []` keeps the summary safe when a collection is None rather than an empty list.
    logger.info(
        f"Exam template {template_id} replaced: {len(body.questions or [])} questions, "
        f"{len(body.response_areas or [])} regions, {len(body.boundaries or [])} boundaries, "
        f"{len(body.layout or [])} layout rows"
    )
    # R3.5.4: a successful save enqueues a graph projection into cc.public.exams. BackgroundTasks
    # is acceptable for Sprint 4 (durability via a real queue is a later step); failures are
    # swallowed so the canvas save itself never fails on a graph hiccup.
    background_tasks.add_task(project_template_safe, template_id)
    return await get_template(template_id, ctx)
@router.patch("/templates/{template_id}")
async def patch_template_meta(
    template_id: str,
    body: UpdateTemplateMetaRequest,
    ctx: ExamContext = Depends(get_exam_context),
) -> Dict[str, Any]:
    """Update template metadata only, leaving the canvas children untouched.

    In contrast to PUT /templates/{id}, nothing here deletes or re-inserts questions,
    response areas or boundaries, which makes it safe once marking has started. The
    template is first read through the caller's context and ownership is then checked
    explicitly, mirroring the structural save path.

    Raises:
        HTTPException: 400 when the body yields no fields to write; 404 when the update
            returns no row. The fetch/ownership helpers may raise their own errors.
    """
    changes = _template_meta_updates(body, include_explicit_nulls=True)
    if not changes:
        raise HTTPException(status_code=400, detail="No fields to update")

    # Read as-the-user first, then assert ownership before any write is attempted.
    _require_owner(ctx, _fetch_template_or_404(ctx, template_id))

    query = ctx.supabase.table("exam_templates").update(changes).eq("id", template_id)
    row = _first(query.execute())
    if row:
        return row
    raise HTTPException(status_code=404, detail="Template not found")
@router.delete("/templates/{template_id}")
async def archive_template(
    template_id: str,
    ctx: ExamContext = Depends(get_exam_context),
) -> Dict[str, Any]:
    """Archive a template by setting status='archived' (soft-delete, R5.2).

    The row is never hard-deleted, so a teacher's work is retained. The template is
    fetched through the caller's context and ownership is checked before the update.

    Returns:
        ``{"status": "archived", "id": template_id}``.
    """
    _require_owner(ctx, _fetch_template_or_404(ctx, template_id))

    status_patch = {"status": "archived"}
    templates = ctx.supabase.table("exam_templates")
    templates.update(status_patch).eq("id", template_id).execute()
    return {**status_patch, "id": template_id}
@router.post("/templates/{template_id}/neo4j-sync")
async def neo4j_sync(
    template_id: str,
    ctx: ExamContext = Depends(get_exam_context),
) -> Dict[str, Any]:
    """Manual graph-projection trigger (R5.3) for dev/backfill.

    Runs the projection within the request and returns its counts. Auth/ownership is
    checked as-the-user; the projection itself uses service role (R3.5.1, the documented
    graph-writer path).

    Returns:
        ``{"status": "ok", "projection": counts}`` where ``counts`` is whatever
        ``project_template`` returns.

    Raises:
        HTTPException: 502 when the projection raises; the original exception is chained
            as the cause. The fetch/ownership helpers may raise their own errors.
    """
    template = _fetch_template_or_404(ctx, template_id)
    _require_owner(ctx, template)
    try:
        counts = project_template(template_id)
    except Exception as exc:
        logger.error(f"Manual neo4j-sync failed for template {template_id}: {exc}")
        # Chain explicitly so the projection failure shows as the direct cause rather
        # than as "during handling of the above exception, another exception occurred".
        raise HTTPException(status_code=502, detail=f"Projection failed: {exc}") from exc
    return {"status": "ok", "projection": counts}
# ─── questions (granular edit path, R5.2) ────────────────────────────────────
@router.patch("/questions/{question_id}")
async def patch_question(
    question_id: str,
    body: PatchQuestionRequest,
    ctx: ExamContext = Depends(get_exam_context),
) -> Dict[str, Any]:
    """Apply a granular edit to a single question (the mark-scheme edit path, R5.2).

    Only fields whose dumped value is not None are written, so a None value means
    "leave unchanged" rather than "clear".

    Raises:
        HTTPException: 400 when no non-None fields were supplied; 404 when the update
            returns no row.
    """
    changes: Dict[str, Any] = {}
    for field, value in _model_dump(body).items():
        if value is not None:
            changes[field] = value
    if not changes:
        raise HTTPException(status_code=400, detail="No fields to update")

    # RLS (exam_questions_all) enforces that the question belongs to a template owned by the
    # caller; an out-of-scope id updates zero rows → 404, so no explicit pre-fetch is needed.
    response = (
        ctx.supabase.table("exam_questions")
        .update(changes)
        .eq("id", question_id)
        .execute()
    )
    row = _first(response)
    if row:
        return row
    raise HTTPException(status_code=404, detail="Question not found")