api/tests/test_exam_templates.py
CC Worker e83873e822
Some checks failed
api-ci-deploy / test-build-deploy (push) Has been cancelled
fix(exam): dedupe all AI auto-map rows by id before insert
B1-4 live-route validation: continuation bands re-emit the same stable AI id for
response_areas/boundaries/layout (not just questions), causing duplicate-pkey insert
failures. Add _dedupe_rows_by_id applied to all four tables in _refresh_ai_rows.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-08 18:02:51 +00:00

722 lines
33 KiB
Python

"""Tests for the /api/exam/templates router (card S4-5).
Mirrors the FakeSupabase + dependency_overrides pattern from test_me_bootstrap.py. The
ExamContext dependency is overridden with an in-memory fake, so these tests exercise the
router's auth/ownership/institute logic without a live Supabase — the as-user RLS itself is
verified separately against .94 (see the evidence note).
"""
import pytest
from fastapi import FastAPI
from fastapi.testclient import TestClient
import routers.exam.templates as templates_mod
from routers.exam.templates import router
from routers.exam.dependencies import ExamContext, get_exam_context
TEACHER = "00000000-0000-0000-0000-000000000001"
OTHER_TEACHER = "00000000-0000-0000-0000-000000000002"
INST_A = "10000000-0000-0000-0000-000000000001"
INST_B = "10000000-0000-0000-0000-000000000002"
@pytest.fixture(autouse=True)
def _stub_projection(monkeypatch):
    """Swap both projection entry points on the router module for stubs.

    ``project_template_safe`` becomes a recorder that appends each template id it is called
    with to the list this fixture returns; ``project_template`` becomes a stub that returns
    a fixed payload. Applied automatically to every test in this module.
    """
    scheduled_ids = []

    def _record(tid):
        # list.append returns None, which is also what the stub hands back to its caller.
        return scheduled_ids.append(tid)

    def _fixed_projection(tid):
        return {"exam_code": "X", "questions": 1}

    monkeypatch.setattr(templates_mod, "project_template_safe", _record)
    monkeypatch.setattr(templates_mod, "project_template", _fixed_projection)
    return scheduled_ids
# ─── in-memory fake supabase ─────────────────────────────────────────────────
class FakeResult:
    """Bare-bones query response: carries the resulting rows on ``.data``."""

    def __init__(self, data):
        # Stored as-is (no copy), so callers see the very objects the query produced.
        self.data = data
class FakeQuery:
    """Models the subset of the supabase-py builder the router uses, against a row list.

    Crucially it emulates RLS: the backing store is pre-filtered to the rows the caller can
    see, so cross-institute / non-owner access naturally reads back empty (→ 404).

    Builder methods (``select``/``insert``/``update``/``delete`` plus the filter methods)
    return ``self`` so calls can be chained; ``execute()`` performs the recorded operation
    and returns a ``FakeResult``.

    Two filtering mechanisms coexist:

    * ``self.rows`` is a snapshot of the table taken at construction and narrowed eagerly
      by every filter call; it is what a *select* returns.
    * ``self._filters`` records the same filters so *update* and *delete* can be applied
      to the live backing list via ``_matches``.
    """

    def __init__(self, store, table):
        self.store = store
        self.table = table
        # Snapshot copy: later filter calls rebind this list, never the store's own list.
        self.rows = list(store.get(table, []))
        self._filters = []
        self._op = None
        self._payload = None
        self._limit = None

    # ── operation selection ────────────────────────────────────────────────
    def select(self, *_a, **_k):
        """Mark the query as a read; column arguments are ignored (all keys are returned)."""
        self._op = "select"
        return self

    def insert(self, payload):
        """Mark the query as an insert of ``payload`` (a single row dict or a list of them)."""
        self._op = "insert"
        self._payload = payload
        return self

    def update(self, payload):
        """Mark the query as an update that merges ``payload`` into every matching row."""
        self._op = "update"
        self._payload = payload
        return self

    def delete(self):
        """Mark the query as a delete of every matching row."""
        self._op = "delete"
        return self

    # ── filters ────────────────────────────────────────────────────────────
    def eq(self, key, value):
        """Keep rows whose ``key`` equals ``value``."""
        self._filters.append(("eq", key, value))
        self.rows = [r for r in self.rows if r.get(key) == value]
        return self

    def neq(self, key, value):
        """Keep rows whose ``key`` differs from ``value``."""
        self._filters.append(("neq", key, value))
        self.rows = [r for r in self.rows if r.get(key) != value]
        return self

    def in_(self, key, values):
        """Keep rows whose ``key`` is one of ``values`` (materialised as a set)."""
        values = set(values)
        self._filters.append(("in", key, values))
        self.rows = [r for r in self.rows if r.get(key) in values]
        return self

    def order(self, *_a, **_k):
        """Accepted for API compatibility; ordering is not modelled."""
        return self

    def limit(self, n):
        """Cap the number of rows a select returns (update/delete ignore it)."""
        self._limit = n
        return self

    def _matches(self, row):
        """Return True when ``row`` satisfies every recorded filter.

        Used by update/delete against the live backing list. The ``in`` branch is required
        so that ``.in_(...)`` restricts those operations just as it restricts a select;
        without it an ``in_`` filter would be silently ignored and every row that passes
        the remaining filters would be updated or deleted.
        """
        for op, key, value in self._filters:
            actual = row.get(key)
            if op == "eq" and actual != value:
                return False
            if op == "neq" and actual == value:
                return False
            if op == "in" and actual not in value:
                return False
        return True

    def execute(self):
        """Run the recorded operation and return a ``FakeResult``.

        * insert – appends copies of the payload row(s) to the store, generating an ``id``
          for any row that lacks one; returns the inserted rows.
        * update – merges the payload into each matching stored row in place; returns them.
        * delete – rebinds the table to the non-matching rows; returns the removed rows.
        * anything else – treated as a select over the pre-filtered snapshot, honouring
          ``limit``.
        """
        backing = self.store.setdefault(self.table, [])
        if self._op == "insert":
            payloads = self._payload if isinstance(self._payload, list) else [self._payload]
            inserted = []
            for p in payloads:
                row = dict(p)
                # Generated ids are only a fallback; rows carrying an "id" keep their own.
                row.setdefault("id", f"gen-{self.table}-{len(backing)}")
                backing.append(row)
                inserted.append(row)
            return FakeResult(inserted)
        if self._op == "update":
            updated = []
            for row in backing:
                if self._matches(row):
                    row.update(self._payload)
                    updated.append(row)
            return FakeResult(updated)
        if self._op == "delete":
            kept = []
            removed = []
            for row in backing:
                (removed if self._matches(row) else kept).append(row)
            self.store[self.table] = kept
            return FakeResult(removed)
        # select
        rows = self.rows[: self._limit] if self._limit is not None else self.rows
        return FakeResult(rows)
