A PUT full-replace deletes the template's exam_questions, and mark_entries.question_id
is declared ON DELETE CASCADE — so re-saving the setup canvas after marking has begun
would silently wipe recorded marks. Guard: return 409 if any mark_entry exists for any
of the template's batches. Mark-scheme edits (PATCH /questions/{id}) are unaffected.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
335 lines
14 KiB
Python
335 lines
14 KiB
Python
"""Tests for the /api/exam/templates router (card S4-5).
|
|
|
|
Mirrors the FakeSupabase + dependency_overrides pattern from test_me_bootstrap.py. The
ExamContext dependency is overridden with an in-memory fake, so these tests exercise the
router's auth/ownership/institute logic without a live Supabase — the as-user RLS itself is
verified separately against .94 (see the evidence note).
|
|
"""
|
|
import pytest
|
|
from fastapi import FastAPI
|
|
from fastapi.testclient import TestClient
|
|
|
|
import routers.exam.templates as templates_mod
|
|
from routers.exam.templates import router
|
|
from routers.exam.dependencies import ExamContext, get_exam_context
|
|
|
|
|
|
# Fixed identifiers shared by every test in this module.
# TEACHER is the default acting user and INST_A the default institute in make_client().
TEACHER = "00000000-0000-0000-0000-000000000001"
# A second teacher id, used as the owner of rows the acting user must not modify.
OTHER_TEACHER = "00000000-0000-0000-0000-000000000002"
INST_A = "10000000-0000-0000-0000-000000000001"
# A second institute id, used for foreign-institute and multi-institute cases.
INST_B = "10000000-0000-0000-0000-000000000002"
|
|
|
|
|
|
@pytest.fixture(autouse=True)
def _stub_projection(monkeypatch):
    """Replace the router module's projection hooks with in-process stubs.

    Keeps unit tests away from Neo4j/service-role. ``project_template_safe`` is
    swapped for a recorder and ``project_template`` for a canned result.

    Returns:
        The list that collects every template id passed to ``project_template_safe``.
    """
    scheduled = []

    def _record(tid):
        # Mirrors list.append: records the id and returns None.
        scheduled.append(tid)

    def _canned(tid):
        return {"exam_code": "X", "questions": 1}

    monkeypatch.setattr(templates_mod, "project_template_safe", _record)
    monkeypatch.setattr(templates_mod, "project_template", _canned)
    return scheduled
|
|
|
|
|
|
# ─── in-memory fake supabase ─────────────────────────────────────────────────
|
|
|
|
class FakeResult:
    """Minimal stand-in for a query result: exposes the rows as ``.data``."""

    def __init__(self, data):
        # Stored as-is (no copy), so callers see the same row objects the query produced.
        self.data = data

    def __repr__(self):
        # Makes failing assertions show the payload instead of an object address.
        return f"{type(self).__name__}(data={self.data!r})"
