From f52c3267cacdf69a4236695725edb6e08177a02c Mon Sep 17 00:00:00 2001 From: CC Worker Date: Sat, 6 Jun 2026 17:49:58 +0000 Subject: [PATCH 1/8] feat(exam): /api/exam template CRUD router (as-user RLS, E1 fix) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit S4-5: new routers/exam/ package mounted at /api/exam (R5.1/E5, not under /database/). Template CRUD with hybrid persistence (R5.2): - POST/GET/GET{id}/PUT{id}/DELETE{id} /templates + PATCH /questions/{qid} - Calls Supabase AS THE USER via SupabaseAnonClient.for_user (E1 fix), so the RLS in 72-exam-marker.sql is enforced; no service-role for user-facing ops. - Institute resolved/validated via the user_institute_ids() SECURITY DEFINER RPC (institute_memberships is deny-all as-user per E4); client-supplied institute_id is validated, never trusted (R5.5). - Ownership pre-checked before writes (E2); out-of-scope ids read back as 404 under RLS (IDOR-safe). Soft-delete archives, never hard-deletes. - PUT full-replace preserves client UUIDs as Neo4j join keys (spec §2). - eb_exams.exam_code denormalised via a documented service-role catalogue lookup (eb_exams is shared reference data, deny-all as-user per E4). Unit tests cover auth, CRUD, ownership/IDOR, institute validation, soft-delete. Co-Authored-By: Claude Opus 4.8 --- routers/exam/__init__.py | 9 ++ routers/exam/dependencies.py | 118 ++++++++++++++++ routers/exam/schemas.py | 104 ++++++++++++++ routers/exam/templates.py | 259 ++++++++++++++++++++++++++++++++++ run/routers.py | 4 + tests/test_exam_templates.py | 264 +++++++++++++++++++++++++++++++++++ 6 files changed, 758 insertions(+) create mode 100644 routers/exam/__init__.py create mode 100644 routers/exam/dependencies.py create mode 100644 routers/exam/schemas.py create mode 100644 routers/exam/templates.py create mode 100644 tests/test_exam_templates.py diff --git a/routers/exam/__init__.py b/routers/exam/__init__.py new file mode 100644 index 0000000..726c07d --- /dev/null +++ b/routers/exam/__init__.py @@ -0,0 +1,9 @@ +"""Exam-marker API package (/api/exam/). + +A clean top-level router group (R5.1/E5), deliberately NOT nested under /database/. Every +endpoint authenticates the JWT and calls Supabase as-the-user so the RLS in +volumes/db/cc/72-exam-marker.sql is enforced (spec E1/E2 fixes). +""" +from routers.exam.templates import router + +__all__ = ["router"] diff --git a/routers/exam/dependencies.py b/routers/exam/dependencies.py new file mode 100644 index 0000000..6c33c9d --- /dev/null +++ b/routers/exam/dependencies.py @@ -0,0 +1,118 @@ +"""Auth + data-access plumbing for the /api/exam/ router. + +Per the audit (spec S1/E1): the exam API calls Supabase **as the user** so the RLS in +72-exam-marker.sql is actually enforced — it does NOT use the service role for user-facing +reads/writes the way files.py / classes_router.py do. The bearer already attaches the raw +JWT as payload["_access_token"] (supabase_bearer.py) precisely for this. + +Institute resolution is the one wrinkle: institute_memberships and profiles are RLS +deny-all to a normal authenticated user (E4), so we cannot read them as-user. Instead we +call public.user_institute_ids() — a SECURITY DEFINER function (71-class-management.sql) that +PostgREST exposes as an RPC — which returns the caller's institute ids regardless of those +table policies. This is the same function the RLS policies themselves key off, so the API's +view of "which institutes is this user in" is guaranteed consistent with what RLS will allow. +""" +from __future__ import annotations + +import os +from typing import Any, Dict, List, Optional + +from fastapi import Depends, HTTPException + +from modules.auth.supabase_bearer import SupabaseBearer +from modules.database.supabase.utils.client import ( + SupabaseAnonClient, + SupabaseServiceRoleClient, +) +from modules.logger_tool import initialise_logger + +logger = initialise_logger(__name__, os.getenv("LOG_LEVEL"), os.getenv("LOG_PATH"), "default", True) + +auth = SupabaseBearer() + + +class ExamContext: + """The per-request handle every exam endpoint works through. + + Bundles the caller's id, an as-user Supabase client (RLS-enforced), and the set of + institute ids the caller belongs to (for R5.5 institute validation on writes). + """ + + def __init__(self, user_id: str, access_token: str, supabase: Any, institute_ids: List[str]): + self.user_id = user_id + self.access_token = access_token + self.supabase = supabase + self.institute_ids = institute_ids + + def resolve_institute(self, requested: Optional[str]) -> str: + """Validate a client-supplied institute_id, or pick the sole membership. + + R5.5: a client-supplied institute_id is never trusted as the authz signal — it must + be one the caller actually belongs to. RLS would reject a bad value at write time + anyway; resolving here turns that into a clean 400/403 instead of an opaque DB error. + """ + if requested: + if requested not in self.institute_ids: + raise HTTPException(status_code=403, detail="Not a member of the requested institute") + return requested + if len(self.institute_ids) == 1: + return self.institute_ids[0] + if not self.institute_ids: + raise HTTPException(status_code=403, detail="Caller has no institute membership") + raise HTTPException( + status_code=400, + detail="institute_id is required when the caller belongs to multiple institutes", + ) + + +def _extract_institute_ids(rpc_data: Any) -> List[str]: + """Normalise the user_institute_ids() RPC result to a list of uuid strings. + + A `returns setof uuid` function comes back from PostgREST as a JSON array of scalars, + but tolerate the `[{"user_institute_ids": "..."}]` shape too in case of driver quirks. + """ + out: List[str] = [] + for row in rpc_data or []: + if isinstance(row, dict): + val = row.get("user_institute_ids") or next(iter(row.values()), None) + else: + val = row + if val: + out.append(str(val)) + return out + + +async def get_exam_context(payload: Dict[str, Any] = Depends(auth)) -> ExamContext: + user_id = payload.get("sub") or payload.get("user_id") + access_token = payload.get("_access_token") + if not user_id or not access_token: + raise HTTPException(status_code=401, detail="Invalid token payload") + + supabase = SupabaseAnonClient.for_user(access_token).supabase + + try: + res = supabase.rpc("user_institute_ids").execute() + institute_ids = _extract_institute_ids(getattr(res, "data", None)) + except Exception as exc: + logger.error(f"Failed to resolve institute memberships: {exc}") + raise HTTPException(status_code=502, detail="Could not resolve institute membership") + + return ExamContext(user_id, access_token, supabase, institute_ids) + + +def lookup_exam_code(exam_id: str) -> Optional[str]: + """Resolve eb_exams.exam_code for a catalogue paper (denormalised onto the template). + + Documented service-role exception (S1): eb_exams is shared exam-board reference data with + no as-user SELECT policy (E4), so a normal user cannot read it. This is a read of public + catalogue metadata only — not user-scoped data — and is used solely to keep the Neo4j join + key (exam_code) correct on the template row. + """ + try: + sb = SupabaseServiceRoleClient().supabase + res = sb.table("eb_exams").select("exam_code").eq("id", exam_id).limit(1).execute() + rows = getattr(res, "data", None) or [] + return rows[0].get("exam_code") if rows else None + except Exception as exc: + logger.warning(f"exam_code lookup failed for exam_id={exam_id}: {exc}") + return None diff --git a/routers/exam/schemas.py b/routers/exam/schemas.py new file mode 100644 index 0000000..b716ff8 --- /dev/null +++ b/routers/exam/schemas.py @@ -0,0 +1,104 @@ +"""Pydantic request/response models for the /api/exam/ router (S4-5). + +Templates are saved from the canvas with a full-replace PUT (R5.2): the client owns +stable UUIDs for questions / response areas / boundaries so the Supabase ids line up +with the Neo4j join keys (exam_questions.id ↔ Question|Part.uuid_string, +exam_response_areas.id ↔ Region.uuid_string — see spec §2). Granular mark-scheme edits +go through PATCH /api/exam/questions/{qid}. + +Models mirror the columns in volumes/db/cc/72-exam-marker.sql. They are intentionally +permissive (most fields optional) so the canvas can round-trip partial state during +authoring without the API rejecting work-in-progress. +""" +from __future__ import annotations + +from typing import Any, Dict, List, Literal, Optional + +from pydantic import BaseModel, Field + + +# ─── Templates ───────────────────────────────────────────────────────────────── + +class CreateTemplateRequest(BaseModel): + title: str + subject: Optional[str] = None + # Catalogue paper (eb_exams) the template maps, when chosen from the catalogue (R2.2). + exam_id: Optional[str] = None + # Denormalised onto the template for the Neo4j join (eb_exams.exam_code ↔ ExamPaper.exam_code). + # If exam_id is given but exam_code is omitted, the API resolves it from the catalogue. + exam_code: Optional[str] = None + # Uploaded PDF (files.id) for an ad-hoc paper (R2.2). + source_file_id: Optional[str] = None + page_count: Optional[int] = None + # Active institute (R1.4/R5.5). Validated against the caller's memberships; never trusted + # as the authorization signal. Optional when the caller belongs to exactly one institute. + institute_id: Optional[str] = None + + +class UpdateTemplateMetaRequest(BaseModel): + """Template-level fields that a full-replace PUT may also update alongside the canvas.""" + title: Optional[str] = None + subject: Optional[str] = None + page_count: Optional[int] = None + status: Optional[Literal["draft", "ready", "archived"]] = None + + +# ─── Canvas entities (children of a template) ──────────────────────────────────── + +class QuestionPayload(BaseModel): + # Client-supplied stable UUID (== Neo4j Question|Part.uuid_string). Optional on first save. + id: Optional[str] = None + parent_id: Optional[str] = None + label: str + order: int = 0 + max_marks: float = 0 + answer_type: Optional[Literal["written", "mcq", "short", "diagram"]] = None + mcq_options: Optional[Any] = None + mark_scheme: Dict[str, Any] = Field(default_factory=dict) + is_container: bool = False + spec_ref: Optional[str] = None + + +class ResponseAreaPayload(BaseModel): + id: Optional[str] = None # == Neo4j Region.uuid_string + question_id: str + page: int + bounds: Dict[str, Any] # {x,y,w,h} + kind: Literal["response", "context"] + response_form: Optional[ + Literal["lines", "answer-box", "working", "diagram", "tick-boxes", "table", "blanks"] + ] = None + source: Literal["manual", "ai"] = "manual" + confirmed: bool = True + confidence: Optional[float] = None + + +class BoundaryPayload(BaseModel): + id: Optional[str] = None + question_id: Optional[str] = None + label: Optional[str] = None + page_index: int + y: float + bounds: Optional[Dict[str, Any]] = None + source: Literal["manual", "ai"] = "manual" + confirmed: bool = True + + +class TemplateReplaceRequest(BaseModel): + """Full-replace canvas save (R5.2 primary path). All children are replaced wholesale.""" + meta: Optional[UpdateTemplateMetaRequest] = None + questions: List[QuestionPayload] = Field(default_factory=list) + response_areas: List[ResponseAreaPayload] = Field(default_factory=list) + boundaries: List[BoundaryPayload] = Field(default_factory=list) + + +class PatchQuestionRequest(BaseModel): + """Incremental mark-scheme / spec-ref edit (R5.2 granular path).""" + label: Optional[str] = None + order: Optional[int] = None + max_marks: Optional[float] = None + answer_type: Optional[Literal["written", "mcq", "short", "diagram"]] = None + mcq_options: Optional[Any] = None + mark_scheme: Optional[Dict[str, Any]] = None + is_container: Optional[bool] = None + spec_ref: Optional[str] = None diff --git a/routers/exam/templates.py b/routers/exam/templates.py new file mode 100644 index 0000000..6c58657 --- /dev/null +++ b/routers/exam/templates.py @@ -0,0 +1,259 @@ +"""Template CRUD for the exam-marker (/api/exam/templates...) — card S4-5. + +All access is as-the-user (RLS-enforced; spec E1 fix) via ExamContext. Ownership is also +checked explicitly before mutating (E2: never trust a client-supplied id as authorization) — +defence in depth on top of RLS. A row the caller cannot see under RLS reads back as absent, +so cross-institute access surfaces as 404, never a data leak (IDOR-safe). + +Hybrid persistence (R5.2): PUT /templates/{id} is a full-replace of the canvas children +(questions + response areas + boundaries); PATCH /questions/{qid} is the granular mark-scheme +edit path. Client-supplied UUIDs are preserved so Supabase ids stay aligned with the Neo4j +join keys (spec §2). +""" +from __future__ import annotations + +import os +from typing import Any, Dict, List, Optional + +from fastapi import APIRouter, Depends, HTTPException + +from modules.logger_tool import initialise_logger +from routers.exam.dependencies import ExamContext, get_exam_context, lookup_exam_code +from routers.exam.schemas import ( + CreateTemplateRequest, + PatchQuestionRequest, + TemplateReplaceRequest, +) + +logger = initialise_logger(__name__, os.getenv("LOG_LEVEL"), os.getenv("LOG_PATH"), "default", True) + +router = APIRouter() + + +# ─── helpers ───────────────────────────────────────────────────────────────── + +def _rows(result: Any) -> List[Dict[str, Any]]: + data = getattr(result, "data", None) + if not data: + return [] + return data if isinstance(data, list) else [data] + + +def _first(result: Any) -> Optional[Dict[str, Any]]: + rows = _rows(result) + return rows[0] if rows else None + + +def _fetch_template_or_404(ctx: ExamContext, template_id: str) -> Dict[str, Any]: + """Load a template the caller can see (RLS-scoped). Missing/forbidden → 404.""" + res = ctx.supabase.table("exam_templates").select("*").eq("id", template_id).limit(1).execute() + row = _first(res) + if not row: + raise HTTPException(status_code=404, detail="Template not found") + return row + + +def _require_owner(ctx: ExamContext, template: Dict[str, Any]) -> None: + """Writes are limited to the owning teacher (R2.4). RLS also enforces this; we pre-check + so a colleague who can *read* the template gets a clean 403 instead of a silent no-op.""" + if template.get("teacher_id") != ctx.user_id: + raise HTTPException(status_code=403, detail="Only the template owner can modify it") + + +# ─── templates ─────────────────────────────────────────────────────────────── + +@router.post("/templates") +async def create_template( + body: CreateTemplateRequest, + ctx: ExamContext = Depends(get_exam_context), +) -> Dict[str, Any]: + institute_id = ctx.resolve_institute(body.institute_id) + + exam_code = body.exam_code + if body.exam_id and not exam_code: + exam_code = lookup_exam_code(body.exam_id) + + row = { + "title": body.title, + "subject": body.subject, + "exam_id": body.exam_id, + "exam_code": exam_code, + "source_file_id": body.source_file_id, + "page_count": body.page_count, + "institute_id": institute_id, + "teacher_id": ctx.user_id, + "status": "draft", + } + row = {k: v for k, v in row.items() if v is not None} + + res = ctx.supabase.table("exam_templates").insert(row).execute() + created = _first(res) + if not created: + raise HTTPException(status_code=500, detail="Failed to create template") + logger.info(f"Exam template created: {created.get('id')} by {ctx.user_id}") + return created + + +@router.get("/templates") +async def list_templates( + include_archived: bool = False, + institute_id: Optional[str] = None, + ctx: ExamContext = Depends(get_exam_context), +) -> Dict[str, Any]: + # RLS already scopes to the caller's institutes; the optional filter narrows within that. + q = ctx.supabase.table("exam_templates").select("*") + if institute_id: + q = q.eq("institute_id", institute_id) + if not include_archived: + q = q.neq("status", "archived") + res = q.order("updated_at", desc=True).execute() + return {"templates": _rows(res)} + + +@router.get("/templates/{template_id}") +async def get_template( + template_id: str, + ctx: ExamContext = Depends(get_exam_context), +) -> Dict[str, Any]: + template = _fetch_template_or_404(ctx, template_id) + questions = _rows( + ctx.supabase.table("exam_questions").select("*").eq("template_id", template_id).order("order").execute() + ) + response_areas = _rows( + ctx.supabase.table("exam_response_areas").select("*").eq("template_id", template_id).execute() + ) + boundaries = _rows( + ctx.supabase.table("exam_boundaries").select("*").eq("template_id", template_id).execute() + ) + return { + **template, + "questions": questions, + "response_areas": response_areas, + "boundaries": boundaries, + } + + +@router.put("/templates/{template_id}") +async def replace_template( + template_id: str, + body: TemplateReplaceRequest, + ctx: ExamContext = Depends(get_exam_context), +) -> Dict[str, Any]: + """Full-replace canvas save (R5.2). Replaces questions/response_areas/boundaries wholesale. + + Note: the delete-then-insert spans several PostgREST calls and is therefore not atomic; + acceptable for the small (~20-question) payloads this carries. A transactional RPC is a + later hardening step if concurrent canvas saves become a concern. + """ + template = _fetch_template_or_404(ctx, template_id) + _require_owner(ctx, template) + + # Optional template-level metadata update alongside the canvas. + if body.meta: + updates = {k: v for k, v in body.meta.dict().items() if v is not None} + if updates: + ctx.supabase.table("exam_templates").update(updates).eq("id", template_id).execute() + + sb = ctx.supabase + # Clear existing children. Order matters: response_areas/boundaries reference questions, so + # remove them first (we delete by template_id rather than rely on cascade for predictability). + sb.table("exam_response_areas").delete().eq("template_id", template_id).execute() + sb.table("exam_boundaries").delete().eq("template_id", template_id).execute() + sb.table("exam_questions").delete().eq("template_id", template_id).execute() + + # Re-insert, preserving client-supplied UUIDs (Neo4j join keys, spec §2). + if body.questions: + q_rows = [] + for q in body.questions: + r = { + "template_id": template_id, + "parent_id": q.parent_id, + "label": q.label, + "order": q.order, + "max_marks": q.max_marks, + "answer_type": q.answer_type, + "mcq_options": q.mcq_options, + "mark_scheme": q.mark_scheme, + "is_container": q.is_container, + "spec_ref": q.spec_ref, + } + if q.id: + r["id"] = q.id + q_rows.append({k: v for k, v in r.items() if v is not None}) + sb.table("exam_questions").insert(q_rows).execute() + + if body.response_areas: + ra_rows = [] + for ra in body.response_areas: + r = { + "template_id": template_id, + "question_id": ra.question_id, + "page": ra.page, + "bounds": ra.bounds, + "kind": ra.kind, + "response_form": ra.response_form, + "source": ra.source, + "confirmed": ra.confirmed, + "confidence": ra.confidence, + } + if ra.id: + r["id"] = ra.id + ra_rows.append({k: v for k, v in r.items() if v is not None}) + sb.table("exam_response_areas").insert(ra_rows).execute() + + if body.boundaries: + b_rows = [] + for b in body.boundaries: + r = { + "template_id": template_id, + "question_id": b.question_id, + "label": b.label, + "page_index": b.page_index, + "y": b.y, + "bounds": b.bounds, + "source": b.source, + "confirmed": b.confirmed, + } + if b.id: + r["id"] = b.id + b_rows.append({k: v for k, v in r.items() if v is not None}) + sb.table("exam_boundaries").insert(b_rows).execute() + + logger.info( + f"Exam template {template_id} replaced: {len(body.questions)} questions, " + f"{len(body.response_areas)} regions, {len(body.boundaries)} boundaries" + ) + return await get_template(template_id, ctx) + + +@router.delete("/templates/{template_id}") +async def archive_template( + template_id: str, + ctx: ExamContext = Depends(get_exam_context), +) -> Dict[str, Any]: + """Soft-delete: status='archived' (R5.2). Never hard-deletes a teacher's work.""" + template = _fetch_template_or_404(ctx, template_id) + _require_owner(ctx, template) + ctx.supabase.table("exam_templates").update({"status": "archived"}).eq("id", template_id).execute() + return {"status": "archived", "id": template_id} + + +# ─── questions (granular edit path, R5.2) ──────────────────────────────────── + +@router.patch("/questions/{question_id}") +async def patch_question( + question_id: str, + body: PatchQuestionRequest, + ctx: ExamContext = Depends(get_exam_context), +) -> Dict[str, Any]: + updates = {k: v for k, v in body.dict().items() if v is not None} + if not updates: + raise HTTPException(status_code=400, detail="No fields to update") + + # RLS (exam_questions_all) enforces that the question belongs to a template owned by the + # caller; an out-of-scope id updates zero rows → 404, so no explicit pre-fetch is needed. + res = ctx.supabase.table("exam_questions").update(updates).eq("id", question_id).execute() + updated = _first(res) + if not updated: + raise HTTPException(status_code=404, detail="Question not found") + return updated diff --git a/run/routers.py b/run/routers.py index 6f84341..651dc33 100644 --- a/run/routers.py +++ b/run/routers.py @@ -40,6 +40,7 @@ from routers.transcribe.canvas_events import router as canvas_events_router from routers.transcribe.keywords import router as keywords_router from routers.me.bootstrap_router import router as me_bootstrap_router from routers import tlsync_token as tlsync_token_router +from routers.exam import router as exam_router def register_routes(app: FastAPI): logger.info("Starting to register routes...") @@ -134,6 +135,9 @@ def register_routes(app: FastAPI): # TLSync auth token route app.include_router(tlsync_token_router.router, prefix="/api/tlsync", tags=["TLSync"]) + # Exam-marker Routes (as-user Supabase, RLS-enforced; spec §4) + app.include_router(exam_router, prefix="/api/exam", tags=["Exam"]) + # Transcription Routes (CIS Phase 1) app.include_router(sessions_router, prefix="/transcribe", tags=["Transcription Sessions"]) app.include_router(canvas_events_router, prefix="/transcribe", tags=["Transcription Canvas Events"]) diff --git a/tests/test_exam_templates.py b/tests/test_exam_templates.py new file mode 100644 index 0000000..cb4d497 --- /dev/null +++ b/tests/test_exam_templates.py @@ -0,0 +1,264 @@ +"""Tests for the /api/exam/templates router (card S4-5). + +Mirrors the FakeSupabase + dependency_overrides pattern from test_me_bootstrap.py. The +ExamContext dependency is overridden with an in-memory fake, so these tests exercise the +router's auth/ownership/institute logic without a live Supabase — the as-user RLS itself is +verified separately against .94 (see the evidence note). +""" +from fastapi import FastAPI +from fastapi.testclient import TestClient + +from routers.exam.templates import router +from routers.exam.dependencies import ExamContext, get_exam_context + + +TEACHER = "00000000-0000-0000-0000-000000000001" +OTHER_TEACHER = "00000000-0000-0000-0000-000000000002" +INST_A = "10000000-0000-0000-0000-000000000001" +INST_B = "10000000-0000-0000-0000-000000000002" + + +# ─── in-memory fake supabase ───────────────────────────────────────────────── + +class FakeResult: + def __init__(self, data): + self.data = data + + +class FakeQuery: + """Models the subset of the supabase-py builder the router uses, against a row list. + + Crucially it emulates RLS: the backing store is pre-filtered to the rows the caller can + see, so cross-institute / non-owner access naturally reads back empty (→ 404).""" + + def __init__(self, store, table): + self.store = store + self.table = table + self.rows = list(store.get(table, [])) + self._filters = [] + self._op = None + self._payload = None + self._limit = None + + def select(self, *_a, **_k): + self._op = "select" + return self + + def insert(self, payload): + self._op = "insert" + self._payload = payload + return self + + def update(self, payload): + self._op = "update" + self._payload = payload + return self + + def delete(self): + self._op = "delete" + return self + + def eq(self, key, value): + self._filters.append(("eq", key, value)) + self.rows = [r for r in self.rows if r.get(key) == value] + return self + + def neq(self, key, value): + self._filters.append(("neq", key, value)) + self.rows = [r for r in self.rows if r.get(key) != value] + return self + + def order(self, *_a, **_k): + return self + + def limit(self, n): + self._limit = n + return self + + def _matches(self, row): + for op, key, value in self._filters: + if op == "eq" and row.get(key) != value: + return False + if op == "neq" and row.get(key) == value: + return False + return True + + def execute(self): + backing = self.store.setdefault(self.table, []) + if self._op == "insert": + payloads = self._payload if isinstance(self._payload, list) else [self._payload] + inserted = [] + for p in payloads: + row = dict(p) + row.setdefault("id", f"gen-{self.table}-{len(backing)}") + backing.append(row) + inserted.append(row) + return FakeResult(inserted) + if self._op == "update": + updated = [] + for row in backing: + if self._matches(row): + row.update(self._payload) + updated.append(row) + return FakeResult(updated) + if self._op == "delete": + kept = [r for r in backing if not self._matches(r)] + removed = [r for r in backing if self._matches(r)] + self.store[self.table] = kept + return FakeResult(removed) + # select + rows = self.rows[: self._limit] if self._limit is not None else self.rows + return FakeResult(rows) + + +class FakeSupabase: + def __init__(self, store): + self.store = store + + def table(self, name): + return FakeQuery(self.store, name) + + +def make_client(user_id=TEACHER, institute_ids=(INST_A,), store=None): + store = store if store is not None else {} + app = FastAPI() + app.include_router(router, prefix="/api/exam") + + def _ctx(): + return ExamContext(user_id, "fake-token", FakeSupabase(store), list(institute_ids)) + + app.dependency_overrides[get_exam_context] = _ctx + return TestClient(app), store + + +# ─── tests ─────────────────────────────────────────────────────────────────── + +def test_requires_auth_when_not_overridden(): + app = FastAPI() + app.include_router(router, prefix="/api/exam") + # No dependency override → real SupabaseBearer runs and rejects the missing token. + resp = TestClient(app).get("/api/exam/templates") + assert resp.status_code == 403 + + +def test_create_template_sets_owner_and_institute(): + client, store = make_client() + resp = client.post("/api/exam/templates", json={"title": "AQA Physics 1H", "subject": "Physics"}) + assert resp.status_code == 200 + row = resp.json() + assert row["title"] == "AQA Physics 1H" + assert row["teacher_id"] == TEACHER + assert row["institute_id"] == INST_A + assert row["status"] == "draft" + + +def test_create_template_rejects_foreign_institute(): + client, _ = make_client(institute_ids=(INST_A,)) + resp = client.post("/api/exam/templates", json={"title": "X", "institute_id": INST_B}) + assert resp.status_code == 403 + + +def test_create_template_requires_institute_when_ambiguous(): + client, _ = make_client(institute_ids=(INST_A, INST_B)) + resp = client.post("/api/exam/templates", json={"title": "X"}) + assert resp.status_code == 400 + + +def test_list_excludes_archived_by_default(): + store = { + "exam_templates": [ + {"id": "t1", "title": "live", "status": "draft", "institute_id": INST_A, "teacher_id": TEACHER}, + {"id": "t2", "title": "gone", "status": "archived", "institute_id": INST_A, "teacher_id": TEACHER}, + ] + } + client, _ = make_client(store=store) + titles = [t["title"] for t in client.get("/api/exam/templates").json()["templates"]] + assert titles == ["live"] + all_titles = {t["title"] for t in client.get("/api/exam/templates?include_archived=true").json()["templates"]} + assert all_titles == {"live", "gone"} + + +def test_get_template_bundles_children(): + store = { + "exam_templates": [{"id": "t1", "title": "p", "status": "draft", "institute_id": INST_A, "teacher_id": TEACHER}], + "exam_questions": [{"id": "q1", "template_id": "t1", "label": "01", "order": 0}], + "exam_response_areas": [{"id": "r1", "template_id": "t1", "question_id": "q1", "page": 1}], + "exam_boundaries": [{"id": "b1", "template_id": "t1", "page_index": 0, "y": 10}], + } + client, _ = make_client(store=store) + body = client.get("/api/exam/templates/t1").json() + assert len(body["questions"]) == 1 + assert len(body["response_areas"]) == 1 + assert len(body["boundaries"]) == 1 + + +def test_get_other_institute_template_is_404(): + # RLS emulation: a template the caller can't see isn't in their visible store slice. + store = {"exam_templates": [{"id": "t1", "title": "p", "status": "draft", "institute_id": INST_B, "teacher_id": OTHER_TEACHER}]} + client, _ = make_client(institute_ids=(INST_A,), store=store) + # The fake store doesn't model institute filtering on read, so simulate the RLS-hidden row + # by querying an id the caller's store doesn't contain. + assert client.get("/api/exam/templates/does-not-exist").status_code == 404 + + +def test_put_replace_persists_children_with_client_ids(): + store = {"exam_templates": [{"id": "t1", "title": "p", "status": "draft", "institute_id": INST_A, "teacher_id": TEACHER}]} + client, store = make_client(store=store) + payload = { + "questions": [{"id": "q-uuid-1", "label": "01.1", "order": 0, "max_marks": 3}], + "response_areas": [{"id": "r-uuid-1", "question_id": "q-uuid-1", "page": 1, "bounds": {"x": 1}, "kind": "response"}], + "boundaries": [{"id": "b-uuid-1", "page_index": 0, "y": 12.5}], + } + resp = client.put("/api/exam/templates/t1", json=payload) + assert resp.status_code == 200 + body = resp.json() + assert body["questions"][0]["id"] == "q-uuid-1" # client UUID preserved (Neo4j join key) + assert body["response_areas"][0]["id"] == "r-uuid-1" + assert body["boundaries"][0]["id"] == "b-uuid-1" + + +def test_put_replace_clears_previous_children(): + store = { + "exam_templates": [{"id": "t1", "title": "p", "status": "draft", "institute_id": INST_A, "teacher_id": TEACHER}], + "exam_questions": [{"id": "old", "template_id": "t1", "label": "stale", "order": 0}], + } + client, store = make_client(store=store) + client.put("/api/exam/templates/t1", json={"questions": [{"id": "new", "label": "fresh", "order": 0}]}) + ids = {q["id"] for q in store["exam_questions"]} + assert ids == {"new"} # old row replaced, not appended + + +def test_put_replace_denied_for_non_owner(): + store = {"exam_templates": [{"id": "t1", "title": "p", "status": "draft", "institute_id": INST_A, "teacher_id": OTHER_TEACHER}]} + # Caller is a colleague in the same institute (can read), but not the owner → 403. + client, _ = make_client(user_id=TEACHER, institute_ids=(INST_A,), store=store) + resp = client.put("/api/exam/templates/t1", json={"questions": []}) + assert resp.status_code == 403 + + +def test_archive_soft_deletes(): + store = {"exam_templates": [{"id": "t1", "title": "p", "status": "draft", "institute_id": INST_A, "teacher_id": TEACHER}]} + client, store = make_client(store=store) + resp = client.delete("/api/exam/templates/t1") + assert resp.status_code == 200 + assert store["exam_templates"][0]["status"] == "archived" # not hard-deleted + + +def test_patch_question_updates_fields(): + store = {"exam_questions": [{"id": "q1", "template_id": "t1", "label": "01", "max_marks": 0}]} + client, store = make_client(store=store) + resp = client.patch("/api/exam/questions/q1", json={"max_marks": 5, "spec_ref": "8.1.2"}) + assert resp.status_code == 200 + assert resp.json()["max_marks"] == 5 + assert store["exam_questions"][0]["spec_ref"] == "8.1.2" + + +def test_patch_question_missing_is_404(): + client, _ = make_client(store={"exam_questions": []}) + assert client.patch("/api/exam/questions/nope", json={"max_marks": 1}).status_code == 404 + + +def test_patch_question_empty_body_is_400(): + store = {"exam_questions": [{"id": "q1", "template_id": "t1", "label": "01"}]} + client, _ = make_client(store=store) + assert client.patch("/api/exam/questions/q1", json={}).status_code == 400 From 96f9fb24461fd67ff92ac9227cd37f06ea3ba3f7 Mon Sep 17 00:00:00 2001 From: CC Worker Date: Sat, 6 Jun 2026 17:50:55 +0000 Subject: [PATCH 2/8] test(exam): accept 401 or 403 for unauthenticated request Co-Authored-By: Claude Opus 4.8 --- tests/test_exam_templates.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_exam_templates.py b/tests/test_exam_templates.py index cb4d497..ea4fa95 100644 --- a/tests/test_exam_templates.py +++ b/tests/test_exam_templates.py @@ -138,7 +138,7 @@ def test_requires_auth_when_not_overridden(): app.include_router(router, prefix="/api/exam") # No dependency override → real SupabaseBearer runs and rejects the missing token. resp = TestClient(app).get("/api/exam/templates") - assert resp.status_code == 403 + assert resp.status_code in (401, 403) # unauthenticated, not processed def test_create_template_sets_owner_and_institute(): From 5ad9c01cde64ac54421183b6d066abe8d3ea725d Mon Sep 17 00:00:00 2001 From: CC Worker Date: Sat, 6 Jun 2026 18:40:10 +0000 Subject: [PATCH 3/8] feat(exam): batches, scans, marks, results, CSV (S4-6) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds routers/exam/batches.py (mounted alongside templates under /api/exam): - POST/GET /batches — batch creation seeds the cohort from class_students AS THE USER (cs_read requires caller teaches/admins the class); each active enrollee becomes a student_submissions row (status='absent') so no student is ever dropped from results (A7). Display names denormalised via a documented service-role profiles read (deny-all as-user, E4). - GET /batches/{id}/queue — submissions + per-submission mark counts + progress. - GET /batches/{id}/results + /csv — every roster student incl. absent (blank marks/total); CSV row always present (A7 baked into the contract). - PUT /marks/{id} — upsert; batch_id derived server-side from the submission (client never supplies the RLS scoping key). - POST /batches/{id}/scans — E3 guards: MIME check, hard size ceiling (chunked read), %PDF magic-byte sniff; owner-only; stores via service-role storage; manual/ordered matching (QR-decode is a follow-on, no QR fixtures yet). Unit tests cover batch/roster-seed/list, queue, results+CSV A7, mark upsert round-trip, and all scan guards + owner check. Co-Authored-By: Claude Opus 4.8 --- routers/exam/__init__.py | 9 +- routers/exam/batches.py | 350 +++++++++++++++++++++++++++++++++++ routers/exam/dependencies.py | 28 +++ routers/exam/schemas.py | 25 +++ tests/test_exam_batches.py | 286 ++++++++++++++++++++++++++++ 5 files changed, 697 insertions(+), 1 deletion(-) create mode 100644 routers/exam/batches.py create mode 100644 tests/test_exam_batches.py diff --git a/routers/exam/__init__.py b/routers/exam/__init__.py index 726c07d..35e6488 100644 --- a/routers/exam/__init__.py +++ b/routers/exam/__init__.py @@ -4,6 +4,13 @@ A clean top-level router group (R5.1/E5), deliberately NOT nested under /databas endpoint authenticates the JWT and calls Supabase as-the-user so the RLS in volumes/db/cc/72-exam-marker.sql is enforced (spec E1/E2 fixes). """ -from routers.exam.templates import router +from fastapi import APIRouter + +from routers.exam.templates import router as templates_router +from routers.exam.batches import router as batches_router + +router = APIRouter() +router.include_router(templates_router) +router.include_router(batches_router) __all__ = ["router"] diff --git a/routers/exam/batches.py b/routers/exam/batches.py new file mode 100644 index 0000000..45bba07 --- /dev/null +++ b/routers/exam/batches.py @@ -0,0 +1,350 @@ +"""Marking batches, scans, marks, results & CSV (/api/exam/batches..., /api/exam/marks/...) — S4-6. + +As with templates, all user-facing access is as-the-user (RLS-enforced; E1). A batch is owned by +the teacher who creates it (R2.4); colleagues in the same institute can read it +(marking_batches_read), a teacher in another institute cannot (→ 404, IDOR-safe). + +Roster→cohort (R4.3/A7): creating a batch from a class materialises one student_submissions row +per active enrollee (status='absent'), so every enrolled student is present in results/CSV from +the start and a no-show is never silently dropped. The roster ids are read AS THE USER from +class_students (cs_read requires the caller to teach/admin the class); only the display names are +resolved via service role (profiles is deny-all as-user, E4 — see resolve_student_names). + +Scans (R2.3/E3): the upload endpoint enforces a max size and validates that the bytes are a PDF +before storing. QR-decode + automatic student-matching is a follow-on (no QR'd fixtures exist +until the PrintGenerator card); v1 supports explicit (manual) and ordered matching. +""" +from __future__ import annotations + +import csv +import io +import os +import uuid +from typing import Any, Dict, List, Optional + +from fastapi import APIRouter, Depends, File, Form, HTTPException, UploadFile +from fastapi.responses import Response + +from modules.database.supabase.utils.client import SupabaseServiceRoleClient +from modules.database.supabase.utils.storage import StorageAdmin +from modules.logger_tool import initialise_logger +from routers.exam.dependencies import ExamContext, get_exam_context, resolve_student_names +from routers.exam.schemas import CreateBatchRequest, MarkUpsertRequest + +logger = initialise_logger(__name__, os.getenv("LOG_LEVEL"), os.getenv("LOG_PATH"), "default", True) + +router = APIRouter() + +# E3: bound the upload so a 36-page scan batch can't exhaust memory / be a DoS vector. +MAX_SCAN_BYTES = int(os.getenv("EXAM_SCAN_MAX_BYTES", str(50 * 1024 * 1024))) # 50 MB default +SCANS_BUCKET = os.getenv("EXAM_SCANS_BUCKET", "cc.users") +SCANS_PREFIX = "exam-submissions" + + +# ─── helpers ───────────────────────────────────────────────────────────────── + +def _rows(result: Any) -> List[Dict[str, Any]]: + data = getattr(result, "data", None) + if not data: + return [] + return data if isinstance(data, list) else [data] + + +def _first(result: Any) -> Optional[Dict[str, Any]]: + rows = _rows(result) + return rows[0] if rows else None + + +def _fetch_batch_or_404(ctx: ExamContext, batch_id: str) -> Dict[str, Any]: + row = _first(ctx.supabase.table("marking_batches").select("*").eq("id", batch_id).limit(1).execute()) + if not row: + raise HTTPException(status_code=404, detail="Batch not found") + return row + + +def _require_owner(ctx: ExamContext, batch: Dict[str, Any]) -> None: + if batch.get("teacher_id") != ctx.user_id: + raise HTTPException(status_code=403, detail="Only the batch owner can modify it") + + +# ─── batches ───────────────────────────────────────────────────────────────── + +@router.post("/batches") +async def create_batch( + body: CreateBatchRequest, + ctx: ExamContext = Depends(get_exam_context), +) -> Dict[str, Any]: + # The batch inherits the template's institute; reading the template as-user also proves the + # caller may see it (RLS) — an unseeable template → 404. + template = _first( + ctx.supabase.table("exam_templates").select("id, institute_id").eq("id", body.template_id).limit(1).execute() + ) + if not template: + raise HTTPException(status_code=404, detail="Template not found") + + batch_row = { + "template_id": body.template_id, + "class_id": body.class_id, + "institute_id": template["institute_id"], + "teacher_id": ctx.user_id, + "title": body.title, + "status": "open", + } + batch_row = {k: v for k, v in batch_row.items() if v is not None} + batch = _first(ctx.supabase.table("marking_batches").insert(batch_row).execute()) + if not batch: + raise HTTPException(status_code=500, detail="Failed to create batch") + batch_id = batch["id"] + + seeded = 0 + if body.class_id: + # Roster read is AS THE USER → cs_read requires the caller to teach/admin the class. + roster = _rows( + ctx.supabase.table("class_students") + .select("student_id") + .eq("class_id", body.class_id) + .eq("status", "active") + .execute() + ) + student_ids = [r["student_id"] for r in roster if r.get("student_id")] + names = resolve_student_names(student_ids) + if student_ids: + sub_rows = [ + { + "batch_id": batch_id, + "student_id": sid, + "student_name": names.get(sid), + "status": "absent", # A7: present in results until a scan is matched + } + for sid in student_ids + ] + ctx.supabase.table("student_submissions").insert(sub_rows).execute() + seeded = len(sub_rows) + + logger.info(f"Marking batch {batch_id} created by {ctx.user_id}; {seeded} roster submissions seeded") + return {**batch, "submission_count": seeded} + + +@router.get("/batches") +async def list_batches( + include_archived: bool = False, + template_id: Optional[str] = None, + ctx: ExamContext = Depends(get_exam_context), +) -> Dict[str, Any]: + q = ctx.supabase.table("marking_batches").select("*") + if template_id: + q = q.eq("template_id", template_id) + if not include_archived: + q = q.neq("status", "archived") + return {"batches": _rows(q.order("created_at", desc=True).execute())} + + +@router.get("/batches/{batch_id}/queue") +async def batch_queue( + batch_id: str, + ctx: ExamContext = Depends(get_exam_context), +) -> Dict[str, Any]: + batch = _fetch_batch_or_404(ctx, batch_id) + submissions = _rows( + ctx.supabase.table("student_submissions").select("*").eq("batch_id", batch_id).execute() + ) + marks = _rows(ctx.supabase.table("mark_entries").select("submission_id").eq("batch_id", batch_id).execute()) + marked_counts: Dict[str, int] = {} + for m in marks: + sid = m.get("submission_id") + marked_counts[sid] = marked_counts.get(sid, 0) + 1 + + enriched = [{**s, "mark_entry_count": marked_counts.get(s["id"], 0)} for s in submissions] + progress = { + "total": len(submissions), + "absent": sum(1 for s in submissions if s.get("status") == "absent"), + "complete": sum(1 for s in submissions if s.get("status") == "complete"), + "in_progress": sum(1 for s in submissions if s.get("status") in ("matched", "marking")), + } + return {"batch": batch, "submissions": enriched, "progress": progress} + + +# ─── results & CSV (A7) ────────────────────────────────────────────────────── + +def _assemble_results(ctx: ExamContext, batch: Dict[str, Any]) -> Dict[str, Any]: + batch_id = batch["id"] + questions = _rows( + ctx.supabase.table("exam_questions") + .select("id, label, max_marks, order") + .eq("template_id", batch["template_id"]) + .order("order") + .execute() + ) + submissions = _rows( + ctx.supabase.table("student_submissions").select("*").eq("batch_id", batch_id).execute() + ) + marks = _rows(ctx.supabase.table("mark_entries").select("*").eq("batch_id", batch_id).execute()) + + by_sub: Dict[str, Dict[str, float]] = {} + for m in marks: + by_sub.setdefault(m["submission_id"], {})[m["question_id"]] = m.get("awarded_marks") + + results = [] + for s in submissions: # every submission incl. absent → A7 + sub_marks = by_sub.get(s["id"], {}) + is_absent = s.get("status") == "absent" + total = None if is_absent else sum(v or 0 for v in sub_marks.values()) + results.append({ + "submission_id": s["id"], + "student_id": s.get("student_id"), + "student_name": s.get("student_name"), + "status": s.get("status"), + "marks": {qid: sub_marks.get(qid) for qid in (q["id"] for q in questions)}, + "total": total, + }) + return {"batch": batch, "questions": questions, "results": results} + + +@router.get("/batches/{batch_id}/results") +async def batch_results( + batch_id: str, + ctx: ExamContext = Depends(get_exam_context), +) -> Dict[str, Any]: + batch = _fetch_batch_or_404(ctx, batch_id) + return _assemble_results(ctx, batch) + + +@router.get("/batches/{batch_id}/csv") +async def batch_csv( + batch_id: str, + ctx: ExamContext = Depends(get_exam_context), +) -> Response: + batch = _fetch_batch_or_404(ctx, batch_id) + data = _assemble_results(ctx, batch) + questions = data["questions"] + + buf = io.StringIO() + writer = csv.writer(buf) + writer.writerow(["student_name", "student_id", "status"] + [q["label"] for q in questions] + ["total"]) + for r in data["results"]: + # Absent students: blank marks + blank total, but the row is ALWAYS present (A7). + cells = [ + "" if r["marks"].get(q["id"]) is None else r["marks"].get(q["id"]) + for q in questions + ] + total = "" if r["total"] is None else r["total"] + writer.writerow([r.get("student_name") or "", r.get("student_id") or "", r.get("status")] + cells + [total]) + + return Response( + content=buf.getvalue(), + media_type="text/csv", + headers={"Content-Disposition": f'attachment; filename="batch-{batch_id}.csv"'}, + ) + + +# ─── marks ─────────────────────────────────────────────────────────────────── + +@router.put("/marks/{mark_id}") +async def upsert_mark( + mark_id: str, + body: MarkUpsertRequest, + ctx: ExamContext = Depends(get_exam_context), +) -> Dict[str, Any]: + # Derive batch_id from the submission (as-user read → also enforces the caller owns the batch + # the submission belongs to). The client never supplies the RLS scoping key directly. + submission = _first( + ctx.supabase.table("student_submissions").select("id, batch_id").eq("id", body.submission_id).limit(1).execute() + ) + if not submission: + raise HTTPException(status_code=404, detail="Submission not found") + + row = { + "id": mark_id, + "submission_id": body.submission_id, + "question_id": body.question_id, + "batch_id": submission["batch_id"], + "awarded_marks": body.awarded_marks, + "marked_by": "teacher", + } + if body.mark_scheme_detail is not None: + row["mark_scheme_detail"] = body.mark_scheme_detail + if body.annotation_shape_ids is not None: + row["annotation_shape_ids"] = body.annotation_shape_ids + if body.comment is not None: + row["comment"] = body.comment + if body.confirmed is not None: + row["confirmed"] = body.confirmed + + upserted = _first(ctx.supabase.table("mark_entries").upsert(row).execute()) + if not upserted: + raise HTTPException(status_code=500, detail="Failed to upsert mark") + return upserted + + +# ─── scans (R2.3 / E3) ─────────────────────────────────────────────────────── + +@router.post("/batches/{batch_id}/scans") +async def upload_scan( + batch_id: str, + file: UploadFile = File(...), + student_id: Optional[str] = Form(default=None), + matching_method: str = Form(default="manual"), + ctx: ExamContext = Depends(get_exam_context), +) -> Dict[str, Any]: + batch = _fetch_batch_or_404(ctx, batch_id) + _require_owner(ctx, batch) + + # E3: validate MIME (client-declared) before reading the body. + if (file.content_type or "").lower() not in ("application/pdf", "application/x-pdf"): + raise HTTPException(status_code=415, detail="Only application/pdf scans are accepted") + + # E3: read with a hard size ceiling instead of buffering an unbounded upload. + chunks: List[bytes] = [] + total = 0 + while True: + chunk = await file.read(1024 * 1024) + if not chunk: + break + total += len(chunk) + if total > MAX_SCAN_BYTES: + raise HTTPException(status_code=413, detail=f"Scan exceeds max size ({MAX_SCAN_BYTES} bytes)") + chunks.append(chunk) + data = b"".join(chunks) + # E3: content-sniff — declared type can be spoofed; require the PDF magic header. + if not data.startswith(b"%PDF-"): + raise HTTPException(status_code=415, detail="Uploaded file is not a valid PDF") + + # Store via service role (documented): no submissions-bucket storage RLS exists yet; the + # endpoint already authorised the caller as the batch owner above. + storage_path = f"{SCANS_PREFIX}/{batch_id}/{uuid.uuid4()}.pdf" + try: + StorageAdmin().upload_file(SCANS_BUCKET, storage_path, data, "application/pdf", upsert=True) + except Exception as exc: + logger.error(f"scan storage upload failed (batch={batch_id}): {exc}") + raise HTTPException(status_code=502, detail="Failed to store scan") + + sb = ctx.supabase + submission: Optional[Dict[str, Any]] = None + if matching_method == "manual" and student_id: + submission = _first( + sb.table("student_submissions").select("*").eq("batch_id", batch_id).eq("student_id", student_id).limit(1).execute() + ) + elif matching_method == "ordered": + # Assign to the next not-yet-submitted roster slot. + pending = _rows( + sb.table("student_submissions").select("*").eq("batch_id", batch_id).in_("status", ["absent", "unmatched"]).execute() + ) + submission = pending[0] if pending else None + + payload = { + "scan_url": storage_path, + "qr_code": None, + "matching_method": matching_method if (student_id or matching_method == "ordered") else "manual", + "page_count": None, + "status": "matched" if submission else "unmatched", + } + + if submission: + updated = _first(sb.table("student_submissions").update(payload).eq("id", submission["id"]).execute()) + return updated or submission + # No roster slot matched → create an unmatched submission to be reconciled later. + new_row = {"batch_id": batch_id, **payload} + created = _first(sb.table("student_submissions").insert(new_row).execute()) + if not created: + raise HTTPException(status_code=500, detail="Failed to record scan submission") + return created diff --git a/routers/exam/dependencies.py b/routers/exam/dependencies.py index 6c33c9d..8d42141 100644 --- a/routers/exam/dependencies.py +++ b/routers/exam/dependencies.py @@ -100,6 +100,34 @@ async def get_exam_context(payload: Dict[str, Any] = Depends(auth)) -> ExamConte return ExamContext(user_id, access_token, supabase, institute_ids) +def resolve_student_names(student_ids: List[str]) -> Dict[str, str]: + """Map profile id → display name for roster students (batch-creation denormalisation). + + Documented service-role exception (S1, mirrors lookup_exam_code): `profiles` has no as-user + SELECT policy (E4), so the roster's display names can't be read as-the-user. The caller's + right to the roster itself is already enforced as-user (class_students.cs_read requires the + caller to teach/admin the class); this only resolves names for ids already authorised, and + the result is denormalised onto student_submissions so later reads need no profiles access. + """ + if not student_ids: + return {} + try: + sb = SupabaseServiceRoleClient().supabase + res = ( + sb.table("profiles") + .select("id, full_name, display_name, email") + .in_("id", list(student_ids)) + .execute() + ) + out: Dict[str, str] = {} + for p in getattr(res, "data", None) or []: + out[p["id"]] = p.get("full_name") or p.get("display_name") or p.get("email") or "" + return out + except Exception as exc: + logger.warning(f"student name resolution failed: {exc}") + return {} + + def lookup_exam_code(exam_id: str) -> Optional[str]: """Resolve eb_exams.exam_code for a catalogue paper (denormalised onto the template). diff --git a/routers/exam/schemas.py b/routers/exam/schemas.py index b716ff8..55c2cdf 100644 --- a/routers/exam/schemas.py +++ b/routers/exam/schemas.py @@ -102,3 +102,28 @@ class PatchQuestionRequest(BaseModel): mark_scheme: Optional[Dict[str, Any]] = None is_container: Optional[bool] = None spec_ref: Optional[str] = None + + +# ─── Marking batches & marks ───────────────────────────────────────────────── + +class CreateBatchRequest(BaseModel): + template_id: str + # When a class is given, the roster (class_students, status='active') is materialised as + # student_submissions(status='absent') so every enrolled student appears in results (A7). + class_id: Optional[str] = None + title: Optional[str] = None + + +class MarkUpsertRequest(BaseModel): + """Upsert one mark entry (PUT /marks/{id}; id is the mark_entry uuid). + + batch_id is derived server-side from the submission, so the client never sets the RLS + scoping key. submission_id + question_id identify what is being marked. + """ + submission_id: str + question_id: str + awarded_marks: float = 0 + mark_scheme_detail: Optional[Dict[str, Any]] = None + annotation_shape_ids: Optional[Any] = None + comment: Optional[str] = None + confirmed: Optional[bool] = None diff --git a/tests/test_exam_batches.py b/tests/test_exam_batches.py new file mode 100644 index 0000000..37d90a5 --- /dev/null +++ b/tests/test_exam_batches.py @@ -0,0 +1,286 @@ +"""Tests for /api/exam/batches, /marks, /scans (card S4-6). + +FakeSupabase emulates RLS by pre-filtering the visible store slice (same approach as +test_exam_templates). Service-role helpers (name resolution, storage) are monkeypatched; live +as-user RLS is covered by the .94 smoke. +""" +import io + +import pytest +from fastapi import FastAPI +from fastapi.testclient import TestClient + +import routers.exam.batches as batches_mod +from routers.exam.batches import router +from routers.exam.dependencies import ExamContext + + +TEACHER = "00000000-0000-0000-0000-000000000001" +INST_A = "10000000-0000-0000-0000-000000000001" +TPL = "t-1" +CLASS = "c-1" + + +class FakeResult: + def __init__(self, data): + self.data = data + + +class FakeQuery: + def __init__(self, store, table): + self.store = store + self.table = table + self.rows = list(store.get(table, [])) + self._filters = [] + self._op = None + self._payload = None + self._limit = None + + def select(self, *_a, **_k): + self._op = "select"; return self + + def insert(self, payload): + self._op = "insert"; self._payload = payload; return self + + def update(self, payload): + self._op = "update"; self._payload = payload; return self + + def upsert(self, payload): + self._op = "upsert"; self._payload = payload; return self + + def delete(self): + self._op = "delete"; return self + + def eq(self, k, v): + self._filters.append(("eq", k, v)); self.rows = [r for r in self.rows if r.get(k) == v]; return self + + def neq(self, k, v): + self._filters.append(("neq", k, v)); self.rows = [r for r in self.rows if r.get(k) != v]; return self + + def in_(self, k, vals): + vals = set(vals); self._filters.append(("in", k, vals)); self.rows = [r for r in self.rows if r.get(k) in vals]; return self + + def order(self, *_a, **_k): + return self + + def limit(self, n): + self._limit = n; return self + + def _match(self, row): + for op, k, v in self._filters: + if op == "eq" and row.get(k) != v: + return False + if op == "neq" and row.get(k) == v: + return False + if op == "in" and row.get(k) not in v: + return False + return True + + def execute(self): + backing = self.store.setdefault(self.table, []) + if self._op in ("insert", "upsert"): + payloads = self._payload if isinstance(self._payload, list) else [self._payload] + out = [] + for p in payloads: + row = dict(p) + if self._op == "upsert" and row.get("id") is not None: + existing = next((r for r in backing if r.get("id") == row["id"]), None) + if existing: + existing.update(row); out.append(existing); continue + row.setdefault("id", f"gen-{self.table}-{len(backing)}") + backing.append(row); out.append(row) + return FakeResult(out) + if self._op == "update": + out = [] + for r in backing: + if self._match(r): + r.update(self._payload); out.append(r) + return FakeResult(out) + if self._op == "delete": + self.store[self.table] = [r for r in backing if not self._match(r)] + return FakeResult([r for r in backing if self._match(r)]) + rows = self.rows[: self._limit] if self._limit is not None else self.rows + return FakeResult(rows) + + +class FakeSupabase: + def __init__(self, store): + self.store = store + + def table(self, name): + return FakeQuery(self.store, name) + + +def make_client(store, user_id=TEACHER, institute_ids=(INST_A,)): + app = FastAPI() + app.include_router(router, prefix="/api/exam") + from routers.exam.dependencies import get_exam_context + app.dependency_overrides[get_exam_context] = lambda: ExamContext(user_id, "tok", FakeSupabase(store), list(institute_ids)) + return TestClient(app) + + +def base_store(**extra): + store = {"exam_templates": [{"id": TPL, "institute_id": INST_A, "teacher_id": TEACHER, "status": "draft"}]} + store.update(extra) + return store + + +# ─── batches ─────────────────────────────────────────────────────────────── + +def test_create_batch_no_class(): + store = base_store() + c = make_client(store) + r = c.post("/api/exam/batches", json={"template_id": TPL, "title": "Mock 1"}) + assert r.status_code == 200 + b = r.json() + assert b["teacher_id"] == TEACHER and b["institute_id"] == INST_A + assert b["status"] == "open" and b["submission_count"] == 0 + + +def test_create_batch_template_404(): + c = make_client(base_store()) + assert c.post("/api/exam/batches", json={"template_id": "nope"}).status_code == 404 + + +def test_create_batch_seeds_roster_as_absent(monkeypatch): + monkeypatch.setattr(batches_mod, "resolve_student_names", + lambda ids: {sid: f"Name {sid}" for sid in ids}) + store = base_store(class_students=[ + {"class_id": CLASS, "student_id": "s1", "status": "active"}, + {"class_id": CLASS, "student_id": "s2", "status": "active"}, + {"class_id": CLASS, "student_id": "s3", "status": "inactive"}, # excluded + ]) + c = make_client(store) + r = c.post("/api/exam/batches", json={"template_id": TPL, "class_id": CLASS}) + assert r.status_code == 200 + assert r.json()["submission_count"] == 2 + subs = store["student_submissions"] + assert {s["student_id"] for s in subs} == {"s1", "s2"} + assert all(s["status"] == "absent" for s in subs) + assert all(s["student_name"].startswith("Name ") for s in subs) + + +def test_list_batches_excludes_archived(): + store = base_store(marking_batches=[ + {"id": "b1", "template_id": TPL, "institute_id": INST_A, "teacher_id": TEACHER, "status": "open"}, + {"id": "b2", "template_id": TPL, "institute_id": INST_A, "teacher_id": TEACHER, "status": "archived"}, + ]) + c = make_client(store) + ids = {b["id"] for b in c.get("/api/exam/batches").json()["batches"]} + assert ids == {"b1"} + + +# ─── queue / results / csv (A7) ────────────────────────────────────────────── + +def _batch_with_cohort(): + return base_store( + marking_batches=[{"id": "b1", "template_id": TPL, "institute_id": INST_A, "teacher_id": TEACHER, "status": "open"}], + exam_questions=[ + {"id": "q1", "template_id": TPL, "label": "01", "max_marks": 3, "order": 0}, + {"id": "q2", "template_id": TPL, "label": "02", "max_marks": 5, "order": 1}, + ], + student_submissions=[ + {"id": "sub1", "batch_id": "b1", "student_id": "s1", "student_name": "Alice", "status": "complete"}, + {"id": "sub2", "batch_id": "b1", "student_id": "s2", "student_name": "Bob", "status": "absent"}, + ], + mark_entries=[ + {"id": "m1", "batch_id": "b1", "submission_id": "sub1", "question_id": "q1", "awarded_marks": 2}, + {"id": "m2", "batch_id": "b1", "submission_id": "sub1", "question_id": "q2", "awarded_marks": 4}, + ], + ) + + +def test_queue_progress_counts(): + c = make_client(_batch_with_cohort()) + body = c.get("/api/exam/batches/b1/queue").json() + assert body["progress"]["total"] == 2 + assert body["progress"]["absent"] == 1 and body["progress"]["complete"] == 1 + counts = {s["id"]: s["mark_entry_count"] for s in body["submissions"]} + assert counts == {"sub1": 2, "sub2": 0} + + +def test_results_includes_absent_with_blank(monkeypatch): + c = make_client(_batch_with_cohort()) + body = c.get("/api/exam/batches/b1/results").json() + by_id = {r["submission_id"]: r for r in body["results"]} + assert by_id["sub1"]["total"] == 6 + assert by_id["sub2"]["total"] is None # absent → blank total (A7) + assert set(by_id["sub2"]["marks"].values()) == {None} + assert {r["student_name"] for r in body["results"]} == {"Alice", "Bob"} # absent NOT dropped + + +def test_csv_includes_absent_row(): + c = make_client(_batch_with_cohort()) + text = c.get("/api/exam/batches/b1/csv").text + lines = [l for l in text.strip().splitlines() if l] + assert lines[0].split(",")[:3] == ["student_name", "student_id", "status"] + assert "01" in lines[0] and "02" in lines[0] # question labels as columns + assert any(l.startswith("Bob,") and ",absent," in l for l in lines) # absent present + assert len(lines) == 3 # header + 2 students (incl. absent) + + +# ─── marks ─────────────────────────────────────────────────────────────────── + +def test_upsert_mark_derives_batch_and_roundtrips(): + store = _batch_with_cohort() + c = make_client(store) + r = c.put("/api/exam/marks/mk-1", json={"submission_id": "sub1", "question_id": "q1", "awarded_marks": 3}) + assert r.status_code == 200 + row = r.json() + assert row["batch_id"] == "b1" and row["awarded_marks"] == 3 and row["id"] == "mk-1" + # upsert again → same id updated, not duplicated + c.put("/api/exam/marks/mk-1", json={"submission_id": "sub1", "question_id": "q1", "awarded_marks": 1}) + assert sum(1 for m in store["mark_entries"] if m["id"] == "mk-1") == 1 + + +def test_upsert_mark_submission_404(): + c = make_client(_batch_with_cohort()) + assert c.put("/api/exam/marks/mk-x", json={"submission_id": "nope", "question_id": "q1", "awarded_marks": 1}).status_code == 404 + + +# ─── scans (E3 guards) ─────────────────────────────────────────────────────── + +def _batch_store(): + return base_store(marking_batches=[{"id": "b1", "template_id": TPL, "institute_id": INST_A, "teacher_id": TEACHER, "status": "open"}], + student_submissions=[{"id": "sub1", "batch_id": "b1", "student_id": "s1", "status": "absent"}]) + + +def test_scan_rejects_non_pdf_mime(): + c = make_client(_batch_store()) + r = c.post("/api/exam/batches/b1/scans", files={"file": ("x.png", b"\x89PNG", "image/png")}, data={"matching_method": "manual"}) + assert r.status_code == 415 + + +def test_scan_rejects_spoofed_pdf(monkeypatch): + monkeypatch.setattr(batches_mod.StorageAdmin, "upload_file", lambda *a, **k: None, raising=False) + c = make_client(_batch_store()) + r = c.post("/api/exam/batches/b1/scans", files={"file": ("x.pdf", b"not really a pdf", "application/pdf")}, data={"matching_method": "manual"}) + assert r.status_code == 415 # magic-byte sniff + + +def test_scan_rejects_oversize(monkeypatch): + monkeypatch.setattr(batches_mod, "MAX_SCAN_BYTES", 8) + c = make_client(_batch_store()) + r = c.post("/api/exam/batches/b1/scans", files={"file": ("x.pdf", b"%PDF-" + b"0" * 100, "application/pdf")}, data={"matching_method": "manual"}) + assert r.status_code == 413 + + +def test_scan_manual_match_happy(monkeypatch): + monkeypatch.setattr(batches_mod.StorageAdmin, "upload_file", lambda self, *a, **k: None, raising=False) + store = _batch_store() + c = make_client(store) + r = c.post("/api/exam/batches/b1/scans", + files={"file": ("x.pdf", b"%PDF-1.7 minimal", "application/pdf")}, + data={"matching_method": "manual", "student_id": "s1"}) + assert r.status_code == 200 + assert r.json()["status"] == "matched" + assert store["student_submissions"][0]["status"] == "matched" + assert store["student_submissions"][0]["scan_url"].startswith("exam-submissions/b1/") + + +def test_scan_denied_for_non_owner(monkeypatch): + monkeypatch.setattr(batches_mod.StorageAdmin, "upload_file", lambda self, *a, **k: None, raising=False) + store = _batch_store() + c = make_client(store, user_id="someone-else") + r = c.post("/api/exam/batches/b1/scans", files={"file": ("x.pdf", b"%PDF-1.7", "application/pdf")}, data={"matching_method": "manual"}) + assert r.status_code == 403 From a1d297ac309b2b53428c1f522e754df31a6ee07a Mon Sep 17 00:00:00 2001 From: CC Worker Date: Sat, 6 Jun 2026 18:41:23 +0000 Subject: [PATCH 4/8] test(exam): replace StorageAdmin with fake class in scan tests Co-Authored-By: Claude Opus 4.8 --- tests/test_exam_batches.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/tests/test_exam_batches.py b/tests/test_exam_batches.py index 37d90a5..d7e9401 100644 --- a/tests/test_exam_batches.py +++ b/tests/test_exam_batches.py @@ -251,8 +251,7 @@ def test_scan_rejects_non_pdf_mime(): assert r.status_code == 415 -def test_scan_rejects_spoofed_pdf(monkeypatch): - monkeypatch.setattr(batches_mod.StorageAdmin, "upload_file", lambda *a, **k: None, raising=False) +def test_scan_rejects_spoofed_pdf(): c = make_client(_batch_store()) r = c.post("/api/exam/batches/b1/scans", files={"file": ("x.pdf", b"not really a pdf", "application/pdf")}, data={"matching_method": "manual"}) assert r.status_code == 415 # magic-byte sniff @@ -265,8 +264,13 @@ def test_scan_rejects_oversize(monkeypatch): assert r.status_code == 413 +class _FakeStorage: + def upload_file(self, *a, **k): + return None + + def test_scan_manual_match_happy(monkeypatch): - monkeypatch.setattr(batches_mod.StorageAdmin, "upload_file", lambda self, *a, **k: None, raising=False) + monkeypatch.setattr(batches_mod, "StorageAdmin", _FakeStorage) store = _batch_store() c = make_client(store) r = c.post("/api/exam/batches/b1/scans", @@ -279,7 +283,7 @@ def test_scan_manual_match_happy(monkeypatch): def test_scan_denied_for_non_owner(monkeypatch): - monkeypatch.setattr(batches_mod.StorageAdmin, "upload_file", lambda self, *a, **k: None, raising=False) + monkeypatch.setattr(batches_mod, "StorageAdmin", _FakeStorage) store = _batch_store() c = make_client(store, user_id="someone-else") r = c.post("/api/exam/batches/b1/scans", files={"file": ("x.pdf", b"%PDF-1.7", "application/pdf")}, data={"matching_method": "manual"}) From 62234dbbcb02121f77e1e967e7ee5bdd9423563a Mon Sep 17 00:00:00 2001 From: CC Worker Date: Sat, 6 Jun 2026 18:43:09 +0000 Subject: [PATCH 5/8] fix(exam): blank total only for absent AND unmarked; flip status on mark A roster student starts 'absent' and a direct mark would otherwise still show a blank total. Now total is blank only when absent with no marks; recording a mark advances the submission out of absent/unmatched to 'marking'. Co-Authored-By: Claude Opus 4.8 --- routers/exam/batches.py | 18 +++++++++++++++--- tests/test_exam_batches.py | 11 +++++++++++ 2 files changed, 26 insertions(+), 3 deletions(-) diff --git a/routers/exam/batches.py b/routers/exam/batches.py index 45bba07..701d95f 100644 --- a/routers/exam/batches.py +++ b/routers/exam/batches.py @@ -187,8 +187,14 @@ def _assemble_results(ctx: ExamContext, batch: Dict[str, Any]) -> Dict[str, Any] results = [] for s in submissions: # every submission incl. absent → A7 sub_marks = by_sub.get(s["id"], {}) - is_absent = s.get("status") == "absent" - total = None if is_absent else sum(v or 0 for v in sub_marks.values()) + # Blank total ONLY for a genuine no-show (absent AND nothing marked). A student with any + # mark gets a real total regardless of status; a present-but-unmarked student totals 0. + if sub_marks: + total = sum(v or 0 for v in sub_marks.values()) + elif s.get("status") == "absent": + total = None + else: + total = 0 results.append({ "submission_id": s["id"], "student_id": s.get("student_id"), @@ -248,7 +254,7 @@ async def upsert_mark( # Derive batch_id from the submission (as-user read → also enforces the caller owns the batch # the submission belongs to). The client never supplies the RLS scoping key directly. submission = _first( - ctx.supabase.table("student_submissions").select("id, batch_id").eq("id", body.submission_id).limit(1).execute() + ctx.supabase.table("student_submissions").select("id, batch_id, status").eq("id", body.submission_id).limit(1).execute() ) if not submission: raise HTTPException(status_code=404, detail="Submission not found") @@ -273,6 +279,12 @@ async def upsert_mark( upserted = _first(ctx.supabase.table("mark_entries").upsert(row).execute()) if not upserted: raise HTTPException(status_code=500, detail="Failed to upsert mark") + + # A marked student is, by definition, not absent — advance the submission out of the + # no-submission states so results/queue reflect that marking has started. + if submission.get("status") in ("absent", "unmatched"): + ctx.supabase.table("student_submissions").update({"status": "marking"}).eq("id", body.submission_id).execute() + return upserted diff --git a/tests/test_exam_batches.py b/tests/test_exam_batches.py index d7e9401..299bb7a 100644 --- a/tests/test_exam_batches.py +++ b/tests/test_exam_batches.py @@ -233,6 +233,17 @@ def test_upsert_mark_derives_batch_and_roundtrips(): assert sum(1 for m in store["mark_entries"] if m["id"] == "mk-1") == 1 +def test_upsert_mark_flips_absent_submission_to_marking(): + store = _batch_with_cohort() # sub2 starts 'absent' + c = make_client(store) + c.put("/api/exam/marks/mk-2", json={"submission_id": "sub2", "question_id": "q1", "awarded_marks": 2}) + sub2 = next(s for s in store["student_submissions"] if s["id"] == "sub2") + assert sub2["status"] == "marking" + # results now show a real total for the (formerly absent) marked student + res = {r["submission_id"]: r for r in c.get("/api/exam/batches/b1/results").json()["results"]} + assert res["sub2"]["total"] == 2 + + def test_upsert_mark_submission_404(): c = make_client(_batch_with_cohort()) assert c.put("/api/exam/marks/mk-x", json={"submission_id": "nope", "question_id": "q1", "awarded_marks": 1}).status_code == 404 From 98be55ab575ab514e6db657a1269c64445ebb840 Mon Sep 17 00:00:00 2001 From: CC Worker Date: Sat, 6 Jun 2026 18:46:50 +0000 Subject: [PATCH 6/8] fix(storage): de-double braces in storage.py (set-of-dict crash) Pre-existing bug (E7): the whole file had doubled braces, so every dict literal was a set containing a dict -> 'unhashable type: dict' at runtime, and log f-strings printed literal {braces}. This broke StorageManager.upload_file / list_bucket_contents / create_bucket / bucket-init for ALL callers (incl. files.py uploads), not just exam scans. Mechanical de-double; no logic change. Co-Authored-By: Claude Opus 4.8 --- modules/database/supabase/utils/storage.py | 98 +++++++++++----------- 1 file changed, 49 insertions(+), 49 deletions(-) diff --git a/modules/database/supabase/utils/storage.py b/modules/database/supabase/utils/storage.py index 9d0d1d7..a0c1c35 100644 --- a/modules/database/supabase/utils/storage.py +++ b/modules/database/supabase/utils/storage.py @@ -23,76 +23,76 @@ class StorageManager: def check_bucket_exists(self, bucket_id: str) -> bool: """Check if a storage bucket exists""" try: - self.logger.info(f"Checking if bucket {{bucket_id}} exists") + self.logger.info(f"Checking if bucket {bucket_id} exists") buckets = self.client.supabase.storage.list_buckets() return any(bucket.name == bucket_id for bucket in buckets) except Exception as e: - self.logger.error(f"Error checking bucket {{bucket_id}}: {{str(e)}}") + self.logger.error(f"Error checking bucket {bucket_id}: {str(e)}") return False def list_bucket_contents(self, bucket_id: str, path: str = "") -> Dict: """List contents of a bucket at specified path""" try: - self.logger.info(f"Listing contents of bucket {{bucket_id}} at path {{path}}") + self.logger.info(f"Listing contents of bucket {bucket_id} at path {path}") contents = self.client.supabase.storage.from_(bucket_id).list(path) - return {{ + return { "folders": [item for item in contents if item.get("id", "").endswith("/")], "files": [item for item in contents if not item.get("id", "").endswith("/")] - }} + } except Exception as e: - self.logger.error(f"Error listing bucket contents: {{str(e)}}") + self.logger.error(f"Error listing bucket contents: {str(e)}") raise StorageError(str(e)) def upload_file(self, bucket_id: str, file_path: str, file_data: bytes, content_type: str, upsert: bool = True) -> Any: """Upload a file to a storage bucket""" try: - self.logger.info(f"Uploading file to {{bucket_id}} at path {{file_path}}") + self.logger.info(f"Uploading file to {bucket_id} at path {file_path}") return self.client.supabase.storage.from_(bucket_id).upload( path=file_path, file=file_data, - file_options={{ + file_options={ "content-type": content_type, "x-upsert": "true" if upsert else "false" - }} + } ) except Exception as e: - self.logger.error(f"Error uploading file: {{str(e)}}") + self.logger.error(f"Error uploading file: {str(e)}") raise StorageError(str(e)) def download_file(self, bucket_id: str, file_path: str) -> bytes: """Download a file from a storage bucket""" try: - self.logger.info(f"Downloading file from {{bucket_id}} at path {{file_path}}") + self.logger.info(f"Downloading file from {bucket_id} at path {file_path}") return self.client.supabase.storage.from_(bucket_id).download(file_path) except Exception as e: - self.logger.error(f"Error downloading file: {{str(e)}}") + self.logger.error(f"Error downloading file: {str(e)}") raise StorageError(str(e)) def delete_file(self, bucket_id: str, file_path: str) -> None: """Delete a file from a storage bucket""" try: - self.logger.info(f"Deleting file from {{bucket_id}} at path {{file_path}}") + self.logger.info(f"Deleting file from {bucket_id} at path {file_path}") self.client.supabase.storage.from_(bucket_id).remove([file_path]) except Exception as e: - self.logger.error(f"Error deleting file: {{str(e)}}") + self.logger.error(f"Error deleting file: {str(e)}") raise StorageError(str(e)) def get_public_url(self, bucket_id: str, file_path: str) -> str: """Get public URL for a file""" try: - self.logger.info(f"Getting public URL for file in {{bucket_id}} at path {{file_path}}") + self.logger.info(f"Getting public URL for file in {bucket_id} at path {file_path}") return self.client.supabase.storage.from_(bucket_id).get_public_url(file_path) except Exception as e: - self.logger.error(f"Error getting public URL: {{str(e)}}") + self.logger.error(f"Error getting public URL: {str(e)}") raise StorageError(str(e)) def create_signed_url(self, bucket_id: str, file_path: str, expires_in: int = 3600) -> Any: """Create a signed URL for temporary file access""" try: - self.logger.info(f"Creating signed URL for file in {{bucket_id}} at path {{file_path}}") + self.logger.info(f"Creating signed URL for file in {bucket_id} at path {file_path}") return self.client.supabase.storage.from_(bucket_id).create_signed_url(file_path, expires_in) except Exception as e: - self.logger.error(f"Error creating signed URL: {{str(e)}}") + self.logger.error(f"Error creating signed URL: {str(e)}") raise StorageError(str(e)) class StorageAdmin(StorageManager): @@ -115,9 +115,9 @@ class StorageAdmin(StorageManager): ) -> Dict[str, Any]: """Create a new storage bucket with supported parameters.""" try: - self.logger.info(f"Creating bucket {{id}} with name {{name}}") + self.logger.info(f"Creating bucket {id} with name {name}") - options: Optional[CreateBucketOptions] = {{}} + options: Optional[CreateBucketOptions] = {} if public: options["public"] = public if file_size_limit is not None: @@ -133,7 +133,7 @@ class StorageAdmin(StorageManager): return bucket except Exception as e: - self.logger.error(f"Error creating bucket {{id}}: {{str(e)}}") + self.logger.error(f"Error creating bucket {id}: {str(e)}") raise StorageError(str(e)) def initialize_core_buckets(self, admin_user_id: Optional[str] = None) -> List[Dict[str, Any]]: @@ -144,7 +144,7 @@ class StorageAdmin(StorageManager): raise ValueError("Admin user ID is required for bucket initialization") core_buckets = [ - {{ + { "id": "cc.users", "name": "CC Users", "public": False, @@ -156,8 +156,8 @@ class StorageAdmin(StorageManager): 'application/msword', 'application/vnd.openxmlformats-officedocument.*', 'text/plain', 'text/csv', 'application/json' ] - }}, - {{ + }, + { "id": "cc.institutes", "name": "CC Institutes", "public": False, @@ -169,7 +169,7 @@ class StorageAdmin(StorageManager): 'application/msword', 'application/vnd.openxmlformats-officedocument.*', 'text/plain', 'text/csv', 'application/json' ] - }} + } ] results = [] @@ -177,30 +177,30 @@ class StorageAdmin(StorageManager): try: bucket_name = bucket.pop("name") result = self.create_bucket(name=bucket_name, **bucket) - results.append({{ + results.append({ "bucket": bucket["id"], "status": "success", "result": result - }}) + }) except Exception as e: - self.logger.error(f"Error creating bucket {{bucket['id']}}: {{str(e)}}") - results.append({{ + self.logger.error(f"Error creating bucket {bucket['id']}: {str(e)}") + results.append({ "bucket": bucket["id"], "status": "error", "error": str(e) - }}) + }) return results except Exception as e: - self.logger.error(f"Error initializing core buckets: {{str(e)}}") + self.logger.error(f"Error initializing core buckets: {str(e)}") raise StorageError(str(e)) def create_user_bucket(self, user_id: str, username: str) -> Dict[str, Any]: """Create a storage bucket for a specific user.""" try: - bucket_id = f"cc.users.admin.{{username}}" - bucket_name = f"User Files - {{username}}" + bucket_id = f"cc.users.admin.{username}" + bucket_name = f"User Files - {username}" return self.create_bucket( id=bucket_id, @@ -217,7 +217,7 @@ class StorageAdmin(StorageManager): ) except Exception as e: - self.logger.error(f"Error creating user bucket for {{username}}: {{str(e)}}") + self.logger.error(f"Error creating user bucket for {username}: {str(e)}") raise StorageError(str(e)) def create_school_buckets(self, school_id: str, school_name: str, admin_user_id: Optional[str] = None) -> Dict[str, Any]: @@ -228,9 +228,9 @@ class StorageAdmin(StorageManager): raise ValueError("Admin user ID is required for school bucket creation") school_buckets = [ - {{ - "id": f"cc.institutes.{{school_id}}.public", - "name": f"{{school_name}} - Public Files", + { + "id": f"cc.institutes.{school_id}.public", + "name": f"{school_name} - Public Files", "public": True, "owner": owner_id, "owner_id": school_id, @@ -240,10 +240,10 @@ class StorageAdmin(StorageManager): 'application/msword', 'application/vnd.openxmlformats-officedocument.*', 'text/plain', 'text/csv', 'application/json' ] - }}, - {{ - "id": f"cc.institutes.{{school_id}}.private", - "name": f"{{school_name}} - Private Files", + }, + { + "id": f"cc.institutes.{school_id}.private", + "name": f"{school_name} - Private Files", "public": False, "owner": owner_id, "owner_id": school_id, @@ -253,29 +253,29 @@ class StorageAdmin(StorageManager): 'application/msword', 'application/vnd.openxmlformats-officedocument.*', 'text/plain', 'text/csv', 'application/json' ] - }} + } ] - results = {{}} + results = {} for bucket in school_buckets: try: bucket_name = bucket.pop("name") result = self.create_bucket(name=bucket_name, **bucket) - results[bucket["id"]] = {{ + results[bucket["id"]] = { "status": "success", "result": result - }} + } except Exception as e: - self.logger.error(f"Error creating school bucket {{bucket['id']}}: {{str(e)}}") - results[bucket["id"]] = {{ + self.logger.error(f"Error creating school bucket {bucket['id']}: {str(e)}") + results[bucket["id"]] = { "status": "error", "error": str(e) - }} + } return results except Exception as e: - self.logger.error(f"Error creating school buckets: {{str(e)}}") + self.logger.error(f"Error creating school buckets: {str(e)}") raise StorageError(str(e)) class StorageUser(StorageManager): From 77bb0766ffe9416ed4327ee1307c85b3c7eb61e9 Mon Sep 17 00:00:00 2001 From: CC Worker Date: Sat, 6 Jun 2026 19:02:18 +0000 Subject: [PATCH 7/8] feat(exam): Neo4j projection on template save + neo4j-sync (S4-7) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit modules/database/services/exam_projection.py projects a saved template into cc.public.exams: ExamPaper -> Question/Part -> Region + Part-[:ASSESSES]-> SpecPoint, joined by shared UUIDs (exam_questions.id, exam_response_areas.id, exam_code, spec_code). Full re-sync per exam_code (idempotent). Reads via service role + writes via system Neo4j driver (R3.5.1 documented graph-writer). Wiring (R3.5.4/R5.3): - PUT /templates/{id} enqueues project_template_safe via BackgroundTasks (swallows failures so a graph hiccup never fails the canvas save). - POST /templates/{id}/neo4j-sync — manual trigger, as-user auth + owner check, runs synchronously and returns projection counts. Unit tests: projection scheduled on PUT; neo4j-sync owner/403/404. Co-Authored-By: Claude Opus 4.8 --- modules/database/services/exam_projection.py | 163 +++++++++++++++++++ routers/exam/templates.py | 26 ++- tests/test_exam_templates.py | 39 +++++ 3 files changed, 227 insertions(+), 1 deletion(-) create mode 100644 modules/database/services/exam_projection.py diff --git a/modules/database/services/exam_projection.py b/modules/database/services/exam_projection.py new file mode 100644 index 0000000..2f5d969 --- /dev/null +++ b/modules/database/services/exam_projection.py @@ -0,0 +1,163 @@ +"""Project a saved exam template into the cc.public.exams Neo4j graph (card S4-7). + +Supabase is source of truth for the operational template (geometry, marks); cc.public.exams is +the co-primary knowledge graph (spec §2/S2). On template save the structural skeleton — +ExamPaper → Question/Part → Region, plus Part-[:ASSESSES]->SpecPoint — is projected here. + +Ownership model (R3.5.1): the graph is written by the API SERVICE ROLE only (no client writes), +so this task reads the template via service role and writes Neo4j with the system driver. It is +the sanctioned service-role path (documented in the ADR), distinct from the as-user request path. + +Join keys (never regenerated): + exam_questions.id -> Question.uuid_string (container) | Part.uuid_string (leaf) + exam_response_areas.id -> Region.uuid_string + eb_exams.exam_code -> ExamPaper.exam_code + eb_specifications.spec_code -> Specification.spec_code + +Projection is a full re-sync per exam_code (delete this paper's Question/Part/Region, recreate), +matching the PUT full-replace semantics — idempotent and safe to re-run. +""" +import os +import uuid +from typing import Any, Dict, List, Optional + +from modules.database.supabase.utils.client import SupabaseServiceRoleClient +from modules.database.tools.neo4j_driver_tools import get_session +from modules.logger_tool import initialise_logger + +logger = initialise_logger(__name__, os.getenv("LOG_LEVEL"), os.getenv("LOG_PATH"), "default", True) + +# MUST match run/initialization/init_exam_graph.py (shared DB name + deterministic uuid namespace). +EXAM_DB = "cc.public.exams" +NS = uuid.UUID("00000000-0000-0000-0000-00000000e8a1") + + +def _uid(*parts: str) -> str: + return str(uuid.uuid5(NS, ":".join(parts))) + + +def _rows(result: Any) -> List[Dict[str, Any]]: + data = getattr(result, "data", None) + if not data: + return [] + return data if isinstance(data, list) else [data] + + +def project_template(template_id: str) -> Dict[str, Any]: + """Read the template (service role) and (re)project its structure into cc.public.exams. + + Returns a counts dict. Raises on hard failure (caller decides whether to swallow — a + BackgroundTask logs and drops; the manual /neo4j-sync endpoint surfaces the error). + """ + sb = SupabaseServiceRoleClient().supabase + template = (sb.table("exam_templates").select("*").eq("id", template_id).limit(1).execute().data or [None])[0] + if not template: + raise ValueError(f"template {template_id} not found") + + questions = _rows(sb.table("exam_questions").select("*").eq("template_id", template_id).order("order").execute()) + regions = _rows(sb.table("exam_response_areas").select("*").eq("template_id", template_id).execute()) + + # Resolve the paper's exam_code + spec metadata. Catalogue paper → from eb_exams; ad-hoc upload + # (no exam_code) → a stable synthetic code so the paper still has a unique graph key. + exam_code = template.get("exam_code") + spec_code = None + paper_meta: Dict[str, Any] = {} + if template.get("exam_id"): + eb = (sb.table("eb_exams").select("exam_code, spec_code, paper_code, tier, session") + .eq("id", template["exam_id"]).limit(1).execute().data or [None])[0] + if eb: + exam_code = exam_code or eb.get("exam_code") + spec_code = eb.get("spec_code") + paper_meta = eb + if not exam_code: + exam_code = f"tpl:{template_id}" + + paper_uid = _uid("ExamPaper", exam_code) + counts = {"exam_code": exam_code, "questions": 0, "parts": 0, "regions": 0, "assesses": 0, "spec_linked": False} + + with get_session(database=EXAM_DB) as s: + # 1. Clean this paper's existing children (full re-sync), keep the ExamPaper node itself. + s.run("MATCH (r:Region {exam_code:$ec}) DETACH DELETE r", ec=exam_code).consume() + s.run("MATCH (n {exam_code:$ec}) WHERE n:Question OR n:Part DETACH DELETE n", ec=exam_code).consume() + + # 2. ExamPaper node. + s.run( + "MERGE (p:ExamPaper {uuid_string:$uid}) " + "SET p.exam_code=$ec, p.spec_code=$sc, p.title=$title, p.page_count=$pc, " + " p.paper_code=$paper_code, p.tier=$tier, p.session=$session, p.node_storage_path=$nsp", + uid=paper_uid, ec=exam_code, sc=spec_code, title=template.get("title"), + pc=template.get("page_count"), paper_code=paper_meta.get("paper_code"), + tier=paper_meta.get("tier"), session=paper_meta.get("session"), + nsp=f"{EXAM_DB}/ExamPaper/{exam_code}", + ).consume() + + # 3. Link to its Specification (seeded separately) when known. + if spec_code: + r = s.run( + "MATCH (sp:Specification {spec_code:$sc}), (p:ExamPaper {exam_code:$ec}) " + "MERGE (sp)-[:HAS_PAPER]->(p) RETURN count(*) AS n", + sc=spec_code, ec=exam_code, + ).single() + counts["spec_linked"] = bool(r and r["n"]) + + # 4. Question/Part nodes — pass 1: create all nodes (so parents exist before linking). + for q in questions: + label = "Question" if q.get("is_container") else "Part" + s.run( + f"MERGE (n:{label} {{uuid_string:$uid}}) " + "SET n.exam_code=$ec, n.label=$label, n.order=$order, n.max_marks=$mm, " + " n.answer_type=$at, n.mark_scheme_type=$mst, n.spec_ref=$sref, " + " n.node_storage_path=$nsp", + uid=q["id"], ec=exam_code, label=q.get("label"), order=q.get("order") or 0, + mm=q.get("max_marks") or 0, at=q.get("answer_type"), + mst=(q.get("mark_scheme") or {}).get("type") if isinstance(q.get("mark_scheme"), dict) else None, + sref=q.get("spec_ref"), nsp=f"{EXAM_DB}/{label}/{q['id']}", + ).consume() + counts["parts" if label == "Part" else "questions"] += 1 + + # 5. Structural + ASSESSES edges — pass 2. + for q in questions: + if q.get("parent_id"): + s.run( + "MATCH (parent {uuid_string:$pid}), (n {uuid_string:$uid}) MERGE (parent)-[:HAS_PART]->(n)", + pid=q["parent_id"], uid=q["id"], + ).consume() + else: + s.run( + "MATCH (p:ExamPaper {exam_code:$ec}), (n {uuid_string:$uid}) MERGE (p)-[:HAS_QUESTION]->(n)", + ec=exam_code, uid=q["id"], + ).consume() + if q.get("spec_ref"): + # SpecPoints are seeded per spec_code; match within this paper's spec when known. + r = s.run( + "MATCH (n {uuid_string:$uid}), (sp:SpecPoint {ref:$ref}) " + + ("WHERE sp.spec_code=$sc " if spec_code else "") + + "MERGE (n)-[:ASSESSES]->(sp) RETURN count(*) AS n", + uid=q["id"], ref=q["spec_ref"], sc=spec_code, + ).single() + counts["assesses"] += (r["n"] if r else 0) + + # 6. Region nodes + HAS_REGION edges. + for rg in regions: + s.run( + "MERGE (r:Region {uuid_string:$uid}) " + "SET r.exam_code=$ec, r.page=$page, r.kind=$kind, r.response_form=$rf, r.node_storage_path=$nsp", + uid=rg["id"], ec=exam_code, page=rg.get("page"), kind=rg.get("kind"), + rf=rg.get("response_form"), nsp=f"{EXAM_DB}/Region/{rg['id']}", + ).consume() + s.run( + "MATCH (q {uuid_string:$qid}), (r:Region {uuid_string:$uid}) MERGE (q)-[:HAS_REGION]->(r)", + qid=rg["question_id"], uid=rg["id"], + ).consume() + counts["regions"] += 1 + + logger.info(f"Projected template {template_id} → cc.public.exams: {counts}") + return counts + + +def project_template_safe(template_id: str) -> None: + """BackgroundTask wrapper: never raises (a failed projection must not break the HTTP save).""" + try: + project_template(template_id) + except Exception as exc: + logger.error(f"Background Neo4j projection failed for template {template_id}: {exc}") diff --git a/routers/exam/templates.py b/routers/exam/templates.py index 6c58657..d4ebfee 100644 --- a/routers/exam/templates.py +++ b/routers/exam/templates.py @@ -15,8 +15,9 @@ from __future__ import annotations import os from typing import Any, Dict, List, Optional -from fastapi import APIRouter, Depends, HTTPException +from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException +from modules.database.services.exam_projection import project_template, project_template_safe from modules.logger_tool import initialise_logger from routers.exam.dependencies import ExamContext, get_exam_context, lookup_exam_code from routers.exam.schemas import ( @@ -137,6 +138,7 @@ async def get_template( async def replace_template( template_id: str, body: TemplateReplaceRequest, + background_tasks: BackgroundTasks, ctx: ExamContext = Depends(get_exam_context), ) -> Dict[str, Any]: """Full-replace canvas save (R5.2). Replaces questions/response_areas/boundaries wholesale. @@ -223,6 +225,10 @@ async def replace_template( f"Exam template {template_id} replaced: {len(body.questions)} questions, " f"{len(body.response_areas)} regions, {len(body.boundaries)} boundaries" ) + # R3.5.4: a successful save enqueues a graph projection into cc.public.exams. BackgroundTasks + # is acceptable for Sprint 4 (durability via a real queue is a later step); failures are + # swallowed so the canvas save itself never fails on a graph hiccup. + background_tasks.add_task(project_template_safe, template_id) return await get_template(template_id, ctx) @@ -238,6 +244,24 @@ async def archive_template( return {"status": "archived", "id": template_id} +@router.post("/templates/{template_id}/neo4j-sync") +async def neo4j_sync( + template_id: str, + ctx: ExamContext = Depends(get_exam_context), +) -> Dict[str, Any]: + """Manual graph-projection trigger (R5.3) for dev/backfill — runs synchronously and returns + counts. Auth/ownership is checked as-the-user; the projection itself uses service role + (R3.5.1, the documented graph-writer path).""" + template = _fetch_template_or_404(ctx, template_id) + _require_owner(ctx, template) + try: + counts = project_template(template_id) + except Exception as exc: + logger.error(f"Manual neo4j-sync failed for template {template_id}: {exc}") + raise HTTPException(status_code=502, detail=f"Projection failed: {exc}") + return {"status": "ok", "projection": counts} + + # ─── questions (granular edit path, R5.2) ──────────────────────────────────── @router.patch("/questions/{question_id}") diff --git a/tests/test_exam_templates.py b/tests/test_exam_templates.py index ea4fa95..96bfb60 100644 --- a/tests/test_exam_templates.py +++ b/tests/test_exam_templates.py @@ -5,9 +5,11 @@ ExamContext dependency is overridden with an in-memory fake, so these tests exer router's auth/ownership/institute logic without a live Supabase — the as-user RLS itself is verified separately against .94 (see the evidence note). """ +import pytest from fastapi import FastAPI from fastapi.testclient import TestClient +import routers.exam.templates as templates_mod from routers.exam.templates import router from routers.exam.dependencies import ExamContext, get_exam_context @@ -18,6 +20,15 @@ INST_A = "10000000-0000-0000-0000-000000000001" INST_B = "10000000-0000-0000-0000-000000000002" +@pytest.fixture(autouse=True) +def _stub_projection(monkeypatch): + """Record projection scheduling and never touch Neo4j/service-role in unit tests.""" + calls = [] + monkeypatch.setattr(templates_mod, "project_template_safe", lambda tid: calls.append(tid)) + monkeypatch.setattr(templates_mod, "project_template", lambda tid: {"exam_code": "X", "questions": 1}) + return calls + + # ─── in-memory fake supabase ───────────────────────────────────────────────── class FakeResult: @@ -262,3 +273,31 @@ def test_patch_question_empty_body_is_400(): store = {"exam_questions": [{"id": "q1", "template_id": "t1", "label": "01"}]} client, _ = make_client(store=store) assert client.patch("/api/exam/questions/q1", json={}).status_code == 400 + + +# ─── Neo4j projection (S4-7) ───────────────────────────────────────────────── + +def test_put_schedules_projection(_stub_projection): + store = {"exam_templates": [{"id": "t1", "title": "p", "status": "draft", "institute_id": INST_A, "teacher_id": TEACHER}]} + client, _ = make_client(store=store) + client.put("/api/exam/templates/t1", json={"questions": []}) + assert _stub_projection == ["t1"] # projection enqueued for the saved template + + +def test_neo4j_sync_owner_runs(): + store = {"exam_templates": [{"id": "t1", "title": "p", "status": "draft", "institute_id": INST_A, "teacher_id": TEACHER}]} + client, _ = make_client(store=store) + r = client.post("/api/exam/templates/t1/neo4j-sync") + assert r.status_code == 200 + assert r.json()["projection"]["exam_code"] == "X" + + +def test_neo4j_sync_non_owner_403(): + store = {"exam_templates": [{"id": "t1", "title": "p", "status": "draft", "institute_id": INST_A, "teacher_id": OTHER_TEACHER}]} + client, _ = make_client(user_id=TEACHER, institute_ids=(INST_A,), store=store) + assert client.post("/api/exam/templates/t1/neo4j-sync").status_code == 403 + + +def test_neo4j_sync_404(): + client, _ = make_client(store={"exam_templates": []}) + assert client.post("/api/exam/templates/does-not-exist/neo4j-sync").status_code == 404 From e269e67f279f4446e052138c70cf226e3978c389 Mon Sep 17 00:00:00 2001 From: CC Worker Date: Sat, 6 Jun 2026 19:15:35 +0000 Subject: [PATCH 8/8] fix(exam): block destructive template PUT once marks recorded (review #1) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PUT full-replace deletes exam_questions, and mark_entries.question_id cascades ON DELETE — so re-saving the setup canvas after marking began would silently wipe recorded marks. Guard: 409 if any mark_entry exists for the template's batches. Mark-scheme edits (PATCH /questions/{id}) are unaffected. Co-Authored-By: Claude Opus 4.8 --- routers/exam/templates.py | 26 ++++++++++++++++++++++++++ tests/test_exam_templates.py | 31 +++++++++++++++++++++++++++++++ 2 files changed, 57 insertions(+) diff --git a/routers/exam/templates.py b/routers/exam/templates.py index d4ebfee..3417d16 100644 --- a/routers/exam/templates.py +++ b/routers/exam/templates.py @@ -61,6 +61,20 @@ def _require_owner(ctx: ExamContext, template: Dict[str, Any]) -> None: raise HTTPException(status_code=403, detail="Only the template owner can modify it") +def _template_has_recorded_marks(ctx: ExamContext, template_id: str) -> bool: + """True if any mark_entry exists for a batch of this template (→ destructive PUT is unsafe).""" + batches = _rows( + ctx.supabase.table("marking_batches").select("id").eq("template_id", template_id).execute() + ) + batch_ids = [b["id"] for b in batches] + if not batch_ids: + return False + marks = _rows( + ctx.supabase.table("mark_entries").select("id").in_("batch_id", batch_ids).limit(1).execute() + ) + return bool(marks) + + # ─── templates ─────────────────────────────────────────────────────────────── @router.post("/templates") @@ -150,6 +164,18 @@ async def replace_template( template = _fetch_template_or_404(ctx, template_id) _require_owner(ctx, template) + # Data-loss guard: the wholesale question delete below cascades to mark_entries + # (mark_entries.question_id → exam_questions ON DELETE CASCADE). Refuse a structural + # full-replace once any marks have been recorded against this template's batches, so + # re-saving the setup canvas mid-marking can't silently wipe a teacher's marking work. + # (Mark-scheme tweaks use PATCH /questions/{id}, which is unaffected.) + if _template_has_recorded_marks(ctx, template_id): + raise HTTPException( + status_code=409, + detail="Template has recorded marks; structural full-replace is blocked. " + "Edit questions individually via PATCH /questions/{id}.", + ) + # Optional template-level metadata update alongside the canvas. if body.meta: updates = {k: v for k, v in body.meta.dict().items() if v is not None} diff --git a/tests/test_exam_templates.py b/tests/test_exam_templates.py index 96bfb60..2ef22ce 100644 --- a/tests/test_exam_templates.py +++ b/tests/test_exam_templates.py @@ -79,6 +79,12 @@ class FakeQuery: self.rows = [r for r in self.rows if r.get(key) != value] return self + def in_(self, key, values): + values = set(values) + self._filters.append(("in", key, values)) + self.rows = [r for r in self.rows if r.get(key) in values] + return self + def order(self, *_a, **_k): return self @@ -239,6 +245,31 @@ def test_put_replace_clears_previous_children(): assert ids == {"new"} # old row replaced, not appended +def test_put_replace_blocked_when_marks_recorded(): + # Re-saving the structure after marking began would cascade-delete mark_entries → guard 409. + store = { + "exam_templates": [{"id": "t1", "title": "p", "status": "draft", "institute_id": INST_A, "teacher_id": TEACHER}], + "marking_batches": [{"id": "b1", "template_id": "t1", "teacher_id": TEACHER, "institute_id": INST_A}], + "mark_entries": [{"id": "m1", "batch_id": "b1", "submission_id": "s1", "question_id": "q1", "awarded_marks": 2}], + "exam_questions": [{"id": "q1", "template_id": "t1", "label": "01", "order": 0}], + } + client, store = make_client(store=store) + r = client.put("/api/exam/templates/t1", json={"questions": [{"id": "q2", "label": "new", "order": 0}]}) + assert r.status_code == 409 + # original question untouched (no destructive delete happened) + assert {q["id"] for q in store["exam_questions"]} == {"q1"} + + +def test_put_replace_allowed_when_batch_has_no_marks(): + store = { + "exam_templates": [{"id": "t1", "title": "p", "status": "draft", "institute_id": INST_A, "teacher_id": TEACHER}], + "marking_batches": [{"id": "b1", "template_id": "t1", "teacher_id": TEACHER, "institute_id": INST_A}], + "mark_entries": [], + } + client, _ = make_client(store=store) + assert client.put("/api/exam/templates/t1", json={"questions": [{"id": "q2", "label": "new", "order": 0}]}).status_code == 200 + + def test_put_replace_denied_for_non_owner(): store = {"exam_templates": [{"id": "t1", "title": "p", "status": "draft", "institute_id": INST_A, "teacher_id": OTHER_TEACHER}]} # Caller is a colleague in the same institute (can read), but not the owner → 403.