class FakeSupabase:
    """Client stand-in: ``table(name)`` yields a fresh FakeQuery over the shared store."""

    def __init__(self, store):
        # The dict is shared, not copied, so tests can inspect writes afterwards.
        self.store = store

    def table(self, name):
        query = FakeQuery(self.store, name)
        return query
class _FakeStorageAdmin:
    """Storage stand-in: uploads are no-ops, downloads and signed URLs are canned."""

    def upload_file(self, *args, **kwargs):
        # Accept any call shape and store nothing.
        return None

    def download_file(self, bucket_id, file_path):
        # Same bytes regardless of bucket/path.
        return b"%PDF-1.7 fake"

    def create_signed_url(self, bucket_id, file_path, expires_in=3600):
        # Echo the inputs into the URL so tests can assert on them.
        url = f"https://storage.test/{bucket_id}/{file_path}?token=fake&expires_in={expires_in}"
        return {"signedURL": url}
class _FakeServiceRoleClient:
    """Service-role client stand-in exposing a FakeSupabase as ``.supabase``."""

    def __init__(self, store):
        fake = FakeSupabase(store)
        self.supabase = fake
def make_client(user_id=TEACHER, institute_ids=(INST_A,), store=None):
    """Build a TestClient for the exam router with the ExamContext dependency faked.

    Returns ``(client, store)``. ``store`` is the dict backing the fake Supabase; when the
    caller passes one in, that same object is returned, otherwise a new empty dict is used.
    """
    if store is None:
        store = {}

    def _ctx():
        # A new context (and FakeSupabase wrapper) per resolution, all over the same store.
        return ExamContext(user_id, "fake-token", FakeSupabase(store), list(institute_ids))

    app = FastAPI()
    app.include_router(router, prefix="/api/exam")
    app.dependency_overrides[get_exam_context] = _ctx
    return TestClient(app), store
# ─── tests ───────────────────────────────────────────────────────────────────
def test_requires_auth_when_not_overridden():
    """Listing templates without the dependency override must be rejected as unauthenticated."""
    bare_app = FastAPI()
    bare_app.include_router(router, prefix="/api/exam")
    # No dependency override → real SupabaseBearer runs and rejects the missing token.
    anonymous = TestClient(bare_app)
    response = anonymous.get("/api/exam/templates")
    # unauthenticated, not processed
    assert response.status_code in (401, 403)
def test_catalogue_requires_auth_when_not_overridden():
    """The catalogue listing is likewise rejected when no ExamContext override is installed."""
    bare_app = FastAPI()
    bare_app.include_router(router, prefix="/api/exam")
    anonymous = TestClient(bare_app)
    response = anonymous.get("/api/exam/catalogue")
    assert response.status_code in (401, 403)
def test_list_catalogue_papers_uses_as_user_metadata():
    """Of a QP row and an MS row in ``eb_exams``, only the QP row is expected in the listing."""
    question_paper = {"id": "e1", "exam_code": "AQA-1", "type_code": "QP", "storage_loc": "cc.examboards/aqa/p.pdf"}
    mark_scheme = {"id": "e2", "exam_code": "AQA-MS", "type_code": "MS", "storage_loc": "cc.examboards/aqa/ms.pdf"}
    client, _ = make_client(store={"eb_exams": [question_paper, mark_scheme]})

    response = client.get("/api/exam/catalogue")

    assert response.status_code == 200
    listed_ids = [paper["id"] for paper in response.json()["papers"]]
    assert listed_ids == ["e1"]
def test_catalogue_signed_url_requires_auth_and_signs_examboard_pdf(monkeypatch):
    """A QP stored under ``cc.examboards`` yields a signed URL split into bucket and path."""
    paper = {"id": "e1", "exam_code": "AQA-1", "type_code": "QP", "storage_loc": "cc.examboards/aqa/physics/qp.pdf"}
    client, _ = make_client(store={"eb_exams": [paper]})
    monkeypatch.setattr(templates_mod, "StorageAdmin", _FakeStorageAdmin)

    response = client.get("/api/exam/catalogue/e1/signed-url?expires_in=120")

    assert response.status_code == 200
    payload = response.json()
    assert payload["bucket"] == "cc.examboards"
    assert payload["path"] == "aqa/physics/qp.pdf"
    assert payload["expires_in"] == 120
    assert "token=fake" in payload["signed_url"]