|
|
|
|
|
|
class FakeQuery:
    """Models the subset of the supabase-py builder the router uses, against a row list.

    Crucially it emulates RLS: the backing store is pre-filtered to the rows the caller can
    see, so cross-institute / non-owner access naturally reads back empty (→ 404).

    Every filter is applied twice: eagerly to ``self.rows`` (the snapshot a select
    returns) and lazily through ``_matches`` when update/delete walk the live
    backing list. All builder methods return ``self`` so calls can be chained.
    """

    def __init__(self, store, table):
        self.store = store
        self.table = table
        # Snapshot of the table at construction; filters narrow this copy, not the store.
        self.rows = list(store.get(table, []))
        self._filters = []
        self._op = None  # execute() treats anything but insert/update/delete as a select
        self._payload = None
        self._limit = None

    def select(self, *_a, **_k):
        """Mark the query as a read; column list and options are ignored."""
        self._op = "select"
        return self

    def insert(self, payload):
        """Queue an insert of one row (dict) or several rows (list of dicts)."""
        self._op = "insert"
        self._payload = payload
        return self

    def update(self, payload):
        """Queue an update that merges ``payload`` into every matching row."""
        self._op = "update"
        self._payload = payload
        return self

    def delete(self):
        """Queue a delete of every matching row."""
        self._op = "delete"
        return self

    def eq(self, key, value):
        """Keep rows whose ``key`` equals ``value``."""
        self._filters.append(("eq", key, value))
        self.rows = [r for r in self.rows if r.get(key) == value]
        return self

    def neq(self, key, value):
        """Keep rows whose ``key`` differs from ``value``."""
        self._filters.append(("neq", key, value))
        self.rows = [r for r in self.rows if r.get(key) != value]
        return self

    def in_(self, key, values):
        """Keep rows whose ``key`` is one of ``values`` (values must be hashable)."""
        values = set(values)
        self._filters.append(("in", key, values))
        self.rows = [r for r in self.rows if r.get(key) in values]
        return self

    def order(self, *_a, **_k):
        """Accepted for API compatibility; ordering is not modelled."""
        return self

    def limit(self, n):
        """Cap the number of rows a select returns; update/delete ignore it."""
        self._limit = n
        return self

    def _matches(self, row):
        """Return True when ``row`` satisfies every recorded filter."""
        for op, key, value in self._filters:
            actual = row.get(key)
            if op == "eq" and actual != value:
                return False
            if op == "neq" and actual == value:
                return False
            # Without this branch an update()/delete() narrowed only by in_() would
            # hit every row in the table.
            if op == "in" and actual not in value:
                return False
        return True

    def execute(self):
        """Run the queued operation and return a ``FakeResult`` of the affected rows.

        Side effect: the table key is created in the store if it was absent.
        """
        backing = self.store.setdefault(self.table, [])

        if self._op == "insert":
            payloads = self._payload if isinstance(self._payload, list) else [self._payload]
            inserted = []
            for p in payloads:
                row = dict(p)
                # Caller-supplied ids win; otherwise synthesise one from the table size.
                row.setdefault("id", f"gen-{self.table}-{len(backing)}")
                backing.append(row)
                inserted.append(row)
            return FakeResult(inserted)

        if self._op == "update":
            updated = []
            for row in backing:
                if self._matches(row):
                    row.update(self._payload)
                    updated.append(row)
            return FakeResult(updated)

        if self._op == "delete":
            kept = [r for r in backing if not self._matches(r)]
            removed = [r for r in backing if self._matches(r)]
            # Rebinds the table to a new list rather than mutating the old one in place.
            self.store[self.table] = kept
            return FakeResult(removed)

        # select
        rows = self.rows[: self._limit] if self._limit is not None else self.rows
        return FakeResult(rows)
|
|
|
|
|
|
class FakeSupabase:
    """In-memory client double: hands out a ``FakeQuery`` per table over one shared store."""

    def __init__(self, store):
        # Dict mapping table name -> list of row dicts; shared with every query built here.
        self.store = store

    def table(self, name):
        """Start a new query builder bound to table ``name``."""
        query = FakeQuery(self.store, name)
        return query
|
|
|
|
|
|
def make_client(user_id=TEACHER, institute_ids=(INST_A,), store=None):
    """Build a TestClient for the templates router with a faked ExamContext.

    Args:
        user_id: Id of the acting user placed in the context.
        institute_ids: Institutes passed to the context (as a list).
        store: Dict of table name -> rows backing the fake; a fresh empty dict
            is used when omitted.

    Returns:
        ``(client, store)`` — the store is the same dict object the fake reads
        and writes, so tests can inspect it after a request.
    """
    if store is None:
        store = {}

    def _ctx():
        return ExamContext(user_id, "fake-token", FakeSupabase(store), list(institute_ids))

    app = FastAPI()
    app.include_router(router, prefix="/api/exam")
    app.dependency_overrides[get_exam_context] = _ctx
    return TestClient(app), store
|
|
|
|
|
|
# ─── tests ───────────────────────────────────────────────────────────────────
|
|
|
|
def test_requires_auth_when_not_overridden():
    """Listing templates without the context override and without a token is rejected."""
    bare_app = FastAPI()
    bare_app.include_router(router, prefix="/api/exam")
    # No dependency override → real SupabaseBearer runs and rejects the missing token.
    anonymous = TestClient(bare_app)
    response = anonymous.get("/api/exam/templates")
    assert response.status_code in (401, 403)  # unauthenticated, not processed
