api/routers/exam/batches.py
CC Worker 5ad9c01cde feat(exam): batches, scans, marks, results, CSV (S4-6)
Adds routers/exam/batches.py (mounted alongside templates under /api/exam):
- POST/GET /batches — batch creation seeds the cohort from class_students AS
  THE USER (cs_read requires caller teaches/admins the class); each active
  enrollee becomes a student_submissions row (status='absent') so no student
  is ever dropped from results (A7). Display names denormalised via a
  documented service-role profiles read (deny-all as-user, E4).
- GET /batches/{id}/queue — submissions + per-submission mark counts + progress.
- GET /batches/{id}/results + /csv — every roster student incl. absent (blank
  marks/total); CSV row always present (A7 baked into the contract).
- PUT /marks/{id} — upsert; batch_id derived server-side from the submission
  (client never supplies the RLS scoping key).
- POST /batches/{id}/scans — E3 guards: MIME check, hard size ceiling (chunked
  read), %PDF magic-byte sniff; owner-only; stores via service-role storage;
  manual/ordered matching (QR-decode is a follow-on, no QR fixtures yet).

Unit tests cover batch/roster-seed/list, queue, results+CSV A7, mark upsert
round-trip, and all scan guards + owner check.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-06 18:40:10 +00:00

351 lines
15 KiB
Python

"""Marking batches, scans, marks, results & CSV (/api/exam/batches..., /api/exam/marks/...) — S4-6.
As with templates, all user-facing access is as-the-user (RLS-enforced; E1). A batch is owned by
the teacher who creates it (R2.4); colleagues in the same institute can read it
(marking_batches_read), a teacher in another institute cannot (→ 404, IDOR-safe).
Roster→cohort (R4.3/A7): creating a batch from a class materialises one student_submissions row
per active enrollee (status='absent'), so every enrolled student is present in results/CSV from
the start and a no-show is never silently dropped. The roster ids are read AS THE USER from
class_students (cs_read requires the caller to teach/admin the class); only the display names are
resolved via service role (profiles is deny-all as-user, E4 — see resolve_student_names).
Scans (R2.3/E3): the upload endpoint enforces a max size and validates that the bytes are a PDF
before storing. QR-decode + automatic student-matching is a follow-on (no QR'd fixtures exist
until the PrintGenerator card); v1 supports explicit (manual) and ordered matching.
"""
from __future__ import annotations
import csv
import io
import os
import uuid
from typing import Any, Dict, List, Optional
from fastapi import APIRouter, Depends, File, Form, HTTPException, UploadFile
from fastapi.responses import Response
from modules.database.supabase.utils.client import SupabaseServiceRoleClient
from modules.database.supabase.utils.storage import StorageAdmin
from modules.logger_tool import initialise_logger
from routers.exam.dependencies import ExamContext, get_exam_context, resolve_student_names
from routers.exam.schemas import CreateBatchRequest, MarkUpsertRequest
# Module logger; level and path come from the environment.
logger = initialise_logger(
    __name__,
    os.environ.get("LOG_LEVEL"),
    os.environ.get("LOG_PATH"),
    "default",
    True,
)
router = APIRouter()
# E3: upload ceiling in bytes, overridable via EXAM_SCAN_MAX_BYTES (default 50 MB), so a large
# scan batch cannot exhaust memory or be used as a DoS vector.
MAX_SCAN_BYTES = int(os.environ.get("EXAM_SCAN_MAX_BYTES", f"{50 * 1024 * 1024}"))
# Storage bucket and key prefix under which uploaded scans are written.
SCANS_BUCKET = os.environ.get("EXAM_SCANS_BUCKET", "cc.users")
SCANS_PREFIX = "exam-submissions"
# ─── helpers ─────────────────────────────────────────────────────────────────
def _rows(result: Any) -> List[Dict[str, Any]]:
    """Normalise a query response into a list of rows.

    Reads ``result.data``. A missing or empty payload yields ``[]``; a list payload is
    returned as-is; any other non-empty payload is wrapped in a one-element list.
    """
    payload = getattr(result, "data", None)
    if isinstance(payload, list):
        return payload if payload else []
    return [payload] if payload else []
