A roster student starts 'absent' and a direct mark would otherwise still show a blank total. Now total is blank only when absent with no marks; recording a mark advances the submission out of absent/unmatched to 'marking'. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
302 lines
12 KiB
Python
302 lines
12 KiB
Python
"""Tests for /api/exam/batches, /marks, /scans (card S4-6).

FakeSupabase emulates RLS by pre-filtering the visible store slice (same approach as
test_exam_templates). Service-role helpers (name resolution, storage) are monkeypatched; live
as-user RLS is covered by the .94 smoke.
"""
|
|
import io
|
|
|
|
import pytest
|
|
from fastapi import FastAPI
|
|
from fastapi.testclient import TestClient
|
|
|
|
import routers.exam.batches as batches_mod
|
|
from routers.exam.batches import router
|
|
from routers.exam.dependencies import ExamContext
|
|
|
|
|
|
# Identifiers shared by the fixtures below: the default acting user, the
# institute that user belongs to, and the template / class ids used in rows.
TEACHER = "00000000-0000-0000-0000-000000000001"
INST_A = "10000000-0000-0000-0000-000000000001"
TPL, CLASS = "t-1", "c-1"
|
|
|
|
|
|
class FakeResult:
    """Response wrapper exposing the selected or affected rows as ``.data``."""

    def __init__(self, data):
        self.data = data


class FakeQuery:
    """Chainable in-memory stand-in for a table query.

    The operation methods (``select``/``insert``/``update``/``upsert``/``delete``)
    and the filter methods (``eq``/``neq``/``in_``) all return ``self`` so calls
    can be chained.  ``execute`` applies the recorded operation to
    ``store[table]`` and returns a ``FakeResult``.
    """

    def __init__(self, store, table):
        self.store = store
        self.table = table
        # Snapshot taken at construction; reads return it after filtering.
        self.rows = list(store.get(table, []))
        # Filters are recorded as well so writes can re-apply them to live rows.
        self._filters = []
        self._op = None
        self._payload = None
        self._limit = None

    def select(self, *_a, **_k):
        """Mark the query as a read; column arguments are ignored."""
        self._op = "select"
        return self

    def insert(self, payload):
        """Queue ``payload`` (one dict or a list of dicts) for insertion."""
        self._op = "insert"
        self._payload = payload
        return self

    def update(self, payload):
        """Queue ``payload`` to be merged into every row matching the filters."""
        self._op = "update"
        self._payload = payload
        return self

    def upsert(self, payload):
        """Queue ``payload`` for insert-or-update keyed on ``id``."""
        self._op = "upsert"
        self._payload = payload
        return self

    def delete(self):
        """Queue removal of every row matching the filters."""
        self._op = "delete"
        return self

    def eq(self, k, v):
        """Keep rows whose ``k`` equals ``v``."""
        self._filters.append(("eq", k, v))
        self.rows = [r for r in self.rows if r.get(k) == v]
        return self

    def neq(self, k, v):
        """Keep rows whose ``k`` differs from ``v``."""
        self._filters.append(("neq", k, v))
        self.rows = [r for r in self.rows if r.get(k) != v]
        return self

    def in_(self, k, vals):
        """Keep rows whose ``k`` is one of ``vals``."""
        vals = set(vals)
        self._filters.append(("in", k, vals))
        self.rows = [r for r in self.rows if r.get(k) in vals]
        return self

    def order(self, *_a, **_k):
        """Accepted for API compatibility; ordering is not emulated."""
        return self

    def limit(self, n):
        """Cap the number of rows returned by a read."""
        self._limit = n
        return self

    def _match(self, row):
        """Return True when ``row`` satisfies every recorded filter."""
        for op, k, v in self._filters:
            if op == "eq" and row.get(k) != v:
                return False
            if op == "neq" and row.get(k) == v:
                return False
            if op == "in" and row.get(k) not in v:
                return False
        return True

    def _fresh_id(self, backing):
        """Return a ``gen-<table>-<n>`` id that no row in ``backing`` uses.

        Counting starts at ``len(backing)`` so ids are unchanged in the common
        case; it only advances when that id is already taken, which happens
        once rows have been deleted and the table has shrunk.
        """
        n = len(backing)
        while any(r.get("id") == f"gen-{self.table}-{n}" for r in backing):
            n += 1
        return f"gen-{self.table}-{n}"

    def execute(self):
        """Run the recorded operation and return its rows in a ``FakeResult``.

        Inserts and upserts return the written rows, updates return the rows
        they modified, deletes return the rows they removed, and anything else
        is treated as a read of the filtered snapshot (honouring ``limit``).
        """
        backing = self.store.setdefault(self.table, [])

        if self._op in ("insert", "upsert"):
            payloads = self._payload if isinstance(self._payload, list) else [self._payload]
            out = []
            for p in payloads:
                row = dict(p)
                if self._op == "upsert" and row.get("id") is not None:
                    existing = next((r for r in backing if r.get("id") == row["id"]), None)
                    if existing is not None:
                        existing.update(row)
                        out.append(existing)
                        continue
                if "id" not in row:
                    row["id"] = self._fresh_id(backing)
                backing.append(row)
                out.append(row)
            return FakeResult(out)

        if self._op == "update":
            out = []
            for r in backing:
                if self._match(r):
                    r.update(self._payload)
                    out.append(r)
            return FakeResult(out)

        if self._op == "delete":
            removed = [r for r in backing if self._match(r)]
            self.store[self.table] = [r for r in backing if not self._match(r)]
            return FakeResult(removed)

        rows = self.rows[: self._limit] if self._limit is not None else self.rows
        return FakeResult(rows)