|
|
|
|
|
|
def test_create_template_sets_owner_and_institute():
    """POST returns the new row with the caller as owner, their institute, and draft status."""
    client, _store = make_client()
    request_body = {"title": "AQA Physics 1H", "subject": "Physics"}
    created = client.post("/api/exam/templates", json=request_body)
    assert created.status_code == 200

    template = created.json()
    expected_fields = {
        "title": "AQA Physics 1H",
        "teacher_id": TEACHER,
        "institute_id": INST_A,
        "status": "draft",
    }
    for field, expected in expected_fields.items():
        assert template[field] == expected
|
|
|
|
|
|
def test_create_template_rejects_foreign_institute():
    """Naming an institute outside the caller's institute list on create returns 403."""
    member_of_a, _ = make_client(institute_ids=(INST_A,))
    foreign_payload = {"title": "X", "institute_id": INST_B}
    outcome = member_of_a.post("/api/exam/templates", json=foreign_payload)
    assert outcome.status_code == 403
|
|
|
|
|
|
def test_create_template_requires_institute_when_ambiguous():
    """A caller with two institutes who omits institute_id on create gets 400."""
    both_institutes = (INST_A, INST_B)
    client, _ = make_client(institute_ids=both_institutes)
    outcome = client.post("/api/exam/templates", json={"title": "X"})
    assert outcome.status_code == 400
|
|
|
|
|
|
def test_list_excludes_archived_by_default():
    """The list hides archived templates unless ``include_archived=true`` is passed."""
    live = {"id": "t1", "title": "live", "status": "draft", "institute_id": INST_A, "teacher_id": TEACHER}
    gone = {"id": "t2", "title": "gone", "status": "archived", "institute_id": INST_A, "teacher_id": TEACHER}
    client, _ = make_client(store={"exam_templates": [live, gone]})

    default_listing = client.get("/api/exam/templates").json()["templates"]
    assert [t["title"] for t in default_listing] == ["live"]

    full_listing = client.get("/api/exam/templates?include_archived=true").json()["templates"]
    assert {t["title"] for t in full_listing} == {"live", "gone"}
|
|
|
|
|
|
def test_get_template_bundles_children():
    """GET on one template returns its questions, response areas and boundaries together."""
    seeded = {
        "exam_templates": [{"id": "t1", "title": "p", "status": "draft", "institute_id": INST_A, "teacher_id": TEACHER}],
        "exam_questions": [{"id": "q1", "template_id": "t1", "label": "01", "order": 0}],
        "exam_response_areas": [{"id": "r1", "template_id": "t1", "question_id": "q1", "page": 1}],
        "exam_boundaries": [{"id": "b1", "template_id": "t1", "page_index": 0, "y": 10}],
    }
    client, _ = make_client(store=seeded)
    bundle = client.get("/api/exam/templates/t1").json()
    # One seeded row per child table → exactly one entry under each key.
    for child_key in ("questions", "response_areas", "boundaries"):
        assert len(bundle[child_key]) == 1
|
|
|
|
|
|
def test_get_other_institute_template_is_404():
    """A template id absent from the caller's visible rows reads back as 404."""
    # RLS emulation: a template the caller can't see isn't in their visible store slice.
    hidden_row = {"id": "t1", "title": "p", "status": "draft", "institute_id": INST_B, "teacher_id": OTHER_TEACHER}
    client, _ = make_client(institute_ids=(INST_A,), store={"exam_templates": [hidden_row]})
    # The fake store doesn't model institute filtering on read, so simulate the RLS-hidden row
    # by querying an id the caller's store doesn't contain.
    missing = client.get("/api/exam/templates/does-not-exist")
    assert missing.status_code == 404