def _first(result: Any) -> Optional[Dict[str, Any]]:
    """Return the first row of a query response, or ``None`` when it holds no rows."""
    for row in _rows(result):
        return row
    return None
def _fetch_batch_or_404(ctx: ExamContext, batch_id: str) -> Dict[str, Any]:
    """Load one ``marking_batches`` row by id through the caller's own client.

    Raises:
        HTTPException: 404 when the query returns no row for ``batch_id``.
    """
    query = ctx.supabase.table("marking_batches").select("*").eq("id", batch_id).limit(1)
    batch = _first(query.execute())
    if batch:
        return batch
    raise HTTPException(status_code=404, detail="Batch not found")
def _require_owner(ctx: ExamContext, batch: Dict[str, Any]) -> None:
    """Allow the request only when the batch's ``teacher_id`` equals the caller's user id.

    Raises:
        HTTPException: 403 when the caller is not the batch owner.
    """
    owner_id = batch.get("teacher_id")
    if owner_id == ctx.user_id:
        return
    raise HTTPException(status_code=403, detail="Only the batch owner can modify it")
# ─── batches ─────────────────────────────────────────────────────────────────
def _seed_roster(ctx: ExamContext, batch_id: str, class_id: str) -> int:
    """Create one 'absent' submission per active enrollee of ``class_id``; return the row count."""
    # Roster read is AS THE USER → cs_read requires the caller to teach/admin the class.
    roster = _rows(
        ctx.supabase.table("class_students")
        .select("student_id")
        .eq("class_id", class_id)
        .eq("status", "active")
        .execute()
    )
    # dict.fromkeys de-duplicates while keeping roster order, so a student that appears twice
    # in class_students still gets exactly one submission row.
    student_ids = list(dict.fromkeys(r["student_id"] for r in roster if r.get("student_id")))
    if not student_ids:
        # Nothing to seed: skip the name lookup entirely.
        return 0
    names = resolve_student_names(student_ids) or {}
    sub_rows = [
        {
            "batch_id": batch_id,
            "student_id": sid,
            "student_name": names.get(sid),
            "status": "absent",  # A7: present in results until a scan is matched
        }
        for sid in student_ids
    ]
    # One bulk insert request for the whole cohort.
    ctx.supabase.table("student_submissions").insert(sub_rows).execute()
    return len(sub_rows)


def _discard_batch(ctx: ExamContext, batch_id: str) -> None:
    """Best-effort removal of a batch whose roster could not be seeded."""
    try:
        ctx.supabase.table("marking_batches").delete().eq("id", batch_id).execute()
    except Exception as exc:
        # NOTE(review): the delete runs as the user; whether RLS lets the owner delete is not
        # visible from this file, so a failure here is logged rather than raised.
        logger.error(f"Could not remove half-created batch {batch_id}: {exc}")