|
|
|
|
|
|
class FakeSupabase:
    """Client stand-in: ``table(name)`` hands back a ``FakeQuery`` over the shared store."""

    def __init__(self, store):
        # Kept by reference so tests can inspect writes through their own dict.
        self.store = store

    def table(self, name):
        """Start a new query against the table called ``name``."""
        query = FakeQuery(self.store, name)
        return query
|
|
|
|
|
|
def make_client(store, user_id=TEACHER, institute_ids=(INST_A,)):
    """Return a TestClient for the exam router with its context dependency stubbed.

    ``get_exam_context`` is overridden so every request receives an
    ``ExamContext`` for ``user_id`` backed by a ``FakeSupabase`` over ``store``.
    """
    from routers.exam.dependencies import get_exam_context

    def _stub_context():
        return ExamContext(user_id, "tok", FakeSupabase(store), list(institute_ids))

    application = FastAPI()
    application.include_router(router, prefix="/api/exam")
    application.dependency_overrides[get_exam_context] = _stub_context
    return TestClient(application)
|
|
|
|
|
|
def base_store(**extra):
    """Return a fresh store holding one draft template, plus any ``extra`` tables.

    Keys passed in ``extra`` replace the default entry of the same name.
    """
    template_row = {"id": TPL, "institute_id": INST_A, "teacher_id": TEACHER, "status": "draft"}
    return {"exam_templates": [template_row], **extra}
|
|
|
|
|
|
# ─── batches ───────────────────────────────────────────────────────────────
|
|
|
|
def test_create_batch_no_class():
    """A batch created without a class is open, owned by the caller, and empty."""
    client = make_client(base_store())
    resp = client.post("/api/exam/batches", json={"template_id": TPL, "title": "Mock 1"})
    assert resp.status_code == 200

    batch = resp.json()
    assert batch["teacher_id"] == TEACHER
    assert batch["institute_id"] == INST_A
    assert batch["status"] == "open"
    assert batch["submission_count"] == 0
|
|
|
|
|
|
def test_create_batch_template_404():
    """Posting a template id that is not in the store yields 404."""
    client = make_client(base_store())
    resp = client.post("/api/exam/batches", json={"template_id": "nope"})
    assert resp.status_code == 404
|
|
|
|
|
|
def test_create_batch_seeds_roster_as_absent(monkeypatch):
    """Active class students are seeded as 'absent' submissions with resolved names."""

    def _fake_names(ids):
        return {sid: f"Name {sid}" for sid in ids}

    monkeypatch.setattr(batches_mod, "resolve_student_names", _fake_names)

    roster = [
        {"class_id": CLASS, "student_id": "s1", "status": "active"},
        {"class_id": CLASS, "student_id": "s2", "status": "active"},
        {"class_id": CLASS, "student_id": "s3", "status": "inactive"},  # excluded
    ]
    store = base_store(class_students=roster)
    client = make_client(store)

    resp = client.post("/api/exam/batches", json={"template_id": TPL, "class_id": CLASS})
    assert resp.status_code == 200
    assert resp.json()["submission_count"] == 2

    seeded = store["student_submissions"]
    assert {sub["student_id"] for sub in seeded} == {"s1", "s2"}
    for sub in seeded:
        assert sub["status"] == "absent"
    for sub in seeded:
        assert sub["student_name"].startswith("Name ")
|
|
|
|
|
|
def test_list_batches_excludes_archived():
    """The batch listing returns the open batch and omits the archived one."""

    def _batch(batch_id, status):
        return {"id": batch_id, "template_id": TPL, "institute_id": INST_A, "teacher_id": TEACHER, "status": status}

    store = base_store(marking_batches=[_batch("b1", "open"), _batch("b2", "archived")])
    client = make_client(store)

    listing = client.get("/api/exam/batches").json()
    listed_ids = {batch["id"] for batch in listing["batches"]}
    assert listed_ids == {"b1"}
