feat(exam): batches, scans, marks, results, CSV (S4-6)

Adds routers/exam/batches.py (mounted alongside templates under /api/exam):
- POST/GET /batches — batch creation seeds the cohort from class_students AS
  THE USER (cs_read requires caller teaches/admins the class); each active
  enrollee becomes a student_submissions row (status='absent') so no student
  is ever dropped from results (A7). Display names denormalised via a
  documented service-role profiles read (deny-all as-user, E4).
- GET /batches/{id}/queue — submissions + per-submission mark counts + progress.
- GET /batches/{id}/results + /csv — every roster student incl. absent (blank
  marks/total); CSV row always present (A7 baked into the contract).
- PUT /marks/{id} — upsert; batch_id derived server-side from the submission
  (client never supplies the RLS scoping key).
- POST /batches/{id}/scans — E3 guards: MIME check, hard size ceiling (chunked
  read), %PDF magic-byte sniff; owner-only; stores via service-role storage;
  manual/ordered matching (QR-decode is a follow-on, no QR fixtures yet).

Unit tests cover batch/roster-seed/list, queue, results+CSV A7, mark upsert
round-trip, and all scan guards + owner check.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
CC Worker 2026-06-06 18:40:10 +00:00
parent 96f9fb2446
commit 5ad9c01cde
5 changed files with 697 additions and 1 deletions

View File

@ -4,6 +4,13 @@ A clean top-level router group (R5.1/E5), deliberately NOT nested under /databas
endpoint authenticates the JWT and calls Supabase as-the-user so the RLS in endpoint authenticates the JWT and calls Supabase as-the-user so the RLS in
volumes/db/cc/72-exam-marker.sql is enforced (spec E1/E2 fixes). volumes/db/cc/72-exam-marker.sql is enforced (spec E1/E2 fixes).
""" """
from routers.exam.templates import router from fastapi import APIRouter
from routers.exam.templates import router as templates_router
from routers.exam.batches import router as batches_router
router = APIRouter()
router.include_router(templates_router)
router.include_router(batches_router)
__all__ = ["router"] __all__ = ["router"]

350
routers/exam/batches.py Normal file
View File