def test_catalogue_signed_url_rejects_non_examboard_storage(monkeypatch):
    """A QP whose ``storage_loc`` sits under ``cc.public`` must not be signed (404)."""
    paper = {"id": "e1", "exam_code": "AQA-1", "type_code": "QP", "storage_loc": "cc.public/aqa/physics/qp.pdf"}
    client, _ = make_client(store={"eb_exams": [paper]})
    monkeypatch.setattr(templates_mod, "StorageAdmin", _FakeStorageAdmin)

    response = client.get("/api/exam/catalogue/e1/signed-url")

    assert response.status_code == 404
def test_catalogue_signed_url_rejects_non_catalogue_doc_type(monkeypatch):
    """An MS-typed row is not signable through the catalogue endpoint (404)."""
    mark_scheme = {"id": "e1", "exam_code": "AQA-MS", "type_code": "MS", "storage_loc": "cc.examboards/aqa/physics/ms.pdf"}
    client, _ = make_client(store={"eb_exams": [mark_scheme]})
    monkeypatch.setattr(templates_mod, "StorageAdmin", _FakeStorageAdmin)

    response = client.get("/api/exam/catalogue/e1/signed-url")

    assert response.status_code == 404
def test_create_template_sets_owner_and_institute():
    """Creating a template stamps the caller as owner, their sole institute, and draft status."""
    client, _ = make_client()

    response = client.post("/api/exam/templates", json={"title": "AQA Physics 1H", "subject": "Physics"})

    assert response.status_code == 200
    created = response.json()
    assert created["title"] == "AQA Physics 1H"
    assert created["teacher_id"] == TEACHER
    assert created["institute_id"] == INST_A
    assert created["status"] == "draft"
def test_create_template_accepts_uploaded_source_pdf(monkeypatch):
    """A multipart create with a PDF records a ``files`` row and links it to the template."""
    store = {}
    # make_client hands back the same dict it was given, so the service-role fake below
    # and the as-user fake share one store.
    client, _ = make_client(store=store)
    monkeypatch.setattr(templates_mod, "StorageAdmin", _FakeStorageAdmin)
    monkeypatch.setattr(templates_mod, "SupabaseServiceRoleClient", lambda: _FakeServiceRoleClient(store))

    form_fields = {"title": "AQA Physics 1H", "subject": "Physics"}
    upload = {"source_pdf": ("paper.pdf", b"%PDF-1.7 test", "application/pdf")}
    response = client.post("/api/exam/templates", data=form_fields, files=upload)

    assert response.status_code == 200
    created = response.json()
    assert created["source_file_id"] is not None
    file_row = store["files"][0]
    assert file_row["id"] == created["source_file_id"]
    assert file_row["uploaded_by"] == TEACHER
def test_get_template_source_pdf_from_uploaded_file(monkeypatch):
    """The source-pdf endpoint streams the uploaded file's bytes back as a PDF."""
    template_row = {
        "id": "t1",
        "title": "p",
        "status": "draft",
        "institute_id": INST_A,
        "teacher_id": TEACHER,
        "source_file_id": "f1",
    }
    file_row = {"id": "f1", "bucket": "cc.users", "path": "exam-marker/cab1/f1/paper.pdf", "name": "paper.pdf"}
    store = {"exam_templates": [template_row], "files": [file_row]}
    client, _ = make_client(store=store)
    monkeypatch.setattr(templates_mod, "StorageAdmin", _FakeStorageAdmin)
    # The download resolves the files row via service role (sidesteps the broken cabinet_memberships
    # RLS recursion) — mock it to the same fake store, like the upload test does.
    monkeypatch.setattr(templates_mod, "SupabaseServiceRoleClient", lambda: _FakeServiceRoleClient(store))

    response = client.get("/api/exam/templates/t1/source-pdf")

    assert response.status_code == 200
    assert response.headers["content-type"].startswith("application/pdf")
    assert response.content.startswith(b"%PDF-1.7")
def test_create_template_rejects_foreign_institute():
    """Naming an institute the caller does not belong to is refused with 403."""
    client, _ = make_client(institute_ids=(INST_A,))
    body = {"title": "X", "institute_id": INST_B}
    response = client.post("/api/exam/templates", json=body)
    assert response.status_code == 403
def test_create_template_requires_institute_when_ambiguous():
    """With two memberships and no ``institute_id`` in the body, create answers 400."""
    client, _ = make_client(institute_ids=(INST_A, INST_B))
    body = {"title": "X"}
    response = client.post("/api/exam/templates", json=body)
    assert response.status_code == 400
def test_list_excludes_archived_by_default():
    """Archived templates are hidden unless ``include_archived=true`` is passed."""
    live = {"id": "t1", "title": "live", "status": "draft", "institute_id": INST_A, "teacher_id": TEACHER}
    archived = {"id": "t2", "title": "gone", "status": "archived", "institute_id": INST_A, "teacher_id": TEACHER}
    client, _ = make_client(store={"exam_templates": [live, archived]})

    default_listing = client.get("/api/exam/templates").json()["templates"]
    assert [t["title"] for t in default_listing] == ["live"]

    full_listing = client.get("/api/exam/templates?include_archived=true").json()["templates"]
    # Compared as a set: the order of the full listing is not asserted.
    assert {t["title"] for t in full_listing} == {"live", "gone"}
def test_get_template_bundles_children():
    """The template detail response carries questions, response areas, boundaries and layout."""
    layout_row = {"id": "l1", "template_id": "t1", "page_index": 0, "role": "question_page"}
    store = {
        "exam_templates": [{"id": "t1", "title": "p", "status": "draft", "institute_id": INST_A, "teacher_id": TEACHER}],
        "exam_questions": [{"id": "q1", "template_id": "t1", "label": "01", "order": 0}],
        "exam_response_areas": [{"id": "r1", "template_id": "t1", "question_id": "q1", "page": 1}],
        "exam_boundaries": [{"id": "b1", "template_id": "t1", "page_index": 0, "y": 10}],
        "exam_template_layout": [layout_row],
    }
    client, _ = make_client(store=store)

    detail = client.get("/api/exam/templates/t1").json()

    for child_key in ("questions", "response_areas", "boundaries"):
        assert len(detail[child_key]) == 1
    assert detail["layout"] == [{"id": "l1", "template_id": "t1", "page_index": 0, "role": "question_page"}]