@router.post("/batches")
async def create_batch(
    body: CreateBatchRequest,
    ctx: ExamContext = Depends(get_exam_context),
) -> Dict[str, Any]:
    """Create a marking batch for a template and, when ``body.class_id`` is set, seed its roster.

    Returns:
        The inserted batch row plus ``submission_count`` (number of roster submissions seeded).

    Raises:
        HTTPException: 404 when the template query returns no row; 500 when the batch insert
            returns no row or when roster seeding fails.
    """
    # The batch inherits the template's institute; reading the template as-user also proves the
    # caller may see it (RLS) — an unseeable template → 404.
    template = _first(
        ctx.supabase.table("exam_templates").select("id, institute_id").eq("id", body.template_id).limit(1).execute()
    )
    if not template:
        raise HTTPException(status_code=404, detail="Template not found")

    batch_row = {
        "template_id": body.template_id,
        "class_id": body.class_id,
        "institute_id": template["institute_id"],
        "teacher_id": ctx.user_id,
        "title": body.title,
        "status": "open",
    }
    # Drop unset optional fields so they are not sent as explicit nulls.
    batch_row = {k: v for k, v in batch_row.items() if v is not None}
    batch = _first(ctx.supabase.table("marking_batches").insert(batch_row).execute())
    if not batch:
        raise HTTPException(status_code=500, detail="Failed to create batch")
    batch_id = batch["id"]

    seeded = 0
    if body.class_id:
        try:
            seeded = _seed_roster(ctx, batch_id, body.class_id)
        except Exception as exc:
            # A batch without its cohort would silently drop students from results (A7), so
            # undo the batch insert instead of leaving a half-created batch behind.
            logger.error(f"Roster seeding failed for batch {batch_id}: {exc}")
            _discard_batch(ctx, batch_id)
            raise HTTPException(status_code=500, detail="Failed to seed batch roster") from exc

    logger.info(f"Marking batch {batch_id} created by {ctx.user_id}; {seeded} roster submissions seeded")
    return {**batch, "submission_count": seeded}
@router.get("/batches")
async def list_batches(
    include_archived: bool = False,
    template_id: Optional[str] = None,
    ctx: ExamContext = Depends(get_exam_context),
) -> Dict[str, Any]:
    """List marking batches returned to the caller's client, newest ``created_at`` first.

    ``template_id`` narrows the list to one template; batches with status ``archived`` are
    excluded unless ``include_archived`` is true.
    """
    query = ctx.supabase.table("marking_batches").select("*")
    query = query.eq("template_id", template_id) if template_id else query
    query = query if include_archived else query.neq("status", "archived")
    response = query.order("created_at", desc=True).execute()
    return {"batches": _rows(response)}
@router.get("/batches/{batch_id}/queue")
async def batch_queue(
    batch_id: str,
    ctx: ExamContext = Depends(get_exam_context),
) -> Dict[str, Any]:
    """Return the batch, its submissions (each with ``mark_entry_count``) and progress counters.

    Raises:
        HTTPException: 404 when the batch cannot be fetched.
    """
    batch = _fetch_batch_or_404(ctx, batch_id)
    submission_rows = _rows(
        ctx.supabase.table("student_submissions").select("*").eq("batch_id", batch_id).execute()
    )
    mark_rows = _rows(
        ctx.supabase.table("mark_entries").select("submission_id").eq("batch_id", batch_id).execute()
    )

    # Number of mark entries recorded against each submission id.
    entries_per_submission: Dict[str, int] = {}
    for entry in mark_rows:
        key = entry.get("submission_id")
        entries_per_submission[key] = entries_per_submission.get(key, 0) + 1

    enriched = []
    tally = {"absent": 0, "complete": 0, "in_progress": 0}
    for sub in submission_rows:
        enriched.append({**sub, "mark_entry_count": entries_per_submission.get(sub["id"], 0)})
        status = sub.get("status")
        if status == "absent":
            tally["absent"] += 1
        elif status == "complete":
            tally["complete"] += 1
        elif status in ("matched", "marking"):
            # Both statuses are reported together as "in_progress".
            tally["in_progress"] += 1

    progress = {"total": len(submission_rows), **tally}
    return {"batch": batch, "submissions": enriched, "progress": progress}
# ─── results & CSV (A7) ──────────────────────────────────────────────────────
def _result_order(submission: Dict[str, Any]) -> tuple:
    """Sort key: named students alphabetically (case-insensitive), nameless rows last, id as tie-break."""
    name = submission.get("student_name")
    return (not name, str(name or "").casefold(), str(submission.get("id")))


