api/tests/test_exam_batches.py
CC Worker 62234dbbcb fix(exam): blank total only for absent AND unmarked; flip status on mark
A roster student starts 'absent' and a direct mark would otherwise still show a
blank total. Now total is blank only when absent with no marks; recording a mark
advances the submission out of absent/unmatched to 'marking'.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-06 18:43:09 +00:00

302 lines
12 KiB
Python

"""Tests for /api/exam/batches, /marks, /scans (card S4-6).
FakeSupabase emulates RLS by pre-filtering the visible store slice (same approach as
test_exam_templates). Service-role helpers (name resolution, storage) are monkeypatched; live
as-user RLS is covered by the .94 smoke.
"""
import io
import pytest
from fastapi import FastAPI
from fastapi.testclient import TestClient
import routers.exam.batches as batches_mod
from routers.exam.batches import router
from routers.exam.dependencies import ExamContext
TEACHER = "00000000-0000-0000-0000-000000000001"
INST_A = "10000000-0000-0000-0000-000000000001"
TPL = "t-1"
CLASS = "c-1"
class FakeResult:
    """Bare response object: carries the rows of an executed fake query as ``.data``."""

    def __init__(self, data):
        # Stored as given; callers read the rows back through ``result.data``.
        self.data = data
class FakeQuery:
    """In-memory stand-in for a Supabase table query builder.

    Builder methods return ``self`` so calls can be chained; nothing touches the
    backing store until ``execute()``. Filters are applied twice on purpose:

    * eagerly to ``self.rows`` (a snapshot taken at construction) for reads, and
    * recorded in ``self._filters`` so ``update``/``delete`` can be evaluated
      against the live backing list at ``execute()`` time.

    Args:
        store: dict mapping table name -> list of row dicts (mutated in place).
        table: name of the table this query targets.
    """

    def __init__(self, store, table):
        self.store = store
        self.table = table
        # Shallow snapshot: the list is a copy, the row dicts are the stored ones.
        self.rows = list(store.get(table, []))
        self._filters = []
        self._op = None
        self._payload = None
        self._limit = None

    # ── operation selectors ──────────────────────────────────────────────────
    def select(self, *_a, **_k):
        """Mark the query as a read; column arguments are ignored."""
        self._op = "select"
        return self

    def insert(self, payload):
        """Queue ``payload`` (a dict or list of dicts) for insertion."""
        self._op = "insert"
        self._payload = payload
        return self

    def update(self, payload):
        """Queue ``payload`` to be merged into every row matching the filters."""
        self._op = "update"
        self._payload = payload
        return self

    def upsert(self, payload):
        """Queue ``payload`` to update rows with a matching ``id`` or insert otherwise."""
        self._op = "upsert"
        self._payload = payload
        return self

    def delete(self):
        """Mark the query as a delete of every row matching the filters."""
        self._op = "delete"
        return self

    # ── filters ──────────────────────────────────────────────────────────────
    def _add_filter(self, op, key, value):
        """Record one filter and narrow the read snapshot accordingly."""
        self._filters.append((op, key, value))
        self.rows = [r for r in self.rows if self._passes(r, op, key, value)]
        return self

    def eq(self, k, v):
        """Keep rows whose column ``k`` equals ``v``."""
        return self._add_filter("eq", k, v)

    def neq(self, k, v):
        """Keep rows whose column ``k`` differs from ``v``."""
        return self._add_filter("neq", k, v)

    def in_(self, k, vals):
        """Keep rows whose column ``k`` is one of ``vals`` (materialised as a set)."""
        return self._add_filter("in", k, set(vals))

    def order(self, *_a, **_k):
        """Accepted for API compatibility; ordering is not emulated."""
        return self

    def limit(self, n):
        """Cap the number of rows returned by a read (writes ignore it)."""
        self._limit = n
        return self

    @staticmethod
    def _passes(row, op, key, value):
        """Return whether ``row`` satisfies a single recorded filter."""
        cell = row.get(key)
        if op == "eq":
            return cell == value
        if op == "neq":
            return cell != value
        if op == "in":
            return cell in value
        return True

    def _match(self, row):
        """Return True when ``row`` satisfies every recorded filter."""
        return all(self._passes(row, op, k, v) for op, k, v in self._filters)

    # ── execution ────────────────────────────────────────────────────────────
    def _unused_id(self, backing):
        """Generate a ``gen-<table>-<n>`` id that no row in ``backing`` already holds.

        Starts at ``len(backing)`` (the historical format) and counts upward, so
        ids stay unique even after earlier rows have been deleted.
        """
        n = len(backing)
        candidate = f"gen-{self.table}-{n}"
        while any(r.get("id") == candidate for r in backing):
            n += 1
            candidate = f"gen-{self.table}-{n}"
        return candidate

    def _write(self, backing):
        """Apply a queued insert/upsert to ``backing`` and return the affected rows."""
        payloads = self._payload if isinstance(self._payload, list) else [self._payload]
        written = []
        for payload in payloads:
            row = dict(payload)  # never alias the caller's dict
            if self._op == "upsert" and row.get("id") is not None:
                existing = next((r for r in backing if r.get("id") == row["id"]), None)
                if existing is not None:
                    existing.update(row)
                    written.append(existing)
                    continue
            if "id" not in row:
                row["id"] = self._unused_id(backing)
            backing.append(row)
            written.append(row)
        return written

    def execute(self):
        """Run the queued operation and return a ``FakeResult`` with the affected rows.

        The table key is created in the store if missing, even for reads.
        """
        backing = self.store.setdefault(self.table, [])
        if self._op in ("insert", "upsert"):
            return FakeResult(self._write(backing))
        if self._op == "update":
            touched = []
            for row in backing:
                if self._match(row):
                    row.update(self._payload)
                    touched.append(row)
            return FakeResult(touched)
        if self._op == "delete":
            removed = [r for r in backing if self._match(r)]
            # The table list is replaced, not mutated in place.
            self.store[self.table] = [r for r in backing if not self._match(r)]
            return FakeResult(removed)
        rows = self.rows[: self._limit] if self._limit is not None else self.rows
        return FakeResult(rows)
