"""Template CRUD for the exam-marker (/api/exam/templates...) — card S4-5. All access is as-the-user (RLS-enforced; spec E1 fix) via ExamContext. Ownership is also checked explicitly before mutating (E2: never trust a client-supplied id as authorization) — defence in depth on top of RLS. A row the caller cannot see under RLS reads back as absent, so cross-institute access surfaces as 404, never a data leak (IDOR-safe). Hybrid persistence (R5.2): PUT /templates/{id} is a full-replace of the canvas children (questions + response areas + boundaries); PATCH /questions/{qid} is the granular mark-scheme edit path. Client-supplied UUIDs are preserved so Supabase ids stay aligned with the Neo4j join keys (spec §2). """ from __future__ import annotations import os import uuid from typing import Any, Dict, List, Optional, Tuple from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Request, UploadFile from fastapi.responses import Response from modules.database.services.exam_projection import project_template, project_template_safe from modules.database.supabase.utils.client import SupabaseServiceRoleClient from modules.database.supabase.utils.storage import StorageAdmin from modules.logger_tool import initialise_logger from routers.exam.dependencies import ExamContext, get_exam_context, lookup_exam_code from routers.exam.schemas import ( CreateTemplateRequest, PatchQuestionRequest, TemplateReplaceRequest, UpdateTemplateMetaRequest, ) logger = initialise_logger(__name__, os.getenv("LOG_LEVEL"), os.getenv("LOG_PATH"), "default", True) router = APIRouter() SOURCE_CABINET_NAME = "Exam Marker Template Sources" SOURCE_BUCKET_FALLBACK = "cc.users" # ─── helpers ───────────────────────────────────────────────────────────────── def _rows(result: Any) -> List[Dict[str, Any]]: data = getattr(result, "data", None) if not data: return [] return data if isinstance(data, list) else [data] def _first(result: Any) -> Optional[Dict[str, Any]]: rows = _rows(result) return rows[0] if rows else None def _fetch_template_or_404(ctx: ExamContext, template_id: str) -> Dict[str, Any]: """Load a template the caller can see (RLS-scoped). Missing/forbidden → 404.""" res = ctx.supabase.table("exam_templates").select("*").eq("id", template_id).limit(1).execute() row = _first(res) if not row: raise HTTPException(status_code=404, detail="Template not found") return row def _require_owner(ctx: ExamContext, template: Dict[str, Any]) -> None: """Writes are limited to the owning teacher (R2.4).""" if template.get("teacher_id") != ctx.user_id: raise HTTPException(status_code=403, detail="Only the template owner can modify it") def _require_source_visibility_or_404(ctx: ExamContext, template: Dict[str, Any]) -> None: """Institute boundary check — RLS already gates template visibility; this prevents cross-institute PDF leakage.""" if template.get("institute_id") not in ctx.institute_ids: raise HTTPException(status_code=404, detail="Template not found") def _template_has_recorded_marks(ctx: ExamContext, template_id: str) -> bool: """True if any mark_entry exists for a batch of this template (→ destructive PUT is unsafe).""" batches = _rows( ctx.supabase.table("marking_batches").select("id").eq("template_id", template_id).execute() ) batch_ids = [b["id"] for b in batches] if not batch_ids: return False marks = _rows( ctx.supabase.table("mark_entries").select("id").in_("batch_id", batch_ids).limit(1).execute() ) return bool(marks) def _model_fields_set(model: Any) -> set[str]: """Return fields explicitly provided by the client (Pydantic v1/v2 compatible).""" if hasattr(model, "model_fields_set"): return set(model.model_fields_set) return set(getattr(model, "__fields_set__", set())) def _model_dump(model: Any) -> Dict[str, Any]: """Dump a Pydantic model without pinning this router to v1/v2 APIs.""" if hasattr(model, "model_dump"): return model.model_dump() return model.dict() def _template_meta_updates(body: UpdateTemplateMetaRequest, *, include_explicit_nulls: bool = False) -> Dict[str, Any]: data = _model_dump(body) if include_explicit_nulls: fields = _model_fields_set(body) return {k: data[k] for k in fields if k in data} return {k: v for k, v in data.items() if v is not None} def _parse_storage_loc(storage_loc: str) -> Tuple[str, str]: bucket, sep, path = (storage_loc or "").partition("/") if not bucket or not sep or not path: raise ValueError(f"Invalid storage_loc: {storage_loc!r}") return bucket, path def _lookup_exam_storage_loc(exam_id: str) -> Optional[str]: try: sb = SupabaseServiceRoleClient().supabase res = sb.table("eb_exams").select("storage_loc").eq("id", exam_id).limit(1).execute() row = _first(res) return row.get("storage_loc") if row else None except Exception as exc: logger.warning(f"storage_loc lookup failed for exam_id={exam_id}: {exc}") return None async def _parse_create_template_request(request: Request) -> tuple[CreateTemplateRequest, Optional[UploadFile]]: content_type = request.headers.get("content-type", "") if "multipart/form-data" in content_type: form = await request.form() payload: Dict[str, Any] = {} for key in ("title", "subject", "exam_id", "exam_code", "source_file_id", "page_count", "institute_id"): value = form.get(key) if value is not None and value != "": payload[key] = value upload = form.get("source_pdf") if upload is not None and not hasattr(upload, "read"): raise HTTPException(status_code=400, detail="source_pdf must be a file upload") if upload is not None and payload.get("source_file_id"): raise HTTPException(status_code=400, detail="Use either source_file_id or source_pdf, not both") return CreateTemplateRequest(**payload), upload try: data = await request.json() except Exception as exc: raise HTTPException(status_code=400, detail=f"Invalid request body: {exc}") return CreateTemplateRequest(**data), None async def _upload_template_source_file( ctx: ExamContext, institute_id: str, upload: UploadFile, ) -> str: file_bytes = await upload.read() if not file_bytes: raise HTTPException(status_code=400, detail="Uploaded PDF is empty") if upload.content_type and upload.content_type != "application/pdf": raise HTTPException(status_code=400, detail="Uploaded file must be a PDF") service = SupabaseServiceRoleClient() storage = StorageAdmin() cabinet_name = SOURCE_CABINET_NAME existing = _first( service.supabase.table("file_cabinets") .select("id") .eq("user_id", ctx.user_id) .eq("name", cabinet_name) .limit(1) .execute() ) if existing: cabinet_id = existing["id"] else: created_cabinet = _first( service.supabase.table("file_cabinets") .insert({"user_id": ctx.user_id, "name": cabinet_name}) .execute() ) if not created_cabinet: raise HTTPException(status_code=500, detail="Failed to create upload cabinet") cabinet_id = created_cabinet["id"] file_id = str(uuid.uuid4()) safe_name = os.path.basename(upload.filename or "template.pdf") # Use the shared users bucket (exists on all envs). Per-institute private buckets # (cc.institutes..private) are a future multi-tenant provisioning concern and are NOT # created on dev .94 — using one here failed with "Bucket not found". The institute is already # namespaced in the storage path + enforced by RLS on the files row. bucket = SOURCE_BUCKET_FALLBACK storage_path = f"exam-marker/{institute_id or 'noinst'}/{cabinet_id}/{file_id}/{safe_name}" try: storage.upload_file(bucket, storage_path, file_bytes, "application/pdf", upsert=True) except Exception as exc: raise HTTPException(status_code=500, detail=f"Storage upload failed: {exc}") inserted = _first( service.supabase.table("files").insert( { "id": file_id, "cabinet_id": cabinet_id, "name": safe_name, "path": storage_path, "bucket": bucket, "mime_type": "application/pdf", "uploaded_by": ctx.user_id, "size_bytes": len(file_bytes), "source": "classroomcopilot-web", "is_directory": False, "relative_path": safe_name, "processing_status": "uploaded", } ).execute() ) if not inserted: raise HTTPException(status_code=500, detail="Failed to create file record") return file_id # ─── templates ─────────────────────────────────────────────────────────────── @router.post("/templates") async def create_template( request: Request, ctx: ExamContext = Depends(get_exam_context), ) -> Dict[str, Any]: body, upload = await _parse_create_template_request(request) institute_id = ctx.resolve_institute(body.institute_id) if body.exam_id and body.source_file_id: raise HTTPException(status_code=400, detail="Use either exam_id or source_file_id, not both") exam_code = body.exam_code if body.exam_id and not exam_code: exam_code = lookup_exam_code(body.exam_id) source_file_id = body.source_file_id if upload is not None: source_file_id = await _upload_template_source_file(ctx, institute_id, upload) row = { "title": body.title, "subject": body.subject, "exam_id": body.exam_id, "exam_code": exam_code, "source_file_id": source_file_id, "page_count": body.page_count, "institute_id": institute_id, "teacher_id": ctx.user_id, "status": "draft", } row = {k: v for k, v in row.items() if v is not None} res = ctx.supabase.table("exam_templates").insert(row).execute() created = _first(res) if not created: raise HTTPException(status_code=500, detail="Failed to create template") logger.info(f"Exam template created: {created.get('id')} by {ctx.user_id}") return created @router.get("/catalogue") async def list_catalogue_papers() -> Dict[str, Any]: """Lightweight exam-board paper catalogue for the create dialog.""" try: sb = SupabaseServiceRoleClient().supabase res = ( sb.table("eb_exams") .select("id, exam_code, spec_code, paper_code, tier, session, type_code, storage_loc") .eq("type_code", "QP") .order("exam_code") .execute() ) return {"papers": _rows(res)} except Exception as exc: raise HTTPException(status_code=502, detail=f"Could not load catalogue papers: {exc}") @router.get("/templates") async def list_templates( include_archived: bool = False, institute_id: Optional[str] = None, ctx: ExamContext = Depends(get_exam_context), ) -> Dict[str, Any]: # RLS already scopes to the caller's institutes; the optional filter narrows within that. q = ctx.supabase.table("exam_templates").select("*") if institute_id: q = q.eq("institute_id", institute_id) if not include_archived: q = q.neq("status", "archived") res = q.order("updated_at", desc=True).execute() return {"templates": _rows(res)} @router.get("/templates/{template_id}") async def get_template( template_id: str, ctx: ExamContext = Depends(get_exam_context), ) -> Dict[str, Any]: template = _fetch_template_or_404(ctx, template_id) questions = _rows( ctx.supabase.table("exam_questions").select("*").eq("template_id", template_id).order("order").execute() ) response_areas = _rows( ctx.supabase.table("exam_response_areas").select("*").eq("template_id", template_id).execute() ) boundaries = _rows( ctx.supabase.table("exam_boundaries").select("*").eq("template_id", template_id).execute() ) layout = _rows( ctx.supabase.table("exam_template_layout") .select("*") .eq("template_id", template_id) .order("page_index") .execute() ) return { **template, "questions": questions, "response_areas": response_areas, "boundaries": boundaries, "layout": layout, } @router.get("/templates/{template_id}/source-pdf") async def get_template_source_pdf( template_id: str, ctx: ExamContext = Depends(get_exam_context), ) -> Response: template = _fetch_template_or_404(ctx, template_id) _require_source_visibility_or_404(ctx, template) bucket: Optional[str] = None path: Optional[str] = None if template.get("exam_id"): storage_loc = _lookup_exam_storage_loc(template["exam_id"]) if not storage_loc: raise HTTPException(status_code=404, detail="Template source not found") try: bucket, path = _parse_storage_loc(storage_loc) except ValueError: raise HTTPException(status_code=404, detail="Template source not found") elif template.get("source_file_id"): # Resolve the file row via service role (authz already done above: the caller proved they # can see this template, and source_file_id is the template's own file). Reading `files` # as-the-user trips a pre-existing broken RLS policy on cabinet_memberships # (42P17 infinite recursion) — documented service-role exception, like the catalogue lookup. file_row = _first( SupabaseServiceRoleClient().supabase.table("files") .select("bucket, path, mime_type, name") .eq("id", template["source_file_id"]) .limit(1) .execute() ) if not file_row or not file_row.get("bucket") or not file_row.get("path"): raise HTTPException(status_code=404, detail="Template source not found") bucket = file_row["bucket"] path = file_row["path"] else: raise HTTPException(status_code=404, detail="Template source not found") if not bucket or not path: raise HTTPException(status_code=404, detail="Template source not found") try: pdf_bytes = StorageAdmin().download_file(bucket, path) except Exception as exc: logger.warning(f"Template source download failed for template {template_id}: {exc}") raise HTTPException(status_code=404, detail="Template source not found") return Response(content=pdf_bytes, media_type="application/pdf") @router.put("/templates/{template_id}") async def replace_template( template_id: str, body: TemplateReplaceRequest, background_tasks: BackgroundTasks, ctx: ExamContext = Depends(get_exam_context), ) -> Dict[str, Any]: """Full-replace canvas save (R5.2). Replaces questions/response_areas/boundaries wholesale. Note: the delete-then-insert spans several PostgREST calls and is therefore not atomic; acceptable for the small (~20-question) payloads this carries. A transactional RPC is a later hardening step if concurrent canvas saves become a concern. """ template = _fetch_template_or_404(ctx, template_id) _require_owner(ctx, template) # Data-loss guard: the wholesale question delete below cascades to mark_entries # (mark_entries.question_id → exam_questions ON DELETE CASCADE). Refuse a structural # full-replace once any marks have been recorded against this template's batches, so # re-saving the setup canvas mid-marking can't silently wipe a teacher's marking work. # (Mark-scheme tweaks use PATCH /questions/{id}, which is unaffected.) if _template_has_recorded_marks(ctx, template_id): raise HTTPException( status_code=409, detail="Template has recorded marks; structural full-replace is blocked. " "Edit questions individually via PATCH /questions/{id}.", ) # Optional template-level metadata update alongside the canvas. if body.meta: updates = _template_meta_updates(body.meta) if updates: ctx.supabase.table("exam_templates").update(updates).eq("id", template_id).execute() sb = ctx.supabase # Clear existing children. Order matters: response_areas/boundaries reference questions, so # remove them first (we delete by template_id rather than rely on cascade for predictability). sb.table("exam_response_areas").delete().eq("template_id", template_id).execute() sb.table("exam_boundaries").delete().eq("template_id", template_id).execute() sb.table("exam_template_layout").delete().eq("template_id", template_id).execute() sb.table("exam_questions").delete().eq("template_id", template_id).execute() # Re-insert, preserving client-supplied UUIDs (Neo4j join keys, spec §2). if body.questions: q_rows = [] for q in body.questions: r = { "template_id": template_id, "parent_id": q.parent_id, "label": q.label, "order": q.order, "max_marks": q.max_marks, "answer_type": q.answer_type, "mcq_options": q.mcq_options, "mark_scheme": q.mark_scheme, "is_container": q.is_container, "spec_ref": q.spec_ref, "bounds": q.bounds, # drawn Part box (73); null for derived main questions "page": q.page, "source": q.source, "confirmed": q.confirmed, "confidence": q.confidence, "derivation": q.derivation, } if q.id: r["id"] = q.id q_rows.append({k: v for k, v in r.items() if v is not None}) sb.table("exam_questions").insert(q_rows).execute() if body.response_areas: ra_rows = [] for ra in body.response_areas: r = { "template_id": template_id, "question_id": ra.question_id, "page": ra.page, "bounds": ra.bounds, "kind": ra.kind, "response_form": ra.response_form, "context_type": ra.context_type, # 73: optional Context differentiation "source": ra.source, "confirmed": ra.confirmed, "confidence": ra.confidence, "mark_subtype": ra.mark_subtype, "derivation": ra.derivation, } if ra.id: r["id"] = ra.id ra_rows.append({k: v for k, v in r.items() if v is not None}) sb.table("exam_response_areas").insert(ra_rows).execute() if body.boundaries: b_rows = [] for b in body.boundaries: r = { "template_id": template_id, "question_id": b.question_id, "label": b.label, "page_index": b.page_index, "y": b.y, "bounds": b.bounds, "source": b.source, "confirmed": b.confirmed, "confidence": b.confidence, "derivation": b.derivation, } if b.id: r["id"] = b.id b_rows.append({k: v for k, v in r.items() if v is not None}) sb.table("exam_boundaries").insert(b_rows).execute() if body.layout: layout_rows = [] for item in body.layout: r = { "template_id": template_id, "page_index": item.page_index, "role": item.role, "margin_left": item.margin_left, "margin_right": item.margin_right, "margin_top": item.margin_top, "margin_bottom": item.margin_bottom, "margins_enabled": item.margins_enabled, "source": item.source, "confirmed": item.confirmed, "confidence": item.confidence, "derivation": item.derivation, "meta": item.meta, } if item.id: r["id"] = item.id layout_rows.append({k: v for k, v in r.items() if v is not None}) sb.table("exam_template_layout").insert(layout_rows).execute() logger.info( f"Exam template {template_id} replaced: {len(body.questions)} questions, " f"{len(body.response_areas)} regions, {len(body.boundaries)} boundaries, " f"{len(body.layout)} layout rows" ) # R3.5.4: a successful save enqueues a graph projection into cc.public.exams. BackgroundTasks # is acceptable for Sprint 4 (durability via a real queue is a later step); failures are # swallowed so the canvas save itself never fails on a graph hiccup. background_tasks.add_task(project_template_safe, template_id) return await get_template(template_id, ctx) @router.patch("/templates/{template_id}") async def patch_template_meta( template_id: str, body: UpdateTemplateMetaRequest, ctx: ExamContext = Depends(get_exam_context), ) -> Dict[str, Any]: """Metadata-only template update. Unlike PUT /templates/{id}, this never deletes/re-inserts questions, response areas or boundaries, so it is safe after marking has started. RLS scopes the initial read and owner-only writes are enforced explicitly like the structural save path. """ updates = _template_meta_updates(body, include_explicit_nulls=True) if not updates: raise HTTPException(status_code=400, detail="No fields to update") template = _fetch_template_or_404(ctx, template_id) _require_owner(ctx, template) res = ctx.supabase.table("exam_templates").update(updates).eq("id", template_id).execute() updated = _first(res) if not updated: raise HTTPException(status_code=404, detail="Template not found") return updated @router.delete("/templates/{template_id}") async def archive_template( template_id: str, ctx: ExamContext = Depends(get_exam_context), ) -> Dict[str, Any]: """Soft-delete: status='archived' (R5.2). Never hard-deletes a teacher's work.""" template = _fetch_template_or_404(ctx, template_id) _require_owner(ctx, template) ctx.supabase.table("exam_templates").update({"status": "archived"}).eq("id", template_id).execute() return {"status": "archived", "id": template_id} @router.post("/templates/{template_id}/neo4j-sync") async def neo4j_sync( template_id: str, ctx: ExamContext = Depends(get_exam_context), ) -> Dict[str, Any]: """Manual graph-projection trigger (R5.3) for dev/backfill — runs synchronously and returns counts. Auth/ownership is checked as-the-user; the projection itself uses service role (R3.5.1, the documented graph-writer path).""" template = _fetch_template_or_404(ctx, template_id) _require_owner(ctx, template) try: counts = project_template(template_id) except Exception as exc: logger.error(f"Manual neo4j-sync failed for template {template_id}: {exc}") raise HTTPException(status_code=502, detail=f"Projection failed: {exc}") return {"status": "ok", "projection": counts} # ─── questions (granular edit path, R5.2) ──────────────────────────────────── @router.patch("/questions/{question_id}") async def patch_question( question_id: str, body: PatchQuestionRequest, ctx: ExamContext = Depends(get_exam_context), ) -> Dict[str, Any]: updates = {k: v for k, v in _model_dump(body).items() if v is not None} if not updates: raise HTTPException(status_code=400, detail="No fields to update") # RLS (exam_questions_all) enforces that the question belongs to a template owned by the # caller; an out-of-scope id updates zero rows → 404, so no explicit pre-fetch is needed. res = ctx.supabase.table("exam_questions").update(updates).eq("id", question_id).execute() updated = _first(res) if not updated: raise HTTPException(status_code=404, detail="Question not found") return updated