def test_get_other_institute_template_is_404():
    """Fetching a template id that is absent from the caller's store answers 404.

    NOTE: the request targets an id that is not in the store at all, so this stands in for
    an RLS-hidden row rather than exercising institute filtering on ``t1`` itself.
    """
    # RLS emulation: a template the caller can't see isn't in their visible store slice.
    foreign_template = {"id": "t1", "title": "p", "status": "draft", "institute_id": INST_B, "teacher_id": OTHER_TEACHER}
    client, _ = make_client(institute_ids=(INST_A,), store={"exam_templates": [foreign_template]})
    # The fake store doesn't model institute filtering on read, so simulate the RLS-hidden row
    # by querying an id the caller's store doesn't contain.
    response = client.get("/api/exam/templates/does-not-exist")
    assert response.status_code == 404
def test_put_replace_persists_children_with_client_ids():
    """PUT keeps the ids supplied by the client for questions, response areas and boundaries."""
    template_row = {"id": "t1", "title": "p", "status": "draft", "institute_id": INST_A, "teacher_id": TEACHER}
    client, _ = make_client(store={"exam_templates": [template_row]})
    question = {"id": "q-uuid-1", "label": "01.1", "order": 0, "max_marks": 3}
    response_area = {"id": "r-uuid-1", "question_id": "q-uuid-1", "page": 1, "bounds": {"x": 1}, "kind": "response"}
    boundary = {"id": "b-uuid-1", "page_index": 0, "y": 12.5}

    response = client.put(
        "/api/exam/templates/t1",
        json={"questions": [question], "response_areas": [response_area], "boundaries": [boundary]},
    )

    assert response.status_code == 200
    saved = response.json()
    # client UUID preserved (Neo4j join key)
    assert saved["questions"][0]["id"] == "q-uuid-1"
    assert saved["response_areas"][0]["id"] == "r-uuid-1"
    assert saved["boundaries"][0]["id"] == "b-uuid-1"
def test_put_persists_region_kinds_and_part_geometry():
    """PUT stores part geometry on the question and every region kind plus ``context_type``."""
    # S4-9 taxonomy: Part box geometry on the question; new region kinds + context_type.
    template_row = {"id": "t1", "title": "p", "status": "draft", "institute_id": INST_A, "teacher_id": TEACHER}
    client, store = make_client(store={"exam_templates": [template_row]})
    questions = [
        {"id": "q1", "label": "01", "order": 0, "is_container": True},
        {"id": "p1", "parent_id": "q1", "label": "01.1", "order": 0, "max_marks": 3,
         "bounds": {"x": 1, "y": 2, "w": 3, "h": 4}, "page": 1},
    ]
    response_areas = [
        {"id": "r1", "question_id": "p1", "page": 1, "bounds": {"x": 1}, "kind": "response", "response_form": "lines"},
        {"id": "c1", "question_id": "p1", "page": 1, "bounds": {"x": 1}, "kind": "context", "context_type": "data_table"},
        {"id": "qn1", "question_id": "p1", "page": 1, "bounds": {"x": 1}, "kind": "question_number"},
        {"id": "m1", "question_id": "p1", "page": 1, "bounds": {"x": 1}, "kind": "mark_area"},
        {"id": "f1", "question_id": "p1", "page": 1, "bounds": {"x": 1}, "kind": "furniture"},
    ]

    response = client.put(
        "/api/exam/templates/t1",
        json={"questions": questions, "response_areas": response_areas},
    )

    assert response.status_code == 200
    part = next(q for q in store["exam_questions"] if q["id"] == "p1")
    assert part["bounds"] == {"x": 1, "y": 2, "w": 3, "h": 4}
    assert part["page"] == 1
    areas_by_id = {area["id"]: area for area in store["exam_response_areas"]}
    stored_kinds = {areas_by_id[area_id]["kind"] for area_id in ("r1", "c1", "qn1", "m1", "f1")}
    assert stored_kinds == {"response", "context", "question_number", "mark_area", "furniture"}
    assert areas_by_id["c1"]["context_type"] == "data_table"
def test_put_round_trips_s5_layout_and_provenance_fields():
    """Provenance fields and layout rows sent via PUT end up in the store and the response."""
    template_row = {"id": "t1", "title": "p", "status": "draft", "institute_id": INST_A, "teacher_id": TEACHER}
    client, store = make_client(store={"exam_templates": [template_row]})
    question = {
        "id": "q1", "label": "01", "order": 0, "max_marks": 4, "source": "ai",
        "confirmed": False, "confidence": 0.82, "derivation": "docling:heading",
    }
    mark_area = {
        "id": "m1", "question_id": "q1", "page": 1, "bounds": {"x": 1}, "kind": "mark_area",
        "mark_subtype": "grader_box", "source": "ai", "confirmed": False, "confidence": 0.71,
        "derivation": "detected-explicit-grader-box",
    }
    boundary = {
        "id": "b1", "question_id": "q1", "page_index": 0, "y": 99, "source": "ai",
        "confirmed": False, "confidence": 0.66, "derivation": "bbox-gap",
    }
    layout = {
        "id": "l1", "page_index": 0, "role": "question_page", "margin_left": 12.5,
        "margins_enabled": False, "source": "ai", "confirmed": False, "confidence": 0.93,
        "derivation": "docling-page-layout", "meta": {"columns": 2},
    }
    payload = {
        "questions": [question],
        "response_areas": [mark_area],
        "boundaries": [boundary],
        "layout": [layout],
    }

    response = client.put("/api/exam/templates/t1", json=payload)

    assert response.status_code == 200
    stored_question = store["exam_questions"][0]
    assert stored_question["source"] == "ai"
    assert stored_question["confidence"] == 0.82
    stored_area = store["exam_response_areas"][0]
    assert stored_area["mark_subtype"] == "grader_box"
    assert stored_area["derivation"] == "detected-explicit-grader-box"
    assert store["exam_boundaries"][0]["confidence"] == 0.66
    assert store["exam_template_layout"][0]["meta"] == {"columns": 2}
    returned_layout = response.json()["layout"][0]
    assert returned_layout["id"] == "l1"
    assert returned_layout["margins_enabled"] is False