|
|
|
|
|
|
def test_put_replace_persists_children_with_client_ids():
    """PUT echoes back children carrying the ids the client supplied."""
    template = {"id": "t1", "title": "p", "status": "draft", "institute_id": INST_A, "teacher_id": TEACHER}
    client, _store = make_client(store={"exam_templates": [template]})

    question = {"id": "q-uuid-1", "label": "01.1", "order": 0, "max_marks": 3}
    area = {"id": "r-uuid-1", "question_id": "q-uuid-1", "page": 1, "bounds": {"x": 1}, "kind": "response"}
    boundary = {"id": "b-uuid-1", "page_index": 0, "y": 12.5}
    replacement = {
        "questions": [question],
        "response_areas": [area],
        "boundaries": [boundary],
    }

    saved = client.put("/api/exam/templates/t1", json=replacement)
    assert saved.status_code == 200

    echoed = saved.json()
    assert echoed["questions"][0]["id"] == "q-uuid-1"  # client UUID preserved (Neo4j join key)
    assert echoed["response_areas"][0]["id"] == "r-uuid-1"
    assert echoed["boundaries"][0]["id"] == "b-uuid-1"
|
|
|
|
|
|
def test_put_replace_clears_previous_children():
    """PUT replaces the template's existing questions instead of appending to them."""
    store = {
        "exam_templates": [{"id": "t1", "title": "p", "status": "draft", "institute_id": INST_A, "teacher_id": TEACHER}],
        "exam_questions": [{"id": "old", "template_id": "t1", "label": "stale", "order": 0}],
    }
    client, store = make_client(store=store)
    resp = client.put("/api/exam/templates/t1", json={"questions": [{"id": "new", "label": "fresh", "order": 0}]})
    # Check the request itself succeeded, so a failed PUT is reported as such
    # rather than as a confusing store mismatch below.
    assert resp.status_code == 200
    ids = {q["id"] for q in store["exam_questions"]}
    assert ids == {"new"}  # old row replaced, not appended
|
|
|
|
|
|
def test_put_replace_blocked_when_marks_recorded():
    """PUT returns 409 and leaves questions intact once a mark entry exists for a batch."""
    # Re-saving the structure after marking began would cascade-delete mark_entries → guard 409.
    template = {"id": "t1", "title": "p", "status": "draft", "institute_id": INST_A, "teacher_id": TEACHER}
    batch = {"id": "b1", "template_id": "t1", "teacher_id": TEACHER, "institute_id": INST_A}
    mark = {"id": "m1", "batch_id": "b1", "submission_id": "s1", "question_id": "q1", "awarded_marks": 2}
    question = {"id": "q1", "template_id": "t1", "label": "01", "order": 0}
    seeded = {
        "exam_templates": [template],
        "marking_batches": [batch],
        "mark_entries": [mark],
        "exam_questions": [question],
    }
    client, store = make_client(store=seeded)

    replacement = {"questions": [{"id": "q2", "label": "new", "order": 0}]}
    r = client.put("/api/exam/templates/t1", json=replacement)
    assert r.status_code == 409

    # original question untouched (no destructive delete happened)
    surviving_ids = {q["id"] for q in store["exam_questions"]}
    assert surviving_ids == {"q1"}
|
|
|
|
|
|
def test_put_replace_allowed_when_batch_has_no_marks():
    """A batch with zero mark entries does not block PUT: the replace returns 200."""
    template = {"id": "t1", "title": "p", "status": "draft", "institute_id": INST_A, "teacher_id": TEACHER}
    empty_batch = {"id": "b1", "template_id": "t1", "teacher_id": TEACHER, "institute_id": INST_A}
    seeded = {
        "exam_templates": [template],
        "marking_batches": [empty_batch],
        "mark_entries": [],
    }
    client, _ = make_client(store=seeded)

    replacement = {"questions": [{"id": "q2", "label": "new", "order": 0}]}
    outcome = client.put("/api/exam/templates/t1", json=replacement)
    assert outcome.status_code == 200
|
|
|
|
|
|
def test_put_replace_denied_for_non_owner():
    """PUT on a template whose teacher_id is another teacher returns 403."""
    colleagues_template = {"id": "t1", "title": "p", "status": "draft", "institute_id": INST_A, "teacher_id": OTHER_TEACHER}
    # Caller is a colleague in the same institute (can read), but not the owner → 403.
    client, _ = make_client(
        user_id=TEACHER,
        institute_ids=(INST_A,),
        store={"exam_templates": [colleagues_template]},
    )
    outcome = client.put("/api/exam/templates/t1", json={"questions": []})
    assert outcome.status_code == 403