@ -0,0 +1,350 @@
"""Marking batches, scans, marks, results & CSV (/api/exam/batches..., /api/exam/marks/...) — S4-6.
As with templates, all user-facing access is as-the-user (RLS-enforced; E1). A batch is owned by
the teacher who creates it (R2.4); colleagues in the same institute can read it
(marking_batches_read), a teacher in another institute cannot ( 404, IDOR-safe).
Rostercohort (R4.3/A7): creating a batch from a class materialises one student_submissions row
per active enrollee (status='absent'), so every enrolled student is present in results/CSV from
the start and a no-show is never silently dropped. The roster ids are read AS THE USER from
class_students (cs_read requires the caller to teach/admin the class); only the display names are
resolved via service role (profiles is deny-all as-user, E4 see resolve_student_names).
Scans (R2.3/E3): the upload endpoint enforces a max size and validates that the bytes are a PDF
before storing. QR-decode + automatic student-matching is a follow-on (no QR'd fixtures exist
until the PrintGenerator card); v1 supports explicit (manual) and ordered matching.
"""
from __future__ import annotations
import csv
import io
import os
import uuid
from typing import Any, Dict, List, Optional
from fastapi import APIRouter, Depends, File, Form, HTTPException, UploadFile
from fastapi.responses import Response
from modules.database.supabase.utils.client import SupabaseServiceRoleClient
from modules.database.supabase.utils.storage import StorageAdmin
from modules.logger_tool import initialise_logger
from routers.exam.dependencies import ExamContext, get_exam_context, resolve_student_names
from routers.exam.schemas import CreateBatchRequest, MarkUpsertRequest
logger = initialise_logger(__name__, os.getenv("LOG_LEVEL"), os.getenv("LOG_PATH"), "default", True)
router = APIRouter()
# E3: bound the upload so a 36-page scan batch can't exhaust memory / be a DoS vector.
MAX_SCAN_BYTES = int(os.getenv("EXAM_SCAN_MAX_BYTES", str(50 * 1024 * 1024))) # 50 MB default
SCANS_BUCKET = os.getenv("EXAM_SCANS_BUCKET", "cc.users")
SCANS_PREFIX = "exam-submissions"
# ─── helpers ─────────────────────────────────────────────────────────────────
def _rows(result: Any) -> List[Dict[str, Any]]:
data = getattr(result, "data", None)
if not data:
return []
return data if isinstance(data, list) else [data]
def _first(result: Any) -> Optional[Dict[str, Any]]:
rows = _rows(result)
return rows[0] if rows else None
def _fetch_batch_or_404(ctx: ExamContext, batch_id: str) -> Dict[str, Any]:
row = _first(ctx.supabase.table("marking_batches").select("*").eq("id", batch_id).limit(1).execute())
if not row:
raise HTTPException(status_code=404, detail="Batch not found")
return row
def _require_owner(ctx: ExamContext, batch: Dict[str, Any]) -> None:
if batch.get("teacher_id") != ctx.user_id:
raise HTTPException(status_code=403, detail="Only the batch owner can modify it")
# ─── batches ─────────────────────────────────────────────────────────────────
@router.post("/batches")
async def create_batch(
body: CreateBatchRequest,
ctx: ExamContext = Depends(get_exam_context),
) -> Dict[str, Any]:
# The batch inherits the template's institute; reading the template as-user also proves the
# caller may see it (RLS) — an unseeable template → 404.
template = _first(
ctx.supabase.table("exam_templates").select("id, institute_id").eq("id", body.template_id).limit(1).execute()
)
if not template:
raise HTTPException(status_code=404, detail="Template not found")
batch_row = {
"template_id": body.template_id,
"class_id": body.class_id,
"institute_id": template["institute_id"],
"teacher_id": ctx.user_id,
"title": body.title,
"status": "open",
}
batch_row = {k: v for k, v in batch_row.items() if v is not None}
batch = _first(ctx.supabase.table("marking_batches").insert(batch_row).execute())
if not batch:
raise HTTPException(status_code=500, detail="Failed to create batch")
batch_id = batch["id"]
seeded = 0
if body.class_id:
# Roster read is AS THE USER → cs_read requires the caller to teach/admin the class.
roster = _rows(
ctx.supabase.table("class_students")
.select("student_id")
.eq("class_id", body.class_id)
.eq("status", "active")
.execute()
)
student_ids = [r["student_id"] for r in roster if r.get("student_id")]
names = resolve_student_names(student_ids)
if student_ids:
sub_rows = [
{
"batch_id": batch_id,
"student_id": sid,
"student_name": names.get(sid),
"status": "absent", # A7: present in results until a scan is matched
}
for sid in student_ids
]
ctx.supabase.table("student_submissions").insert(sub_rows).execute()
seeded = len(sub_rows)
logger.info(f"Marking batch {batch_id} created by {ctx.user_id}; {seeded} roster submissions seeded")
return {**batch, "submission_count": seeded}
@router.get("/batches")
async def list_batches(
include_archived: bool = False,
template_id: Optional[str] = None,
ctx: ExamContext = Depends(get_exam_context),
) -> Dict[str, Any]:
q = ctx.supabase.table("marking_batches").select("*")
if template_id:
q = q.eq("template_id", template_id)
if not include_archived:
q = q.neq("status", "archived")
return {"batches": _rows(q.order("created_at", desc=True).execute())}
@router.get("/batches/{batch_id}/queue")
async def batch_queue(
batch_id: str,
ctx: ExamContext = Depends(get_exam_context),
) -> Dict[str, Any]:
batch = _fetch_batch_or_404(ctx, batch_id)
submissions = _rows(
ctx.supabase.table("student_submissions").select("*").eq("batch_id", batch_id).execute()
)
marks = _rows(ctx.supabase.table("mark_entries").select("submission_id").eq("batch_id", batch_id).execute())
marked_counts: Dict[str, int] = {}
for m in marks:
sid = m.get("submission_id")
marked_counts[sid] = marked_counts.get(sid, 0) + 1
enriched = [{**s, "mark_entry_count": marked_counts.get(s["id"], 0)} for s in submissions]
progress = {
"total": len(submissions),
"absent": sum(1 for s in submissions if s.get("status") == "absent"),
"complete": sum(1 for s in submissions if s.get("status") == "complete"),
"in_progress": sum(1 for s in submissions if s.get("status") in ("matched", "marking")),
}
return {"batch": batch, "submissions": enriched, "progress": progress}
# ─── results & CSV (A7) ──────────────────────────────────────────────────────
def _assemble_results(ctx: ExamContext, batch: Dict[str, Any]) -> Dict[str, Any]:
batch_id = batch["id"]
questions = _rows(
ctx.supabase.table("exam_questions")
.select("id, label, max_marks, order")
.eq("template_id", batch["template_id"])
.order("order")
.execute()
)
submissions = _rows(
ctx.supabase.table("student_submissions").select("*").eq("batch_id", batch_id).execute()
)
marks = _rows(ctx.supabase.table("mark_entries").select("*").eq("batch_id", batch_id).execute())
by_sub: Dict[str, Dict[str, float]] = {}
for m in marks:
by_sub.setdefault(m["submission_id"], {})[m["question_id"]] = m.get("awarded_marks")
results = []
for s in submissions: # every submission incl. absent → A7
sub_marks = by_sub.get(s["id"], {})
is_absent = s.get("status") == "absent"
total = None if is_absent else sum(v or 0 for v in sub_marks.values())
results.append({
"submission_id": s["id"],
"student_id": s.get("student_id"),
"student_name": s.get("student_name"),
"status": s.get("status"),
"marks": {qid: sub_marks.get(qid) for qid in (q["id"] for q in questions)},
"total": total,
})
return {"batch": batch, "questions": questions, "results": results}
@router.get("/batches/{batch_id}/results")
async def batch_results(
batch_id: str,
ctx: ExamContext = Depends(get_exam_context),
) -> Dict[str, Any]:
batch = _fetch_batch_or_404(ctx, batch_id)
return _assemble_results(ctx, batch)
@router.get("/batches/{batch_id}/csv")
async def batch_csv(
batch_id: str,
ctx: ExamContext = Depends(get_exam_context),
) -> Response:
batch = _fetch_batch_or_404(ctx, batch_id)
data = _assemble_results(ctx, batch)
questions = data["questions"]
buf = io.StringIO()
writer = csv.writer(buf)
writer.writerow(["student_name", "student_id", "status"] + [q["label"] for q in questions] + ["total"])
for r in data["results"]:
# Absent students: blank marks + blank total, but the row is ALWAYS present (A7).
cells = [
"" if r["marks"].get(q["id"]) is None else r["marks"].get(q["id"])
for q in questions
]
total = "" if r["total"] is None else r["total"]
writer.writerow([r.get("student_name") or "", r.get("student_id") or "", r.get("status")] + cells + [total])
return Response(
content=buf.getvalue(),
media_type="text/csv",
headers={"Content-Disposition": f'attachment; filename="batch-{batch_id}.csv"'},
)
# ─── marks ───────────────────────────────────────────────────────────────────
@router.put("/marks/{mark_id}")
async def upsert_mark(
mark_id: str,
body: MarkUpsertRequest,
ctx: ExamContext = Depends(get_exam_context),
) -> Dict[str, Any]:
# Derive batch_id from the submission (as-user read → also enforces the caller owns the batch
# the submission belongs to). The client never supplies the RLS scoping key directly.
submission = _first(
ctx.supabase.table("student_submissions").select("id, batch_id").eq("id", body.submission_id).limit(1).execute()
)
if not submission:
raise HTTPException(status_code=404, detail="Submission not found")
row = {
"id": mark_id,
"submission_id": body.submission_id,
"question_id": body.question_id,
"batch_id": submission["batch_id"],
"awarded_marks": body.awarded_marks,
"marked_by": "teacher",
}
if body.mark_scheme_detail is not None:
row["mark_scheme_detail"] = body.mark_scheme_detail
if body.annotation_shape_ids is not None:
row["annotation_shape_ids"] = body.annotation_shape_ids
if body.comment is not None:
row["comment"] = body.comment
if body.confirmed is not None:
row["confirmed"] = body.confirmed
upserted = _first(ctx.supabase.table("mark_entries").upsert(row).execute())
if not upserted:
raise HTTPException(status_code=500, detail="Failed to upsert mark")
return upserted
# ─── scans (R2.3 / E3) ───────────────────────────────────────────────────────
@router.post("/batches/{batch_id}/scans")
async def upload_scan(
batch_id: str,
file: UploadFile = File(...),
student_id: Optional[str] = Form(default=None),
matching_method: str = Form(default="manual"),
ctx: ExamContext = Depends(get_exam_context),
) -> Dict[str, Any]:
batch = _fetch_batch_or_404(ctx, batch_id)
_require_owner(ctx, batch)
# E3: validate MIME (client-declared) before reading the body.
if (file.content_type or "").lower() not in ("application/pdf", "application/x-pdf"):
raise HTTPException(status_code=415, detail="Only application/pdf scans are accepted")
# E3: read with a hard size ceiling instead of buffering an unbounded upload.
chunks: List[bytes] = []
total = 0
while True:
chunk = await file.read(1024 * 1024)
if not chunk:
break
total += len(chunk)
if total > MAX_SCAN_BYTES:
raise HTTPException(status_code=413, detail=f"Scan exceeds max size ({MAX_SCAN_BYTES} bytes)")
chunks.append(chunk)
data = b"".join(chunks)
# E3: content-sniff — declared type can be spoofed; require the PDF magic header.
if not data.startswith(b"%PDF-"):
raise HTTPException(status_code=415, detail="Uploaded file is not a valid PDF")
# Store via service role (documented): no submissions-bucket storage RLS exists yet; the
# endpoint already authorised the caller as the batch owner above.
storage_path = f"{SCANS_PREFIX}/{batch_id}/{uuid.uuid4()}.pdf"
try:
StorageAdmin().upload_file(SCANS_BUCKET, storage_path, data, "application/pdf", upsert=True)
except Exception as exc:
logger.error(f"scan storage upload failed (batch={batch_id}): {exc}")
raise HTTPException(status_code=502, detail="Failed to store scan")
sb = ctx.supabase
submission: Optional[Dict[str, Any]] = None
if matching_method == "manual" and student_id:
submission = _first(
sb.table("student_submissions").select("*").eq("batch_id", batch_id).eq("student_id", student_id).limit(1).execute()
)
elif matching_method == "ordered":
# Assign to the next not-yet-submitted roster slot.
pending = _rows(
sb.table("student_submissions").select("*").eq("batch_id", batch_id).in_("status", ["absent", "unmatched"]).execute()
)
submission = pending[0] if pending else None
payload = {
"scan_url": storage_path,
"qr_code": None,
"matching_method": matching_method if (student_id or matching_method == "ordered") else "manual",
"page_count": None,
"status": "matched" if submission else "unmatched",
}
if submission:
updated = _first(sb.table("student_submissions").update(payload).eq("id", submission["id"]).execute())
return updated or submission
# No roster slot matched → create an unmatched submission to be reconciled later.
new_row = {"batch_id": batch_id, **payload}
created = _first(sb.table("student_submissions").insert(new_row).execute())
if not created:
raise HTTPException(status_code=500, detail="Failed to record scan submission")
return created