def test_put_replace_clears_previous_children():
    """PUT replaces the template's existing questions instead of appending to them."""
    template_row = {"id": "t1", "title": "p", "status": "draft", "institute_id": INST_A, "teacher_id": TEACHER}
    stale_question = {"id": "old", "template_id": "t1", "label": "stale", "order": 0}
    client, store = make_client(store={"exam_templates": [template_row], "exam_questions": [stale_question]})

    replacement = {"questions": [{"id": "new", "label": "fresh", "order": 0}]}
    client.put("/api/exam/templates/t1", json=replacement)

    remaining_ids = {q["id"] for q in store["exam_questions"]}
    # old row replaced, not appended
    assert remaining_ids == {"new"}
def test_put_replace_blocked_when_marks_recorded():
    """PUT answers 409 and leaves questions alone once a batch for the template has marks."""
    # Re-saving the structure after marking began would cascade-delete mark_entries → guard 409.
    template_row = {"id": "t1", "title": "p", "status": "draft", "institute_id": INST_A, "teacher_id": TEACHER}
    batch = {"id": "b1", "template_id": "t1", "teacher_id": TEACHER, "institute_id": INST_A}
    mark = {"id": "m1", "batch_id": "b1", "submission_id": "s1", "question_id": "q1", "awarded_marks": 2}
    existing_question = {"id": "q1", "template_id": "t1", "label": "01", "order": 0}
    client, store = make_client(store={
        "exam_templates": [template_row],
        "marking_batches": [batch],
        "mark_entries": [mark],
        "exam_questions": [existing_question],
    })

    response = client.put("/api/exam/templates/t1", json={"questions": [{"id": "q2", "label": "new", "order": 0}]})

    assert response.status_code == 409
    # original question untouched (no destructive delete happened)
    surviving_ids = {q["id"] for q in store["exam_questions"]}
    assert surviving_ids == {"q1"}
def test_put_replace_allowed_when_batch_has_no_marks():
    """A marking batch with no mark entries does not block PUT (200)."""
    template_row = {"id": "t1", "title": "p", "status": "draft", "institute_id": INST_A, "teacher_id": TEACHER}
    empty_batch = {"id": "b1", "template_id": "t1", "teacher_id": TEACHER, "institute_id": INST_A}
    client, _ = make_client(store={
        "exam_templates": [template_row],
        "marking_batches": [empty_batch],
        "mark_entries": [],
    })

    response = client.put("/api/exam/templates/t1", json={"questions": [{"id": "q2", "label": "new", "order": 0}]})

    assert response.status_code == 200
def test_put_replace_denied_for_non_owner():
    """PUT on a template owned by another teacher in the same institute answers 403."""
    colleagues_template = {"id": "t1", "title": "p", "status": "draft", "institute_id": INST_A, "teacher_id": OTHER_TEACHER}
    # Caller is a colleague in the same institute (can read), but not the owner → 403.
    client, _ = make_client(
        user_id=TEACHER,
        institute_ids=(INST_A,),
        store={"exam_templates": [colleagues_template]},
    )

    response = client.put("/api/exam/templates/t1", json={"questions": []})

    assert response.status_code == 403
def test_patch_template_meta_does_not_replace_children_when_marks_recorded():
    """PATCH of template metadata succeeds with marks present and leaves children untouched."""
    template_row = {"id": "t1", "title": "old", "subject": "Physics", "page_count": 12, "status": "draft", "institute_id": INST_A, "teacher_id": TEACHER}
    question = {"id": "q1", "template_id": "t1", "label": "01", "order": 0}
    batch = {"id": "b1", "template_id": "t1", "teacher_id": TEACHER, "institute_id": INST_A}
    mark = {"id": "m1", "batch_id": "b1", "submission_id": "s1", "question_id": "q1", "awarded_marks": 2}
    client, store = make_client(store={
        "exam_templates": [template_row],
        "exam_questions": [question],
        "marking_batches": [batch],
        "mark_entries": [mark],
    })

    changes = {"title": "renamed", "subject": None, "page_count": 13}
    response = client.patch("/api/exam/templates/t1", json=changes)

    assert response.status_code == 200
    patched = response.json()
    assert patched["title"] == "renamed"
    assert patched["subject"] is None
    assert patched["page_count"] == 13
    assert {q["id"] for q in store["exam_questions"]} == {"q1"}
    assert {m["id"] for m in store["mark_entries"]} == {"m1"}
def test_patch_template_meta_denied_for_non_owner():
    """PATCH on a template owned by another teacher answers 403."""
    colleagues_template = {"id": "t1", "title": "p", "status": "draft", "institute_id": INST_A, "teacher_id": OTHER_TEACHER}
    client, _ = make_client(
        user_id=TEACHER,
        institute_ids=(INST_A,),
        store={"exam_templates": [colleagues_template]},
    )
    response = client.patch("/api/exam/templates/t1", json={"title": "nope"})
    assert response.status_code == 403