def _assemble_results(ctx: ExamContext, batch: Dict[str, Any]) -> Dict[str, Any]:
    """Build the per-student results table for a batch.

    Reads the template's questions (ordered by ``order``), every submission of the batch and
    every mark entry of the batch, then emits one result per submission — absent ones
    included (A7) — with a mark (or ``None``) for each question and a total.

    Returns:
        ``{"batch": ..., "questions": [...], "results": [...]}``; each result carries
        ``submission_id``, ``student_id``, ``student_name``, ``status``, ``marks`` (keyed by
        question id) and ``total`` (``None`` for absent submissions).
    """
    batch_id = batch["id"]
    questions = _rows(
        ctx.supabase.table("exam_questions")
        .select("id, label, max_marks, order")
        .eq("template_id", batch["template_id"])
        .order("order")
        .execute()
    )
    submissions = _rows(
        ctx.supabase.table("student_submissions").select("*").eq("batch_id", batch_id).execute()
    )
    marks = _rows(ctx.supabase.table("mark_entries").select("*").eq("batch_id", batch_id).execute())

    # submission id → {question id → awarded marks (may be None)}
    by_sub: Dict[str, Dict[str, Optional[float]]] = {}
    for m in marks:
        by_sub.setdefault(m["submission_id"], {})[m["question_id"]] = m.get("awarded_marks")

    question_ids = [q["id"] for q in questions]
    results = []
    # The submissions query carries no ORDER BY, so sort here to keep results/CSV rows in a
    # stable, human-friendly order instead of whatever order the database happens to return.
    for s in sorted(submissions, key=_result_order):  # every submission incl. absent → A7
        sub_marks = by_sub.get(s["id"], {})
        is_absent = s.get("status") == "absent"
        # NOTE(review): the total sums every mark entry of the submission, including any whose
        # question id is not among the template's questions — confirm that is intended.
        total = None if is_absent else sum(v or 0 for v in sub_marks.values())
        results.append({
            "submission_id": s["id"],
            "student_id": s.get("student_id"),
            "student_name": s.get("student_name"),
            "status": s.get("status"),
            "marks": {qid: sub_marks.get(qid) for qid in question_ids},
            "total": total,
        })
    return {"batch": batch, "questions": questions, "results": results}
@router.get("/batches/{batch_id}/results")
async def batch_results(
    batch_id: str,
    ctx: ExamContext = Depends(get_exam_context),
) -> Dict[str, Any]:
    """Return the assembled results for one batch.

    Raises:
        HTTPException: 404 when the batch cannot be fetched.
    """
    return _assemble_results(ctx, _fetch_batch_or_404(ctx, batch_id))
# Leading characters that make spreadsheet applications evaluate a cell as a formula.
_CSV_FORMULA_TRIGGERS = ("=", "+", "-", "@", "\t", "\r")


def _csv_text(value: Any) -> str:
    """Render a free-text CSV cell, neutralising spreadsheet formula triggers.

    ``None`` becomes an empty cell. Text starting with a formula trigger is prefixed with an
    apostrophe so it is shown as text rather than evaluated when the file is opened.
    """
    text = "" if value is None else str(value)
    if text.startswith(_CSV_FORMULA_TRIGGERS):
        return "'" + text
    return text


@router.get("/batches/{batch_id}/csv")
async def batch_csv(
    batch_id: str,
    ctx: ExamContext = Depends(get_exam_context),
) -> Response:
    """Export the batch results as a CSV attachment.

    Columns: ``student_name``, ``student_id``, ``status``, one column per question label, then
    ``total``. Every result row is written; missing marks and a missing total are left blank.
    Text cells (names, ids, statuses, question labels) are passed through ``_csv_text`` because
    they are free text that ends up in a spreadsheet; numeric mark cells are written unchanged.

    Raises:
        HTTPException: 404 when the batch cannot be fetched.
    """
    batch = _fetch_batch_or_404(ctx, batch_id)
    data = _assemble_results(ctx, batch)
    questions = data["questions"]

    buf = io.StringIO()
    writer = csv.writer(buf)
    writer.writerow(
        ["student_name", "student_id", "status"]
        + [_csv_text(q["label"]) for q in questions]
        + ["total"]
    )
    for r in data["results"]:
        # Absent students: blank marks + blank total, but the row is ALWAYS present (A7).
        cells = []
        for q in questions:
            mark = r["marks"].get(q["id"])
            cells.append("" if mark is None else mark)
        total = "" if r["total"] is None else r["total"]
        writer.writerow(
            [
                _csv_text(r.get("student_name") or ""),
                _csv_text(r.get("student_id") or ""),
                _csv_text(r.get("status")),
            ]
            + cells
            + [total]
        )
    return Response(
        content=buf.getvalue(),
        media_type="text/csv",
        headers={"Content-Disposition": f'attachment; filename="batch-{batch_id}.csv"'},
    )