View File

@ -100,6 +100,34 @@ async def get_exam_context(payload: Dict[str, Any] = Depends(auth)) -> ExamConte
return ExamContext(user_id, access_token, supabase, institute_ids) return ExamContext(user_id, access_token, supabase, institute_ids)
def resolve_student_names(student_ids: List[str]) -> Dict[str, str]:
"""Map profile id → display name for roster students (batch-creation denormalisation).
Documented service-role exception (S1, mirrors lookup_exam_code): `profiles` has no as-user
SELECT policy (E4), so the roster's display names can't be read as-the-user. The caller's
right to the roster itself is already enforced as-user (class_students.cs_read requires the
caller to teach/admin the class); this only resolves names for ids already authorised, and
the result is denormalised onto student_submissions so later reads need no profiles access.
"""
if not student_ids:
return {}
try:
sb = SupabaseServiceRoleClient().supabase
res = (
sb.table("profiles")
.select("id, full_name, display_name, email")
.in_("id", list(student_ids))
.execute()
)
out: Dict[str, str] = {}
for p in getattr(res, "data", None) or []:
out[p["id"]] = p.get("full_name") or p.get("display_name") or p.get("email") or ""
return out
except Exception as exc:
logger.warning(f"student name resolution failed: {exc}")
return {}
def lookup_exam_code(exam_id: str) -> Optional[str]: def lookup_exam_code(exam_id: str) -> Optional[str]:
"""Resolve eb_exams.exam_code for a catalogue paper (denormalised onto the template). """Resolve eb_exams.exam_code for a catalogue paper (denormalised onto the template).