class FakeSupabase:
    """Fake Supabase client backed by a plain dict of ``table name -> rows``."""

    def __init__(self, store):
        self.store = store

    def table(self, name):
        """Start a new fake query against ``name`` over the shared store."""
        query = FakeQuery(self.store, name)
        return query
def make_client(store, user_id=TEACHER, institute_ids=(INST_A,)):
    """Return a TestClient for the exam router with the auth context stubbed out.

    The ``get_exam_context`` dependency is overridden to yield an ``ExamContext``
    for ``user_id`` whose Supabase client is a ``FakeSupabase`` over ``store``.
    """
    from routers.exam.dependencies import get_exam_context

    def _fake_context():
        return ExamContext(user_id, "tok", FakeSupabase(store), list(institute_ids))

    app = FastAPI()
    app.include_router(router, prefix="/api/exam")
    app.dependency_overrides[get_exam_context] = _fake_context
    return TestClient(app)
def base_store(**extra):
    """Return a store holding one draft template, plus any extra tables given as kwargs.

    A kwarg named ``exam_templates`` replaces the default template list.
    """
    template = {"id": TPL, "institute_id": INST_A, "teacher_id": TEACHER, "status": "draft"}
    return {"exam_templates": [template], **extra}
# ─── batches ───────────────────────────────────────────────────────────────
def test_create_batch_no_class():
    """A batch created without a class is open, owned by the caller, and has no submissions."""
    client = make_client(base_store())
    resp = client.post("/api/exam/batches", json={"template_id": TPL, "title": "Mock 1"})
    assert resp.status_code == 200
    batch = resp.json()
    assert batch["teacher_id"] == TEACHER
    assert batch["institute_id"] == INST_A
    assert batch["status"] == "open"
    assert batch["submission_count"] == 0
def test_create_batch_template_404():
    """Creating a batch for a template id that is not in the store returns 404."""
    client = make_client(base_store())
    resp = client.post("/api/exam/batches", json={"template_id": "nope"})
    assert resp.status_code == 404
def test_create_batch_seeds_roster_as_absent(monkeypatch):
    """Creating a batch for a class seeds one 'absent' submission per active student."""

    def fake_names(ids):
        return {sid: f"Name {sid}" for sid in ids}

    monkeypatch.setattr(batches_mod, "resolve_student_names", fake_names)
    roster = [
        {"class_id": CLASS, "student_id": "s1", "status": "active"},
        {"class_id": CLASS, "student_id": "s2", "status": "active"},
        {"class_id": CLASS, "student_id": "s3", "status": "inactive"},  # excluded
    ]
    store = base_store(class_students=roster)
    client = make_client(store)
    resp = client.post("/api/exam/batches", json={"template_id": TPL, "class_id": CLASS})
    assert resp.status_code == 200
    assert resp.json()["submission_count"] == 2
    seeded = store["student_submissions"]
    assert {sub["student_id"] for sub in seeded} == {"s1", "s2"}
    for sub in seeded:
        assert sub["status"] == "absent"
    for sub in seeded:
        assert sub["student_name"].startswith("Name ")