|
|
|
|
|
|
def test_archive_soft_deletes():
    """DELETE returns 200 and leaves the row in the store with status ``archived``."""
    template = {"id": "t1", "title": "p", "status": "draft", "institute_id": INST_A, "teacher_id": TEACHER}
    client, store = make_client(store={"exam_templates": [template]})

    outcome = client.delete("/api/exam/templates/t1")
    assert outcome.status_code == 200

    first_row = store["exam_templates"][0]
    assert first_row["status"] == "archived"  # not hard-deleted
|
|
|
|
|
|
def test_patch_question_updates_fields():
    """PATCH on a question returns the updated row and writes the new fields to the store."""
    question = {"id": "q1", "template_id": "t1", "label": "01", "max_marks": 0}
    client, store = make_client(store={"exam_questions": [question]})

    changes = {"max_marks": 5, "spec_ref": "8.1.2"}
    outcome = client.patch("/api/exam/questions/q1", json=changes)
    assert outcome.status_code == 200
    assert outcome.json()["max_marks"] == 5

    stored_question = store["exam_questions"][0]
    assert stored_question["spec_ref"] == "8.1.2"
|
|
|
|
|
|
def test_patch_question_missing_is_404():
    """PATCH on a question id that is not in the store returns 404."""
    no_questions = {"exam_questions": []}
    client, _ = make_client(store=no_questions)
    outcome = client.patch("/api/exam/questions/nope", json={"max_marks": 1})
    assert outcome.status_code == 404
|
|
|
|
|
|
def test_patch_question_empty_body_is_400():
    """PATCH with an empty JSON object on an existing question returns 400."""
    question = {"id": "q1", "template_id": "t1", "label": "01"}
    client, _ = make_client(store={"exam_questions": [question]})
    outcome = client.patch("/api/exam/questions/q1", json={})
    assert outcome.status_code == 400
|
|
|
|
|
|
# ─── Neo4j projection (S4-7) ─────────────────────────────────────────────────
|
|
|
|
def test_put_schedules_projection(_stub_projection):
    """A PUT on a template hands that template's id to the stubbed projection hook once."""
    template = {"id": "t1", "title": "p", "status": "draft", "institute_id": INST_A, "teacher_id": TEACHER}
    client, _ = make_client(store={"exam_templates": [template]})

    empty_replace = {"questions": []}
    client.put("/api/exam/templates/t1", json=empty_replace)

    assert _stub_projection == ["t1"]  # projection enqueued for the saved template
|
|
|
|
|
|
def test_neo4j_sync_owner_runs():
    """The owner's neo4j-sync call returns 200 with the stubbed projection result."""
    template = {"id": "t1", "title": "p", "status": "draft", "institute_id": INST_A, "teacher_id": TEACHER}
    client, _ = make_client(store={"exam_templates": [template]})

    r = client.post("/api/exam/templates/t1/neo4j-sync")
    assert r.status_code == 200

    projection = r.json()["projection"]
    assert projection["exam_code"] == "X"
|
|
|
|
|
|
def test_neo4j_sync_non_owner_403():
    """neo4j-sync on a template whose teacher_id is another teacher returns 403."""
    colleagues_template = {"id": "t1", "title": "p", "status": "draft", "institute_id": INST_A, "teacher_id": OTHER_TEACHER}
    client, _ = make_client(
        user_id=TEACHER,
        institute_ids=(INST_A,),
        store={"exam_templates": [colleagues_template]},
    )
    outcome = client.post("/api/exam/templates/t1/neo4j-sync")
    assert outcome.status_code == 403
|
|
|
|
|
|
def test_neo4j_sync_404():
    """neo4j-sync for a template id that is not in the store returns 404."""
    no_templates = {"exam_templates": []}
    client, _ = make_client(store=no_templates)
    outcome = client.post("/api/exam/templates/does-not-exist/neo4j-sync")
    assert outcome.status_code == 404
|