View File

@ -102,3 +102,28 @@ class PatchQuestionRequest(BaseModel):
mark_scheme: Optional[Dict[str, Any]] = None mark_scheme: Optional[Dict[str, Any]] = None
is_container: Optional[bool] = None is_container: Optional[bool] = None
spec_ref: Optional[str] = None spec_ref: Optional[str] = None
# ─── Marking batches & marks ─────────────────────────────────────────────────
class CreateBatchRequest(BaseModel):
template_id: str
# When a class is given, the roster (class_students, status='active') is materialised as
# student_submissions(status='absent') so every enrolled student appears in results (A7).
class_id: Optional[str] = None
title: Optional[str] = None
class MarkUpsertRequest(BaseModel):
"""Upsert one mark entry (PUT /marks/{id}; id is the mark_entry uuid).
batch_id is derived server-side from the submission, so the client never sets the RLS
scoping key. submission_id + question_id identify what is being marked.
"""
submission_id: str
question_id: str
awarded_marks: float = 0
mark_scheme_detail: Optional[Dict[str, Any]] = None
annotation_shape_ids: Optional[Any] = None
comment: Optional[str] = None
confirmed: Optional[bool] = None

286
tests/test_exam_batches.py Normal file
View File

@ -0,0 +1,286 @@
"""Tests for /api/exam/batches, /marks, /scans (card S4-6).
FakeSupabase emulates RLS by pre-filtering the visible store slice (same approach as
test_exam_templates). Service-role helpers (name resolution, storage) are monkeypatched; live
as-user RLS is covered by the .94 smoke.
"""
import io
import pytest
from fastapi import FastAPI
from fastapi.testclient import TestClient
import routers.exam.batches as batches_mod
from routers.exam.batches import router
from routers.exam.dependencies import ExamContext
TEACHER = "00000000-0000-0000-0000-000000000001"
INST_A = "10000000-0000-0000-0000-000000000001"
TPL = "t-1"
CLASS = "c-1"
class FakeResult:
def __init__(self, data):
self.data = data
class FakeQuery:
def __init__(self, store, table):
self.store = store
self.table = table
self.rows = list(store.get(table, []))
self._filters = []
self._op = None
self._payload = None
self._limit = None
def select(self, *_a, **_k):
self._op = "select"; return self
def insert(self, payload):
self._op = "insert"; self._payload = payload; return self
def update(self, payload):
self._op = "update"; self._payload = payload; return self
def upsert(self, payload):
self._op = "upsert"; self._payload = payload; return self
def delete(self):
self._op = "delete"; return self
def eq(self, k, v):
self._filters.append(("eq", k, v)); self.rows = [r for r in self.rows if r.get(k) == v]; return self
def neq(self, k, v):
self._filters.append(("neq", k, v)); self.rows = [r for r in self.rows if r.get(k) != v]; return self
def in_(self, k, vals):
vals = set(vals); self._filters.append(("in", k, vals)); self.rows = [r for r in self.rows if r.get(k) in vals]; return self
def order(self, *_a, **_k):
return self
def limit(self, n):
self._limit = n; return self
def _match(self, row):
for op, k, v in self._filters:
if op == "eq" and row.get(k) != v:
return False
if op == "neq" and row.get(k) == v:
return False
if op == "in" and row.get(k) not in v:
return False
return True
def execute(self):
backing = self.store.setdefault(self.table, [])
if self._op in ("insert", "upsert"):
payloads = self._payload if isinstance(self._payload, list) else [self._payload]
out = []
for p in payloads:
row = dict(p)
if self._op == "upsert" and row.get("id") is not None:
existing = next((r for r in backing if r.get("id") == row["id"]), None)
if existing:
existing.update(row); out.append(existing); continue
row.setdefault("id", f"gen-{self.table}-{len(backing)}")
backing.append(row); out.append(row)
return FakeResult(out)
if self._op == "update":
out = []
for r in backing:
if self._match(r):
r.update(self._payload); out.append(r)
return FakeResult(out)
if self._op == "delete":
self.store[self.table] = [r for r in backing if not self._match(r)]
return FakeResult([r for r in backing if self._match(r)])
rows = self.rows[: self._limit] if self._limit is not None else self.rows
return FakeResult(rows)
class FakeSupabase:
def __init__(self, store):
self.store = store
def table(self, name):
return FakeQuery(self.store, name)
def make_client(store, user_id=TEACHER, institute_ids=(INST_A,)):
app = FastAPI()
app.include_router(router, prefix="/api/exam")
from routers.exam.dependencies import get_exam_context
app.dependency_overrides[get_exam_context] = lambda: ExamContext(user_id, "tok", FakeSupabase(store), list(institute_ids))
return TestClient(app)
def base_store(**extra):
store = {"exam_templates": [{"id": TPL, "institute_id": INST_A, "teacher_id": TEACHER, "status": "draft"}]}
store.update(extra)
return store
# ─── batches ───────────────────────────────────────────────────────────────
def test_create_batch_no_class():
store = base_store()
c = make_client(store)
r = c.post("/api/exam/batches", json={"template_id": TPL, "title": "Mock 1"})
assert r.status_code == 200
b = r.json()
assert b["teacher_id"] == TEACHER and b["institute_id"] == INST_A
assert b["status"] == "open" and b["submission_count"] == 0
def test_create_batch_template_404():
c = make_client(base_store())
assert c.post("/api/exam/batches", json={"template_id": "nope"}).status_code == 404
def test_create_batch_seeds_roster_as_absent(monkeypatch):
monkeypatch.setattr(batches_mod, "resolve_student_names",
lambda ids: {sid: f"Name {sid}" for sid in ids})
store = base_store(class_students=[
{"class_id": CLASS, "student_id": "s1", "status": "active"},
{"class_id": CLASS, "student_id": "s2", "status": "active"},
{"class_id": CLASS, "student_id": "s3", "status": "inactive"}, # excluded
])
c = make_client(store)
r = c.post("/api/exam/batches", json={"template_id": TPL, "class_id": CLASS})
assert r.status_code == 200
assert r.json()["submission_count"] == 2
subs = store["student_submissions"]
assert {s["student_id"] for s in subs} == {"s1", "s2"}
assert all(s["status"] == "absent" for s in subs)
assert all(s["student_name"].startswith("Name ") for s in subs)
def test_list_batches_excludes_archived():
store = base_store(marking_batches=[
{"id": "b1", "template_id": TPL, "institute_id": INST_A, "teacher_id": TEACHER, "status": "open"},
{"id": "b2", "template_id": TPL, "institute_id": INST_A, "teacher_id": TEACHER, "status": "archived"},
])
c = make_client(store)
ids = {b["id"] for b in c.get("/api/exam/batches").json()["batches"]}
assert ids == {"b1"}
# ─── queue / results / csv (A7) ──────────────────────────────────────────────
def _batch_with_cohort():
return base_store(
marking_batches=[{"id": "b1", "template_id": TPL, "institute_id": INST_A, "teacher_id": TEACHER, "status": "open"}],
exam_questions=[
{"id": "q1", "template_id": TPL, "label": "01", "max_marks": 3, "order": 0},
{"id": "q2", "template_id": TPL, "label": "02", "max_marks": 5, "order": 1},
],
student_submissions=[
{"id": "sub1", "batch_id": "b1", "student_id": "s1", "student_name": "Alice", "status": "complete"},
{"id": "sub2", "batch_id": "b1", "student_id": "s2", "student_name": "Bob", "status": "absent"},
],
mark_entries=[
{"id": "m1", "batch_id": "b1", "submission_id": "sub1", "question_id": "q1", "awarded_marks": 2},
{"id": "m2", "batch_id": "b1", "submission_id": "sub1", "question_id": "q2", "awarded_marks": 4},
],
)
def test_queue_progress_counts():
c = make_client(_batch_with_cohort())
body = c.get("/api/exam/batches/b1/queue").json()
assert body["progress"]["total"] == 2
assert body["progress"]["absent"] == 1 and body["progress"]["complete"] == 1
counts = {s["id"]: s["mark_entry_count"] for s in body["submissions"]}
assert counts == {"sub1": 2, "sub2": 0}
def test_results_includes_absent_with_blank(monkeypatch):
c = make_client(_batch_with_cohort())
body = c.get("/api/exam/batches/b1/results").json()
by_id = {r["submission_id"]: r for r in body["results"]}
assert by_id["sub1"]["total"] == 6
assert by_id["sub2"]["total"] is None # absent → blank total (A7)
assert set(by_id["sub2"]["marks"].values()) == {None}
assert {r["student_name"] for r in body["results"]} == {"Alice", "Bob"} # absent NOT dropped
def test_csv_includes_absent_row():
c = make_client(_batch_with_cohort())
text = c.get("/api/exam/batches/b1/csv").text
lines = [l for l in text.strip().splitlines() if l]
assert lines[0].split(",")[:3] == ["student_name", "student_id", "status"]
assert "01" in lines[0] and "02" in lines[0] # question labels as columns
assert any(l.startswith("Bob,") and ",absent," in l for l in lines) # absent present
assert len(lines) == 3 # header + 2 students (incl. absent)
# ─── marks ───────────────────────────────────────────────────────────────────
def test_upsert_mark_derives_batch_and_roundtrips():
store = _batch_with_cohort()
c = make_client(store)
r = c.put("/api/exam/marks/mk-1", json={"submission_id": "sub1", "question_id": "q1", "awarded_marks": 3})
assert r.status_code == 200
row = r.json()
assert row["batch_id"] == "b1" and row["awarded_marks"] == 3 and row["id"] == "mk-1"
# upsert again → same id updated, not duplicated
c.put("/api/exam/marks/mk-1", json={"submission_id": "sub1", "question_id": "q1", "awarded_marks": 1})
assert sum(1 for m in store["mark_entries"] if m["id"] == "mk-1") == 1
def test_upsert_mark_submission_404():
c = make_client(_batch_with_cohort())
assert c.put("/api/exam/marks/mk-x", json={"submission_id": "nope", "question_id": "q1", "awarded_marks": 1}).status_code == 404
# ─── scans (E3 guards) ───────────────────────────────────────────────────────
def _batch_store():
return base_store(marking_batches=[{"id": "b1", "template_id": TPL, "institute_id": INST_A, "teacher_id": TEACHER, "status": "open"}],
student_submissions=[{"id": "sub1", "batch_id": "b1", "student_id": "s1", "status": "absent"}])
def test_scan_rejects_non_pdf_mime():
c = make_client(_batch_store())
r = c.post("/api/exam/batches/b1/scans", files={"file": ("x.png", b"\x89PNG", "image/png")}, data={"matching_method": "manual"})
assert r.status_code == 415
def test_scan_rejects_spoofed_pdf(monkeypatch):
monkeypatch.setattr(batches_mod.StorageAdmin, "upload_file", lambda *a, **k: None, raising=False)
c = make_client(_batch_store())
r = c.post("/api/exam/batches/b1/scans", files={"file": ("x.pdf", b"not really a pdf", "application/pdf")}, data={"matching_method": "manual"})
assert r.status_code == 415 # magic-byte sniff
def test_scan_rejects_oversize(monkeypatch):
monkeypatch.setattr(batches_mod, "MAX_SCAN_BYTES", 8)
c = make_client(_batch_store())
r = c.post("/api/exam/batches/b1/scans", files={"file": ("x.pdf", b"%PDF-" + b"0" * 100, "application/pdf")}, data={"matching_method": "manual"})
assert r.status_code == 413
def test_scan_manual_match_happy(monkeypatch):
monkeypatch.setattr(batches_mod.StorageAdmin, "upload_file", lambda self, *a, **k: None, raising=False)
store = _batch_store()
c = make_client(store)
r = c.post("/api/exam/batches/b1/scans",
files={"file": ("x.pdf", b"%PDF-1.7 minimal", "application/pdf")},
data={"matching_method": "manual", "student_id": "s1"})
assert r.status_code == 200
assert r.json()["status"] == "matched"
assert store["student_submissions"][0]["status"] == "matched"
assert store["student_submissions"][0]["scan_url"].startswith("exam-submissions/b1/")
def test_scan_denied_for_non_owner(monkeypatch):
monkeypatch.setattr(batches_mod.StorageAdmin, "upload_file", lambda self, *a, **k: None, raising=False)
store = _batch_store()
c = make_client(store, user_id="someone-else")
r = c.post("/api/exam/batches/b1/scans", files={"file": ("x.pdf", b"%PDF-1.7", "application/pdf")}, data={"matching_method": "manual"})
assert r.status_code == 403