def test_list_batches_excludes_archived():
    """The batch listing returns the open batch and omits the archived one."""
    open_batch = {"id": "b1", "template_id": TPL, "institute_id": INST_A, "teacher_id": TEACHER, "status": "open"}
    archived_batch = {"id": "b2", "template_id": TPL, "institute_id": INST_A, "teacher_id": TEACHER, "status": "archived"}
    store = base_store(marking_batches=[open_batch, archived_batch])
    client = make_client(store)
    listed = client.get("/api/exam/batches").json()["batches"]
    assert {batch["id"] for batch in listed} == {"b1"}
# ─── queue / results / csv (A7) ──────────────────────────────────────────────
def _batch_with_cohort():
    """Store with batch b1, two questions, and two students.

    Alice (sub1) is 'complete' with marks 2 + 4; Bob (sub2) is 'absent' with no marks.
    """
    batches = [
        {"id": "b1", "template_id": TPL, "institute_id": INST_A, "teacher_id": TEACHER, "status": "open"},
    ]
    questions = [
        {"id": "q1", "template_id": TPL, "label": "01", "max_marks": 3, "order": 0},
        {"id": "q2", "template_id": TPL, "label": "02", "max_marks": 5, "order": 1},
    ]
    submissions = [
        {"id": "sub1", "batch_id": "b1", "student_id": "s1", "student_name": "Alice", "status": "complete"},
        {"id": "sub2", "batch_id": "b1", "student_id": "s2", "student_name": "Bob", "status": "absent"},
    ]
    marks = [
        {"id": "m1", "batch_id": "b1", "submission_id": "sub1", "question_id": "q1", "awarded_marks": 2},
        {"id": "m2", "batch_id": "b1", "submission_id": "sub1", "question_id": "q2", "awarded_marks": 4},
    ]
    return base_store(
        marking_batches=batches,
        exam_questions=questions,
        student_submissions=submissions,
        mark_entries=marks,
    )
def test_queue_progress_counts():
    """The queue endpoint reports per-status progress and per-submission mark counts."""
    client = make_client(_batch_with_cohort())
    payload = client.get("/api/exam/batches/b1/queue").json()
    progress = payload["progress"]
    assert progress["total"] == 2
    assert progress["absent"] == 1
    assert progress["complete"] == 1
    mark_counts = {sub["id"]: sub["mark_entry_count"] for sub in payload["submissions"]}
    assert mark_counts == {"sub1": 2, "sub2": 0}
def test_results_includes_absent_with_blank(monkeypatch):
    """Results keep the absent student, with a blank total and blank per-question marks.

    NOTE(review): ``monkeypatch`` is requested but not used in this test.
    """
    client = make_client(_batch_with_cohort())
    results = client.get("/api/exam/batches/b1/results").json()["results"]
    by_submission = {row["submission_id"]: row for row in results}
    alice, bob = by_submission["sub1"], by_submission["sub2"]
    assert alice["total"] == 6
    assert bob["total"] is None  # absent → blank total (A7)
    assert set(bob["marks"].values()) == {None}
    assert {row["student_name"] for row in results} == {"Alice", "Bob"}  # absent NOT dropped
def test_csv_includes_absent_row():
    """The CSV export has one row per student, including the absent one."""
    client = make_client(_batch_with_cohort())
    csv_text = client.get("/api/exam/batches/b1/csv").text
    rows = [line for line in csv_text.strip().splitlines() if line]
    header = rows[0]
    assert header.split(",")[:3] == ["student_name", "student_id", "status"]
    assert "01" in header and "02" in header  # question labels as columns
    assert any(line.startswith("Bob,") and ",absent," in line for line in rows)  # absent present
    assert len(rows) == 3  # header + 2 students (incl. absent)
# ─── marks ───────────────────────────────────────────────────────────────────
def test_upsert_mark_derives_batch_and_roundtrips():
    """PUT /marks/{id} fills in batch_id from the submission and updates in place on repeat."""
    store = _batch_with_cohort()
    client = make_client(store)
    first = client.put(
        "/api/exam/marks/mk-1",
        json={"submission_id": "sub1", "question_id": "q1", "awarded_marks": 3},
    )
    assert first.status_code == 200
    saved = first.json()
    assert saved["batch_id"] == "b1"
    assert saved["awarded_marks"] == 3
    assert saved["id"] == "mk-1"
    # upsert again → same id updated, not duplicated
    client.put(
        "/api/exam/marks/mk-1",
        json={"submission_id": "sub1", "question_id": "q1", "awarded_marks": 1},
    )
    same_id = [mark for mark in store["mark_entries"] if mark["id"] == "mk-1"]
    assert len(same_id) == 1