def test_patch_template_meta_empty_body_is_400():
    """PATCH with an empty JSON object is rejected with 400."""
    template_row = {"id": "t1", "title": "p", "status": "draft", "institute_id": INST_A, "teacher_id": TEACHER}
    client, _ = make_client(store={"exam_templates": [template_row]})
    response = client.patch("/api/exam/templates/t1", json={})
    assert response.status_code == 400
def test_archive_soft_deletes():
    """DELETE keeps the template row and flips its status to ``archived``."""
    template_row = {"id": "t1", "title": "p", "status": "draft", "institute_id": INST_A, "teacher_id": TEACHER}
    client, store = make_client(store={"exam_templates": [template_row]})

    response = client.delete("/api/exam/templates/t1")

    assert response.status_code == 200
    # not hard-deleted
    assert store["exam_templates"][0]["status"] == "archived"
def test_patch_question_updates_fields():
    """PATCH on a question writes the sent fields to the store and echoes them back."""
    question = {"id": "q1", "template_id": "t1", "label": "01", "max_marks": 0}
    client, store = make_client(store={"exam_questions": [question]})

    changes = {"max_marks": 5, "spec_ref": "8.1.2"}
    response = client.patch("/api/exam/questions/q1", json=changes)

    assert response.status_code == 200
    assert response.json()["max_marks"] == 5
    assert store["exam_questions"][0]["spec_ref"] == "8.1.2"
def test_patch_question_missing_is_404():
    """PATCH on a question id that is not in the store answers 404."""
    client, _ = make_client(store={"exam_questions": []})
    response = client.patch("/api/exam/questions/nope", json={"max_marks": 1})
    assert response.status_code == 404
def test_patch_question_empty_body_is_400():
    """PATCH on an existing question with an empty JSON object answers 400."""
    question = {"id": "q1", "template_id": "t1", "label": "01"}
    client, _ = make_client(store={"exam_questions": [question]})
    response = client.patch("/api/exam/questions/q1", json={})
    assert response.status_code == 400
# ─── Neo4j projection (S4-7) ─────────────────────────────────────────────────
def test_put_schedules_projection(_stub_projection):
    """A PUT calls the stubbed ``project_template_safe`` exactly once, with the template id."""
    template_row = {"id": "t1", "title": "p", "status": "draft", "institute_id": INST_A, "teacher_id": TEACHER}
    client, _ = make_client(store={"exam_templates": [template_row]})

    client.put("/api/exam/templates/t1", json={"questions": []})

    # projection enqueued for the saved template
    assert _stub_projection == ["t1"]
def test_neo4j_sync_owner_runs():
    """The owner can trigger neo4j-sync and receives the (stubbed) projection result."""
    template_row = {"id": "t1", "title": "p", "status": "draft", "institute_id": INST_A, "teacher_id": TEACHER}
    client, _ = make_client(store={"exam_templates": [template_row]})

    response = client.post("/api/exam/templates/t1/neo4j-sync")

    assert response.status_code == 200
    # "X" is the exam_code returned by the autouse project_template stub.
    assert response.json()["projection"]["exam_code"] == "X"
def test_neo4j_sync_non_owner_403():
    """neo4j-sync on a template owned by another teacher answers 403."""
    colleagues_template = {"id": "t1", "title": "p", "status": "draft", "institute_id": INST_A, "teacher_id": OTHER_TEACHER}
    client, _ = make_client(
        user_id=TEACHER,
        institute_ids=(INST_A,),
        store={"exam_templates": [colleagues_template]},
    )
    response = client.post("/api/exam/templates/t1/neo4j-sync")
    assert response.status_code == 403
def test_neo4j_sync_404():
    """neo4j-sync on a template id that is not in the store answers 404."""
    client, _ = make_client(store={"exam_templates": []})
    response = client.post("/api/exam/templates/does-not-exist/neo4j-sync")
    assert response.status_code == 404
# ─── S5 auto-map endpoint ────────────────────────────────────────────────────
def _first_pass_template():
    """Return a fresh one-page first-pass template dict used as canned ``auto_map`` output.

    The page holds one main band for question "01" and one part band "01.1"; every call
    builds new objects, so callers may mutate the result freely.
    """
    document_margins = [
        {"edge": edge, "axis": "x", "value": value, "scope": "document", "source": "auto", "confirmed": False}
        for edge, value in (("left", 50), ("right", 550))
    ]
    page_margins = [
        {"edge": edge, "axis": "y", "value": value, "scope": "page", "page": 1, "source": "auto", "confirmed": False}
        for edge, value in (("top", 780), ("bottom", 60))
    ]
    main_band = {"question": "01", "y_start": 780, "y_end": 60, "source": "auto", "confirmed": False}
    part_band = {
        "label": "01.1",
        "question": "01",
        "y_start": 700,
        "y_end": 500,
        "label_box": {"l": 50, "t": 700, "r": 90, "b": 680, "coord_origin": "BOTTOMLEFT"},
        "source": "auto",
        "confirmed": False,
    }
    first_page = {
        "role": "question",
        "role_source": "auto",
        "margins_enabled": True,
        "main_bands": [main_band],
        "part_bands": [part_band],
        "furniture": [],
        "figures": [],
        "tables": [],
    }
    return {
        "meta": {"schema": "exam-template/first-pass/v1", "paper_code": "8463/1", "n_pages": 1},
        "margins": document_margins + page_margins,
        "pages": {"1": first_page},
    }