|
|
|
|
|
|
# ─── queue / results / csv (A7) ──────────────────────────────────────────────
|
|
|
|
def _batch_with_cohort():
    """Store with open batch b1, two questions, and a two-student cohort.

    Alice (sub1) is 'complete' with a mark on each question; Bob (sub2) is
    'absent' and has no marks.
    """
    batch = {"id": "b1", "template_id": TPL, "institute_id": INST_A, "teacher_id": TEACHER, "status": "open"}
    questions = [
        {"id": "q1", "template_id": TPL, "label": "01", "max_marks": 3, "order": 0},
        {"id": "q2", "template_id": TPL, "label": "02", "max_marks": 5, "order": 1},
    ]
    submissions = [
        {"id": "sub1", "batch_id": "b1", "student_id": "s1", "student_name": "Alice", "status": "complete"},
        {"id": "sub2", "batch_id": "b1", "student_id": "s2", "student_name": "Bob", "status": "absent"},
    ]
    marks = [
        {"id": "m1", "batch_id": "b1", "submission_id": "sub1", "question_id": "q1", "awarded_marks": 2},
        {"id": "m2", "batch_id": "b1", "submission_id": "sub1", "question_id": "q2", "awarded_marks": 4},
    ]
    return base_store(
        marking_batches=[batch],
        exam_questions=questions,
        student_submissions=submissions,
        mark_entries=marks,
    )
|
|
|
|
|
|
def test_queue_progress_counts():
    """The queue reports per-status progress and a mark count per submission."""
    client = make_client(_batch_with_cohort())
    payload = client.get("/api/exam/batches/b1/queue").json()

    assert payload["progress"]["total"] == 2
    assert payload["progress"]["absent"] == 1
    assert payload["progress"]["complete"] == 1

    marks_per_submission = {}
    for sub in payload["submissions"]:
        marks_per_submission[sub["id"]] = sub["mark_entry_count"]
    assert marks_per_submission == {"sub1": 2, "sub2": 0}
|
|
|
|
|
|
def test_results_includes_absent_with_blank(monkeypatch):
    """Results keep the absent student, with a blank total and blank marks.

    The ``monkeypatch`` fixture is requested but not used by this test.
    """
    client = make_client(_batch_with_cohort())
    results = client.get("/api/exam/batches/b1/results").json()["results"]
    rows = {row["submission_id"]: row for row in results}

    assert rows["sub1"]["total"] == 6

    absent_row = rows["sub2"]
    assert absent_row["total"] is None  # absent → blank total (A7)
    assert set(absent_row["marks"].values()) == {None}

    # absent NOT dropped
    assert {row["student_name"] for row in results} == {"Alice", "Bob"}
|
|
|
|
|
|
def test_csv_includes_absent_row():
    """The CSV export has a labelled header and one row per student, absent included."""
    client = make_client(_batch_with_cohort())
    csv_text = client.get("/api/exam/batches/b1/csv").text
    rows = [line for line in csv_text.strip().splitlines() if line]

    header = rows[0]
    assert header.split(",")[:3] == ["student_name", "student_id", "status"]
    # question labels as columns
    assert "01" in header
    assert "02" in header

    # absent present
    bob_rows = [line for line in rows if line.startswith("Bob,") and ",absent," in line]
    assert bob_rows

    # header + 2 students (incl. absent)
    assert len(rows) == 3
|
|
|
|
|
|
# ─── marks ───────────────────────────────────────────────────────────────────
|
|
|
|
def test_upsert_mark_derives_batch_and_roundtrips():
    """PUT on a mark derives its batch, and a repeat PUT updates the same row."""
    store = _batch_with_cohort()
    c = make_client(store)

    r = c.put("/api/exam/marks/mk-1", json={"submission_id": "sub1", "question_id": "q1", "awarded_marks": 3})
    assert r.status_code == 200
    row = r.json()
    assert row["batch_id"] == "b1" and row["awarded_marks"] == 3 and row["id"] == "mk-1"

    # upsert again → same id updated, not duplicated
    again = c.put("/api/exam/marks/mk-1", json={"submission_id": "sub1", "question_id": "q1", "awarded_marks": 1})
    # Without this check a failed second request would still leave exactly one
    # "mk-1" row behind and the test would pass without any update happening.
    assert again.status_code == 200
    stored = [m for m in store["mark_entries"] if m["id"] == "mk-1"]
    assert len(stored) == 1
    assert stored[0]["awarded_marks"] == 1