# ─── marks ───────────────────────────────────────────────────────────────────
@router.put("/marks/{mark_id}")
async def upsert_mark(
    mark_id: str,
    body: MarkUpsertRequest,
    ctx: ExamContext = Depends(get_exam_context),
) -> Dict[str, Any]:
    """Insert or update the mark entry ``mark_id`` for one question of one submission.

    Returns:
        The upserted ``mark_entries`` row.

    Raises:
        HTTPException: 404 when the submission is not returned to the caller; 500 when the
            upsert returns no row.
    """
    # batch_id is derived from the submission, which is read through the caller's own client;
    # the client never supplies the RLS scoping key directly.
    # NOTE(review): which callers RLS lets read a submission is not visible from this file.
    lookup = (
        ctx.supabase.table("student_submissions")
        .select("id, batch_id")
        .eq("id", body.submission_id)
        .limit(1)
    )
    submission = _first(lookup.execute())
    if not submission:
        raise HTTPException(status_code=404, detail="Submission not found")

    row = {
        "id": mark_id,
        "submission_id": body.submission_id,
        "question_id": body.question_id,
        "batch_id": submission["batch_id"],
        "awarded_marks": body.awarded_marks,
        "marked_by": "teacher",
    }
    # Optional fields are only sent when the request actually set them.
    for field in ("mark_scheme_detail", "annotation_shape_ids", "comment", "confirmed"):
        value = getattr(body, field)
        if value is not None:
            row[field] = value

    upserted = _first(ctx.supabase.table("mark_entries").upsert(row).execute())
    if upserted:
        return upserted
    raise HTTPException(status_code=500, detail="Failed to upsert mark")
# ─── scans (R2.3 / E3) ───────────────────────────────────────────────────────
# Matching strategies this endpoint implements.
_SCAN_MATCHING_METHODS = ("manual", "ordered")
# Declared content types accepted for a scan upload.
_PDF_CONTENT_TYPES = ("application/pdf", "application/x-pdf")


async def _read_pdf_upload(file: UploadFile) -> bytes:
    """Read the upload under the MAX_SCAN_BYTES ceiling and require the ``%PDF-`` header."""
    # E3: read with a hard size ceiling instead of buffering an unbounded upload.
    chunks: List[bytes] = []
    total = 0
    while True:
        chunk = await file.read(1024 * 1024)
        if not chunk:
            break
        total += len(chunk)
        if total > MAX_SCAN_BYTES:
            raise HTTPException(status_code=413, detail=f"Scan exceeds max size ({MAX_SCAN_BYTES} bytes)")
        chunks.append(chunk)
    data = b"".join(chunks)
    # E3: content-sniff — declared type can be spoofed; require the PDF magic header.
    if not data.startswith(b"%PDF-"):
        raise HTTPException(status_code=415, detail="Uploaded file is not a valid PDF")
    return data