def _patch_auto_map(monkeypatch, store, *, fast=True):
    """Stub every collaborator the auto-map endpoint reaches for on the router module.

    ``fast`` is what the stubbed ``_pdf_has_text_layer`` reports. The module-level
    ``_AUTO_MAP_JOB_STATUS`` mapping is cleared so job state does not leak between tests.
    """

    def _single_page_geometry(_pdf):
        # Built per call so each caller gets its own list and dict.
        return [{"media_x0": 0.0, "crop_x0": 0.0, "crop_y0": 0.0, "page_pt_w": 600.0, "page_pt_h": 800.0, "rendered_w": 600.0, "rendered_h": 800.0, "page_top": 0.0}]

    def _canned_first_pass(*_a, **_k):
        return _first_pass_template()

    def _no_regions(*_a, **_k):
        return []

    replacements = (
        ("StorageAdmin", _FakeStorageAdmin),
        ("SupabaseServiceRoleClient", lambda: _FakeServiceRoleClient(store)),
        ("_pdf_has_text_layer", lambda _pdf: fast),
        ("auto_map", _canned_first_pass),
        ("detect_response_regions_from_pdf", _no_regions),
        ("_pdf_page_geometry", _single_page_geometry),
    )
    for attr_name, stub in replacements:
        monkeypatch.setattr(templates_mod, attr_name, stub)
    templates_mod._AUTO_MAP_JOB_STATUS.clear()
def _template_with_source(owner=TEACHER):
    """Return a fresh store holding template ``t1`` (owned by ``owner``) and its source file ``f1``."""
    template_row = {
        "id": "t1",
        "title": "p",
        "status": "draft",
        "institute_id": INST_A,
        "teacher_id": owner,
        "source_file_id": "f1",
    }
    file_row = {"id": "f1", "bucket": "cc.users", "path": "exam-marker/i/c/f1/paper.pdf", "name": "paper.pdf"}
    return {"exam_templates": [template_row], "files": [file_row]}
def test_box_to_canvas_uses_cropbox_as_page_origin():
    """``_box_to_canvas`` maps a bottom-left box to canvas coordinates relative to the crop box."""
    page_geometry = {
        "media_x0": 0.0, "crop_x0": 100.0, "crop_y0": 200.0,
        "page_pt_w": 400.0, "page_pt_h": 600.0,
        "rendered_w": 400.0, "rendered_h": 600.0,
        "page_top": 25.0,
    }
    pdf_box = {"l": 100.0, "t": 800.0, "r": 180.0, "b": 760.0, "coord_origin": "BOTTOMLEFT"}

    canvas_box = templates_mod._box_to_canvas(pdf_box, 1, [page_geometry])

    assert canvas_box == {"x": 0.0, "y": 25.0, "w": 80.0, "h": 40.0}
def test_auto_map_deduplicates_continued_part_labels(monkeypatch):
    """A part label repeated on a second page yields one question row with a unique id."""

    def _two_page_geometry(_pdf):
        # Two identical pages stacked vertically; only page_top differs.
        return [
            {"media_x0": 0.0, "crop_x0": 0.0, "crop_y0": 0.0, "page_pt_w": 600.0, "page_pt_h": 800.0, "rendered_w": 600.0, "rendered_h": 800.0, "page_top": 0.0},
            {"media_x0": 0.0, "crop_x0": 0.0, "crop_y0": 0.0, "page_pt_w": 600.0, "page_pt_h": 800.0, "rendered_w": 600.0, "rendered_h": 800.0, "page_top": 800.0},
        ]

    monkeypatch.setattr(templates_mod, "_pdf_page_geometry", _two_page_geometry)
    first_pass = _first_pass_template()
    first_pass["meta"]["n_pages"] = 2
    continued_part = {"label": "01.1", "question": "01", "y_start": 760, "y_end": 600, "label_box": {"l": 50, "t": 760, "r": 90, "b": 740, "coord_origin": "BOTTOMLEFT"}, "source": "auto", "confirmed": False}
    first_pass["pages"]["2"] = {
        "role": "question", "role_source": "auto", "margins_enabled": True,
        "main_bands": [],
        "part_bands": [continued_part],
        "furniture": [], "figures": [], "tables": [],
    }

    rows = templates_mod._map_first_pass_to_rows("t1", first_pass, b"%PDF", [])

    ids = [question["id"] for question in rows["questions"]]
    labels = [question["label"] for question in rows["questions"]]
    assert len(ids) == len(set(ids))
    assert labels.count("01.1") == 1
def test_response_region_types_are_mapped_to_response_form_enum(monkeypatch):
    """Detected region types come out as ``lines`` / ``answer-box`` / ``working`` response forms."""

    def _single_page_geometry(_pdf):
        return [{"media_x0": 0.0, "crop_x0": 0.0, "crop_y0": 0.0, "page_pt_w": 600.0, "page_pt_h": 800.0, "rendered_w": 600.0, "rendered_h": 800.0, "page_top": 0.0}]

    monkeypatch.setattr(templates_mod, "_pdf_page_geometry", _single_page_geometry)
    first_pass = _first_pass_template()
    detected_regions = [
        {"page_index": 0, "bbox": {"l": 50, "t": 700, "r": 100, "b": 680, "coord_origin": "BOTTOMLEFT"}, "region_type": "answer_lines", "confidence": 0.9},
        {"page_index": 0, "bbox": {"l": 50, "t": 650, "r": 100, "b": 620, "coord_origin": "BOTTOMLEFT"}, "region_type": "answer_box", "confidence": 0.9},
        {"page_index": 0, "bbox": {"l": 50, "t": 600, "r": 100, "b": 560, "coord_origin": "BOTTOMLEFT"}, "region_type": "working_space", "confidence": 0.9},
    ]

    rows = templates_mod._map_first_pass_to_rows("t1", first_pass, b"%PDF", detected_regions)

    # Only rows derived from the detected regions are inspected.
    detected_forms = [
        area.get("response_form")
        for area in rows["response_areas"]
        if area.get("derivation") == "opencv-response-region"
    ]
    assert detected_forms == ["lines", "answer-box", "working"]