def test_upsert_mark_flips_absent_submission_to_marking():
    """Recording a mark for an 'absent' submission moves it to 'marking' and gives it a total."""
    store = _batch_with_cohort()  # sub2 starts 'absent'
    client = make_client(store)
    client.put(
        "/api/exam/marks/mk-2",
        json={"submission_id": "sub2", "question_id": "q1", "awarded_marks": 2},
    )
    bob = next(sub for sub in store["student_submissions"] if sub["id"] == "sub2")
    assert bob["status"] == "marking"
    # results now show a real total for the (formerly absent) marked student
    results = client.get("/api/exam/batches/b1/results").json()["results"]
    by_submission = {row["submission_id"]: row for row in results}
    assert by_submission["sub2"]["total"] == 2
def test_upsert_mark_submission_404():
    """A mark that references a submission id not in the store is rejected with 404."""
    client = make_client(_batch_with_cohort())
    body = {"submission_id": "nope", "question_id": "q1", "awarded_marks": 1}
    resp = client.put("/api/exam/marks/mk-x", json=body)
    assert resp.status_code == 404
# ─── scans (E3 guards) ───────────────────────────────────────────────────────
def _batch_store():
    """Store with open batch b1 and a single 'absent' submission (sub1, student s1)."""
    batch = {"id": "b1", "template_id": TPL, "institute_id": INST_A, "teacher_id": TEACHER, "status": "open"}
    submission = {"id": "sub1", "batch_id": "b1", "student_id": "s1", "status": "absent"}
    return base_store(marking_batches=[batch], student_submissions=[submission])
def test_scan_rejects_non_pdf_mime():
    """Uploading a file declared as image/png is answered with 415."""
    client = make_client(_batch_store())
    upload = {"file": ("x.png", b"\x89PNG", "image/png")}
    resp = client.post("/api/exam/batches/b1/scans", files=upload, data={"matching_method": "manual"})
    assert resp.status_code == 415
def test_scan_rejects_spoofed_pdf():
    """A file declared as application/pdf whose content is not PDF-like is answered with 415."""
    client = make_client(_batch_store())
    upload = {"file": ("x.pdf", b"not really a pdf", "application/pdf")}
    resp = client.post("/api/exam/batches/b1/scans", files=upload, data={"matching_method": "manual"})
    assert resp.status_code == 415  # magic-byte sniff
def test_scan_rejects_oversize(monkeypatch):
    """With MAX_SCAN_BYTES patched down to 8, a 105-byte PDF upload is answered with 413."""
    monkeypatch.setattr(batches_mod, "MAX_SCAN_BYTES", 8)
    client = make_client(_batch_store())
    oversized = b"%PDF-" + b"0" * 100
    upload = {"file": ("x.pdf", oversized, "application/pdf")}
    resp = client.post("/api/exam/batches/b1/scans", files=upload, data={"matching_method": "manual"})
    assert resp.status_code == 413
class _FakeStorage:
    """No-op replacement patched in for ``batches_mod.StorageAdmin`` in the scan tests."""

    def upload_file(self, *args, **kwargs):
        """Accept any arguments, store nothing, and return None."""
        return None
def test_scan_manual_match_happy(monkeypatch):
    """A valid PDF uploaded with a manual student match marks the submission 'matched'."""
    monkeypatch.setattr(batches_mod, "StorageAdmin", _FakeStorage)
    store = _batch_store()
    client = make_client(store)
    upload = {"file": ("x.pdf", b"%PDF-1.7 minimal", "application/pdf")}
    form = {"matching_method": "manual", "student_id": "s1"}
    resp = client.post("/api/exam/batches/b1/scans", files=upload, data=form)
    assert resp.status_code == 200
    assert resp.json()["status"] == "matched"
    submission = store["student_submissions"][0]
    assert submission["status"] == "matched"
    assert submission["scan_url"].startswith("exam-submissions/b1/")
def test_scan_denied_for_non_owner(monkeypatch):
    """A user who is not the batch's teacher gets 403 when uploading a scan."""
    monkeypatch.setattr(batches_mod, "StorageAdmin", _FakeStorage)
    store = _batch_store()
    client = make_client(store, user_id="someone-else")
    upload = {"file": ("x.pdf", b"%PDF-1.7", "application/pdf")}
    resp = client.post("/api/exam/batches/b1/scans", files=upload, data={"matching_method": "manual"})
    assert resp.status_code == 403