|
|
|
|
|
|
def test_upsert_mark_flips_absent_submission_to_marking():
    """Recording a mark for an absent student moves the submission to 'marking'."""
    store = _batch_with_cohort()  # sub2 starts 'absent'
    c = make_client(store)

    r = c.put("/api/exam/marks/mk-2", json={"submission_id": "sub2", "question_id": "q1", "awarded_marks": 2})
    # Fail at the cause if the mark itself was rejected, rather than on the
    # status/total assertions further down.
    assert r.status_code == 200

    sub2 = next(s for s in store["student_submissions"] if s["id"] == "sub2")
    assert sub2["status"] == "marking"

    # results now show a real total for the (formerly absent) marked student
    res = {row["submission_id"]: row for row in c.get("/api/exam/batches/b1/results").json()["results"]}
    assert res["sub2"]["total"] == 2
|
|
|
|
|
|
def test_upsert_mark_submission_404():
    """A mark that names a submission missing from the store yields 404."""
    client = make_client(_batch_with_cohort())
    mark = {"submission_id": "nope", "question_id": "q1", "awarded_marks": 1}
    resp = client.put("/api/exam/marks/mk-x", json=mark)
    assert resp.status_code == 404
|
|
|
|
|
|
# ─── scans (E3 guards) ───────────────────────────────────────────────────────
|
|
|
|
def _batch_store():
    """Store with open batch b1 and a single 'absent' submission for student s1."""
    batch = {"id": "b1", "template_id": TPL, "institute_id": INST_A, "teacher_id": TEACHER, "status": "open"}
    submission = {"id": "sub1", "batch_id": "b1", "student_id": "s1", "status": "absent"}
    return base_store(marking_batches=[batch], student_submissions=[submission])
|
|
|
|
|
|
def test_scan_rejects_non_pdf_mime():
    """Uploading a file declared as image/png is refused with 415."""
    client = make_client(_batch_store())
    upload = {"file": ("x.png", b"\x89PNG", "image/png")}
    form = {"matching_method": "manual"}
    resp = client.post("/api/exam/batches/b1/scans", files=upload, data=form)
    assert resp.status_code == 415
|
|
|
|
|
|
def test_scan_rejects_spoofed_pdf():
    """A PDF content type with non-PDF bytes is refused with 415."""
    client = make_client(_batch_store())
    upload = {"file": ("x.pdf", b"not really a pdf", "application/pdf")}
    form = {"matching_method": "manual"}
    resp = client.post("/api/exam/batches/b1/scans", files=upload, data=form)
    # magic-byte sniff
    assert resp.status_code == 415
|
|
|
|
|
|
def test_scan_rejects_oversize(monkeypatch):
    """With MAX_SCAN_BYTES patched down to 8, a 105-byte PDF is refused with 413."""
    monkeypatch.setattr(batches_mod, "MAX_SCAN_BYTES", 8)
    client = make_client(_batch_store())
    pdf_bytes = b"%PDF-" + b"0" * 100
    upload = {"file": ("x.pdf", pdf_bytes, "application/pdf")}
    form = {"matching_method": "manual"}
    resp = client.post("/api/exam/batches/b1/scans", files=upload, data=form)
    assert resp.status_code == 413
|
|
|
|
|
|
class _FakeStorage:
    """Storage stand-in whose upload accepts any arguments and does nothing."""

    def upload_file(self, *_args, **_kwargs):
        """Ignore the upload request and return ``None``."""
        return None
|
|
|
|
|
|
def test_scan_manual_match_happy(monkeypatch):
    """A valid PDF manually matched to s1 marks that submission 'matched' and stores a scan URL."""
    monkeypatch.setattr(batches_mod, "StorageAdmin", _FakeStorage)
    store = _batch_store()
    client = make_client(store)

    upload = {"file": ("x.pdf", b"%PDF-1.7 minimal", "application/pdf")}
    form = {"matching_method": "manual", "student_id": "s1"}
    resp = client.post("/api/exam/batches/b1/scans", files=upload, data=form)

    assert resp.status_code == 200
    assert resp.json()["status"] == "matched"

    submission = store["student_submissions"][0]
    assert submission["status"] == "matched"
    assert submission["scan_url"].startswith("exam-submissions/b1/")
|
|
|
|
|
|
def test_scan_denied_for_non_owner(monkeypatch):
    """A user other than the batch's teacher gets 403 when uploading a scan."""
    monkeypatch.setattr(batches_mod, "StorageAdmin", _FakeStorage)
    client = make_client(_batch_store(), user_id="someone-else")

    upload = {"file": ("x.pdf", b"%PDF-1.7", "application/pdf")}
    form = {"matching_method": "manual"}
    resp = client.post("/api/exam/batches/b1/scans", files=upload, data=form)
    assert resp.status_code == 403
|