@router.post("/batches/{batch_id}/scans")
async def upload_scan(
    batch_id: str,
    file: UploadFile = File(...),
    student_id: Optional[str] = Form(default=None),
    matching_method: str = Form(default="manual"),
    ctx: ExamContext = Depends(get_exam_context),
) -> Dict[str, Any]:
    """Store a scanned PDF for a batch and attach it to a submission.

    ``manual`` matching attaches the scan to the submission of ``student_id`` in this batch;
    ``ordered`` matching attaches it to the first pending roster submission. When no
    submission is matched, a new ``unmatched`` submission row is created for the scan.

    Returns:
        The updated (or newly created) ``student_submissions`` row.

    Raises:
        HTTPException: 404 batch not found; 403 caller is not the batch owner; 422 unknown
            ``matching_method``; 415 not a PDF (declared type or content); 413 over the size
            ceiling; 502 storage upload failed; 500 the unmatched row could not be recorded.
    """
    batch = _fetch_batch_or_404(ctx, batch_id)
    _require_owner(ctx, batch)

    # Reject unknown strategies up front, before anything is stored: otherwise the scan would
    # be saved and the unrecognised value could be persisted without any matching having run.
    if matching_method not in _SCAN_MATCHING_METHODS:
        raise HTTPException(
            status_code=422,
            detail=f"Unsupported matching_method; expected one of {_SCAN_MATCHING_METHODS}",
        )

    # E3: validate MIME (client-declared) before reading the body. Parameters such as
    # "; charset=binary" are stripped so they do not cause a false rejection.
    declared_type = (file.content_type or "").split(";", 1)[0].strip().lower()
    if declared_type not in _PDF_CONTENT_TYPES:
        raise HTTPException(status_code=415, detail="Only application/pdf scans are accepted")

    data = await _read_pdf_upload(file)

    # Store via service role (documented): no submissions-bucket storage RLS exists yet; the
    # endpoint already authorised the caller as the batch owner above.
    storage_path = f"{SCANS_PREFIX}/{batch_id}/{uuid.uuid4()}.pdf"
    try:
        StorageAdmin().upload_file(SCANS_BUCKET, storage_path, data, "application/pdf", upsert=True)
    except Exception as exc:
        logger.error(f"scan storage upload failed (batch={batch_id}): {exc}")
        raise HTTPException(status_code=502, detail="Failed to store scan") from exc

    sb = ctx.supabase
    submission: Optional[Dict[str, Any]] = None
    if matching_method == "manual":
        if student_id:
            submission = _first(
                sb.table("student_submissions").select("*").eq("batch_id", batch_id).eq("student_id", student_id).limit(1).execute()
            )
    else:  # "ordered"
        # Assign to the next not-yet-submitted roster slot.
        pending = _rows(
            sb.table("student_submissions").select("*").eq("batch_id", batch_id).in_("status", ["absent", "unmatched"]).execute()
        )
        # Rows without a student are orphan scans created by earlier unmatched uploads; reusing
        # one would overwrite its scan and mark a student-less row as matched, so skip them.
        # NOTE(review): the query has no explicit order, so "next" is whatever order the
        # database returns — confirm the intended roster order.
        roster_slots = [row for row in pending if row.get("student_id")]
        submission = roster_slots[0] if roster_slots else None

    payload = {
        "scan_url": storage_path,
        "qr_code": None,
        "matching_method": matching_method,
        "page_count": None,
        "status": "matched" if submission else "unmatched",
    }
    if submission:
        updated = _first(sb.table("student_submissions").update(payload).eq("id", submission["id"]).execute())
        if not updated:
            # The update returned no row; fall back to the pre-update row as before, but make
            # the situation visible instead of silently returning stale data.
            logger.warning(
                f"scan update returned no row (batch={batch_id}, submission={submission['id']}); "
                "returning pre-update submission"
            )
        return updated or submission

    # No roster slot matched → create an unmatched submission to be reconciled later.
    new_row = {"batch_id": batch_id, **payload}
    created = _first(sb.table("student_submissions").insert(new_row).execute())
    if not created:
        raise HTTPException(status_code=500, detail="Failed to record scan submission")
    return created