def test_auto_map_fast_path_merges_ai_rows_and_returns_detail(monkeypatch):
    """With a text layer reported, auto-map answers 200 with detail and stores unconfirmed AI rows."""
    client, store = make_client(store=_template_with_source())
    _patch_auto_map(monkeypatch, store, fast=True)

    response = client.post("/api/exam/templates/t1/auto-map")

    assert response.status_code == 200
    detail = response.json()
    assert detail["exam_code"] == "8463/1"
    assert detail["layout"] and detail["layout"][0]["source"] == "ai"
    assert any(
        q["label"] == "01.1" and q["source"] == "ai" and q["confirmed"] is False
        for q in store["exam_questions"]
    )
    assert store["exam_boundaries"] and store["exam_boundaries"][0]["derivation"] == "docling-main-band"
def test_auto_map_deduplicates_repeated_response_area_ids(monkeypatch):
    """Two identical detected regions must not leave duplicate response-area ids in the store."""
    client, store = make_client(store=_template_with_source())
    _patch_auto_map(monkeypatch, store, fast=True)
    region = {"page_index": 0, "bbox": {"l": 50, "t": 700, "r": 100, "b": 680, "coord_origin": "BOTTOMLEFT"}, "region_type": "answer_lines", "confidence": 0.9}

    def _duplicate_regions(*_a, **_k):
        # Same region twice: the original object plus a shallow copy of it.
        return [region, dict(region)]

    # Overrides the empty-region stub installed by _patch_auto_map.
    monkeypatch.setattr(templates_mod, "detect_response_regions_from_pdf", _duplicate_regions)

    response = client.post("/api/exam/templates/t1/auto-map")

    assert response.status_code == 200
    stored_ids = [area["id"] for area in store["exam_response_areas"]]
    assert len(stored_ids) == len(set(stored_ids))
def test_auto_map_preserves_manual_and_confirmed_rows_on_rerun(monkeypatch):
    """Re-running auto-map keeps manual and confirmed-AI questions and drops unconfirmed AI ones."""
    store = _template_with_source()
    store["exam_questions"] = [
        {"id": "manual", "template_id": "t1", "label": "manual", "order": 0, "source": "manual", "confirmed": True},
        {"id": "accepted-ai", "template_id": "t1", "label": "accepted", "order": 1, "source": "ai", "confirmed": True},
        {"id": "old-ai", "template_id": "t1", "label": "old", "order": 2, "source": "ai", "confirmed": False},
    ]
    store["exam_response_areas"] = []
    store["exam_boundaries"] = []
    store["exam_template_layout"] = []
    client, store = make_client(store=store)
    _patch_auto_map(monkeypatch, store, fast=True)

    response = client.post("/api/exam/templates/t1/auto-map")

    assert response.status_code == 200
    remaining_ids = {q["id"] for q in store["exam_questions"]}
    assert {"manual", "accepted-ai"}.issubset(remaining_ids)
    assert "old-ai" not in remaining_ids
def test_auto_map_non_owner_is_403_before_download(monkeypatch):
    """A non-owner gets 403 from auto-map without ``StorageAdmin`` ever being invoked."""
    client, _ = make_client(
        user_id=TEACHER,
        institute_ids=(INST_A,),
        store=_template_with_source(owner=OTHER_TEACHER),
    )

    def _no_download(*_a, **_k):
        # Tripwire: fails the test if the endpoint reaches storage.
        raise AssertionError("download should not run before owner gate")

    monkeypatch.setattr(templates_mod, "StorageAdmin", _no_download)

    response = client.post("/api/exam/templates/t1/auto-map")

    assert response.status_code == 403
def test_auto_map_owner_lost_institute_membership_is_404_before_download(monkeypatch):
    """An owner who is no longer in the template's institute gets 404 before any storage call."""
    # The template lives in INST_A while the caller's only membership is INST_B.
    client, _ = make_client(
        user_id=TEACHER,
        institute_ids=(INST_B,),
        store=_template_with_source(owner=TEACHER),
    )

    def _no_download(*_a, **_k):
        # Tripwire: fails the test if the endpoint reaches storage.
        raise AssertionError("download should not run before visibility gate")

    monkeypatch.setattr(templates_mod, "StorageAdmin", _no_download)

    response = client.post("/api/exam/templates/t1/auto-map")

    assert response.status_code == 404
def test_auto_map_blocks_when_marks_recorded(monkeypatch):
    """auto-map answers 409 when a batch for the template already has a mark entry."""
    store = _template_with_source()
    store["marking_batches"] = [{"id": "b1", "template_id": "t1"}]
    store["mark_entries"] = [{"id": "m1", "batch_id": "b1"}]
    client, store = make_client(store=store)
    _patch_auto_map(monkeypatch, store, fast=True)

    response = client.post("/api/exam/templates/t1/auto-map")

    assert response.status_code == 409
def test_auto_map_ocr_returns_job_id_and_status_completes(monkeypatch):
    """Without a text layer, auto-map answers 202 with a job id whose status reads completed."""
    client, store = make_client(store=_template_with_source())
    _patch_auto_map(monkeypatch, store, fast=False)

    accepted = client.post("/api/exam/templates/t1/auto-map")
    assert accepted.status_code == 202
    job_id = accepted.json()["job_id"]

    status_response = client.get(f"/api/exam/templates/t1/auto-map/{job_id}/status")
    assert status_response.status_code == 200
    job = status_response.json()
    assert job["status"] == "completed"
    assert job["counts"]["questions"] >= 2
    assert job["template"]["layout"]