"""Marking batches, scans, marks, results & CSV (/api/exam/batches..., /api/exam/marks/...) — S4-6.

As with templates, all user-facing access is as-the-user (RLS-enforced; E1). A batch is owned by
the teacher who creates it (R2.4); colleagues in the same institute can read it
(marking_batches_read), a teacher in another institute cannot (→ 404, IDOR-safe).

Roster→cohort (R4.3/A7): creating a batch from a class materialises one student_submissions row
per active enrollee (status='absent'), so every enrolled student is present in results/CSV from
the start and a no-show is never silently dropped. The roster ids are read AS THE USER from
class_students (cs_read requires the caller to teach/admin the class); only the display names are
resolved via service role (profiles is deny-all as-user, E4 — see resolve_student_names).

Scans (R2.3/E3): the upload endpoint enforces a max size and validates that the bytes are a PDF
before storing. QR-decode + automatic student-matching is a follow-on (no QR'd fixtures exist
until the PrintGenerator card); v1 supports explicit (manual) and ordered matching.
"""
from __future__ import annotations

import csv
import io
import os
import uuid
from typing import Any, Dict, List, Optional

from fastapi import APIRouter, Depends, File, Form, HTTPException, UploadFile
from fastapi.responses import Response

from modules.database.supabase.utils.client import SupabaseServiceRoleClient
from modules.database.supabase.utils.storage import StorageAdmin
from modules.logger_tool import initialise_logger
from routers.exam.dependencies import ExamContext, get_exam_context, resolve_student_names
from routers.exam.schemas import CreateBatchRequest, MarkUpsertRequest

# Module logger; level and path are taken from the LOG_LEVEL / LOG_PATH environment variables.
logger = initialise_logger(__name__, os.getenv("LOG_LEVEL"), os.getenv("LOG_PATH"), "default", True)

# Router instance created for this module.
router = APIRouter()

# E3: bound the upload so a 36-page scan batch can't exhaust memory / be a DoS vector.
MAX_SCAN_BYTES = int(os.getenv("EXAM_SCAN_MAX_BYTES", str(50 * 1024 * 1024)))  # 50 MB default
SCANS_BUCKET = os.getenv("EXAM_SCANS_BUCKET", "cc.users")
SCANS_PREFIX = "exam-submissions"

# Leading characters that make spreadsheet applications evaluate a text cell as a formula.
_CSV_FORMULA_TRIGGERS = ("=", "+", "-", "@", "\t", "\r")


# ─── helpers ─────────────────────────────────────────────────────────────────

def _rows(result: Any) -> List[Dict[str, Any]]:
    """Return ``result.data`` as a list of rows.

    A missing or falsy ``data`` attribute yields ``[]``; a single non-list value is wrapped in
    a one-element list.
    """
    data = getattr(result, "data", None)
    if not data:
        return []
    return data if isinstance(data, list) else [data]


def _first(result: Any) -> Optional[Dict[str, Any]]:
    """Return the first row of a query result, or ``None`` when it has no rows."""
    rows = _rows(result)
    return rows[0] if rows else None


def _fetch_batch_or_404(ctx: ExamContext, batch_id: str) -> Dict[str, Any]:
    """Load a marking batch by id through the caller's client.

    Raises:
        HTTPException: 404 when no row comes back for ``batch_id``.
    """
    row = _first(ctx.supabase.table("marking_batches").select("*").eq("id", batch_id).limit(1).execute())
    if not row:
        raise HTTPException(status_code=404, detail="Batch not found")
    return row


def _require_owner(ctx: ExamContext, batch: Dict[str, Any]) -> None:
    """Ensure the caller is the teacher recorded on the batch.

    Raises:
        HTTPException: 403 when ``batch["teacher_id"]`` differs from ``ctx.user_id``.
    """
    if batch.get("teacher_id") != ctx.user_id:
        raise HTTPException(status_code=403, detail="Only the batch owner can modify it")


def _csv_text(value: Any) -> Any:
    """Neutralise spreadsheet formula injection for a text CSV cell.

    String values that start with a formula trigger character get a leading single quote so a
    spreadsheet treats them as text. Non-string values (e.g. numeric marks) are returned as-is.
    """
    if isinstance(value, str) and value.startswith(_CSV_FORMULA_TRIGGERS):
        return "'" + value
    return value


# ─── batches ─────────────────────────────────────────────────────────────────

@router.post("/batches")
async def create_batch(
    body: CreateBatchRequest,
    ctx: ExamContext = Depends(get_exam_context),
) -> Dict[str, Any]:
    """Create a marking batch for a template and, when a class is given, seed its roster.

    One ``student_submissions`` row with status ``'absent'`` is inserted per active
    ``class_students`` entry of ``body.class_id``.

    Returns:
        The inserted batch row plus ``submission_count`` (number of seeded submissions).

    Raises:
        HTTPException: 404 when the template cannot be read; 500 when the batch insert
            returns no row.
    """
    # The batch inherits the template's institute; reading the template as-user also proves the
    # caller may see it (RLS) — an unseeable template → 404.
    template = _first(
        ctx.supabase.table("exam_templates").select("id, institute_id").eq("id", body.template_id).limit(1).execute()
    )
    if not template:
        raise HTTPException(status_code=404, detail="Template not found")

    batch_row = {
        "template_id": body.template_id,
        "class_id": body.class_id,
        "institute_id": template["institute_id"],
        "teacher_id": ctx.user_id,
        "title": body.title,
        "status": "open",
    }
    # Omit unset optional fields rather than sending explicit nulls.
    batch_row = {k: v for k, v in batch_row.items() if v is not None}
    batch = _first(ctx.supabase.table("marking_batches").insert(batch_row).execute())
    if not batch:
        raise HTTPException(status_code=500, detail="Failed to create batch")
    batch_id = batch["id"]

    seeded = 0
    if body.class_id:
        # Roster read is AS THE USER → cs_read requires the caller to teach/admin the class.
        roster = _rows(
            ctx.supabase.table("class_students")
            .select("student_id")
            .eq("class_id", body.class_id)
            .eq("status", "active")
            .execute()
        )
        student_ids = [r["student_id"] for r in roster if r.get("student_id")]
        if student_ids:
            # Resolve display names only when there is a roster, so an empty class does not
            # trigger a name lookup at all.
            names = resolve_student_names(student_ids) or {}
            sub_rows = [
                {
                    "batch_id": batch_id,
                    "student_id": sid,
                    "student_name": names.get(sid),
                    "status": "absent",  # A7: present in results until a scan is matched
                }
                for sid in student_ids
            ]
            ctx.supabase.table("student_submissions").insert(sub_rows).execute()
            seeded = len(sub_rows)

    logger.info(f"Marking batch {batch_id} created by {ctx.user_id}; {seeded} roster submissions seeded")
    return {**batch, "submission_count": seeded}


@router.get("/batches")
async def list_batches(
    include_archived: bool = False,
    template_id: Optional[str] = None,
    ctx: ExamContext = Depends(get_exam_context),
) -> Dict[str, Any]:
    """List the marking batches the caller's client returns, newest first.

    Args:
        include_archived: When false (default), batches with status ``'archived'`` are excluded.
        template_id: Optional filter restricting the list to one template.

    Returns:
        ``{"batches": [...]}`` ordered by ``created_at`` descending.
    """
    q = ctx.supabase.table("marking_batches").select("*")
    if template_id:
        q = q.eq("template_id", template_id)
    if not include_archived:
        q = q.neq("status", "archived")
    return {"batches": _rows(q.order("created_at", desc=True).execute())}


@router.get("/batches/{batch_id}/queue")
async def batch_queue(
    batch_id: str,
    ctx: ExamContext = Depends(get_exam_context),
) -> Dict[str, Any]:
    """Return a batch's submissions with per-submission mark counts and progress totals.

    Returns:
        ``{"batch", "submissions", "progress"}`` where each submission carries
        ``mark_entry_count`` and ``progress`` counts total / absent / complete / in_progress
        (``in_progress`` = status ``'matched'`` or ``'marking'``).

    Raises:
        HTTPException: 404 when the batch cannot be read.
    """
    batch = _fetch_batch_or_404(ctx, batch_id)
    submissions = _rows(
        ctx.supabase.table("student_submissions").select("*").eq("batch_id", batch_id).execute()
    )
    marks = _rows(ctx.supabase.table("mark_entries").select("submission_id").eq("batch_id", batch_id).execute())

    # Count mark entries per submission id.
    marked_counts: Dict[str, int] = {}
    for m in marks:
        sid = m.get("submission_id")
        marked_counts[sid] = marked_counts.get(sid, 0) + 1

    enriched = [{**s, "mark_entry_count": marked_counts.get(s["id"], 0)} for s in submissions]
    progress = {
        "total": len(submissions),
        "absent": sum(1 for s in submissions if s.get("status") == "absent"),
        "complete": sum(1 for s in submissions if s.get("status") == "complete"),
        "in_progress": sum(1 for s in submissions if s.get("status") in ("matched", "marking")),
    }
    return {"batch": batch, "submissions": enriched, "progress": progress}


# ─── results & CSV (A7) ──────────────────────────────────────────────────────

def _assemble_results(ctx: ExamContext, batch: Dict[str, Any]) -> Dict[str, Any]:
    """Build the per-student results table for a batch.

    Every submission of the batch yields one result row (including absent ones, A7) holding a
    mark per template question (``None`` where no mark entry exists) and a total.

    Returns:
        ``{"batch", "questions", "results"}``; ``questions`` are ordered by their ``order``
        column.
    """
    batch_id = batch["id"]
    questions = _rows(
        ctx.supabase.table("exam_questions")
        .select("id, label, max_marks, order")
        .eq("template_id", batch["template_id"])
        .order("order")
        .execute()
    )
    submissions = _rows(
        ctx.supabase.table("student_submissions").select("*").eq("batch_id", batch_id).execute()
    )
    marks = _rows(ctx.supabase.table("mark_entries").select("*").eq("batch_id", batch_id).execute())

    # submission_id → {question_id → awarded marks}; awarded marks may be None for an entry
    # that exists but has no value yet.
    by_sub: Dict[str, Dict[str, Optional[float]]] = {}
    for m in marks:
        by_sub.setdefault(m["submission_id"], {})[m["question_id"]] = m.get("awarded_marks")

    results = []
    for s in submissions:  # every submission incl. absent → A7
        sub_marks = by_sub.get(s["id"], {})
        # Blank total ONLY for a genuine no-show (absent AND nothing marked). A student with any
        # mark gets a real total regardless of status; a present-but-unmarked student totals 0.
        if sub_marks:
            total = sum(v or 0 for v in sub_marks.values())
        elif s.get("status") == "absent":
            total = None
        else:
            total = 0
        results.append({
            "submission_id": s["id"],
            "student_id": s.get("student_id"),
            "student_name": s.get("student_name"),
            "status": s.get("status"),
            "marks": {qid: sub_marks.get(qid) for qid in (q["id"] for q in questions)},
            "total": total,
        })
    return {"batch": batch, "questions": questions, "results": results}


@router.get("/batches/{batch_id}/results")
async def batch_results(
    batch_id: str,
    ctx: ExamContext = Depends(get_exam_context),
) -> Dict[str, Any]:
    """Return the assembled results for a batch as JSON.

    Raises:
        HTTPException: 404 when the batch cannot be read.
    """
    batch = _fetch_batch_or_404(ctx, batch_id)
    return _assemble_results(ctx, batch)


@router.get("/batches/{batch_id}/csv")
async def batch_csv(
    batch_id: str,
    ctx: ExamContext = Depends(get_exam_context),
) -> Response:
    """Return the assembled results for a batch as a CSV attachment.

    Columns: ``student_name, student_id, status``, one column per question label, ``total``.
    Missing marks and a missing total are written as empty cells. Text cells are passed through
    ``_csv_text`` so that a student name or question label beginning with a formula character
    is not evaluated when the file is opened in a spreadsheet.

    Raises:
        HTTPException: 404 when the batch cannot be read.
    """
    batch = _fetch_batch_or_404(ctx, batch_id)
    data = _assemble_results(ctx, batch)
    questions = data["questions"]

    buf = io.StringIO()
    writer = csv.writer(buf)
    writer.writerow(
        ["student_name", "student_id", "status"] + [_csv_text(q["label"]) for q in questions] + ["total"]
    )
    for r in data["results"]:
        # Absent students: blank marks + blank total, but the row is ALWAYS present (A7).
        cells = [
            "" if r["marks"].get(q["id"]) is None else r["marks"].get(q["id"])
            for q in questions
        ]
        total = "" if r["total"] is None else r["total"]
        writer.writerow(
            [
                _csv_text(r.get("student_name") or ""),
                _csv_text(r.get("student_id") or ""),
                _csv_text(r.get("status")),
            ]
            + cells
            + [total]
        )

    return Response(
        content=buf.getvalue(),
        media_type="text/csv",
        headers={"Content-Disposition": f'attachment; filename="batch-{batch_id}.csv"'},
    )


# ─── marks ───────────────────────────────────────────────────────────────────

@router.put("/marks/{mark_id}")
async def upsert_mark(
    mark_id: str,
    body: MarkUpsertRequest,
    ctx: ExamContext = Depends(get_exam_context),
) -> Dict[str, Any]:
    """Create or update one mark entry and move its submission out of the no-submission states.

    Optional fields of the request (``mark_scheme_detail``, ``annotation_shape_ids``,
    ``comment``, ``confirmed``) are only written when they are not ``None``.

    Returns:
        The upserted ``mark_entries`` row.

    Raises:
        HTTPException: 404 when the submission cannot be read; 500 when the upsert returns
            no row.
    """
    # Derive batch_id from the submission (as-user read → also enforces the caller owns the batch
    # the submission belongs to). The client never supplies the RLS scoping key directly.
    submission = _first(
        ctx.supabase.table("student_submissions").select("id, batch_id, status").eq("id", body.submission_id).limit(1).execute()
    )
    if not submission:
        raise HTTPException(status_code=404, detail="Submission not found")

    row = {
        "id": mark_id,
        "submission_id": body.submission_id,
        "question_id": body.question_id,
        "batch_id": submission["batch_id"],
        "awarded_marks": body.awarded_marks,
        "marked_by": "teacher",
    }
    if body.mark_scheme_detail is not None:
        row["mark_scheme_detail"] = body.mark_scheme_detail
    if body.annotation_shape_ids is not None:
        row["annotation_shape_ids"] = body.annotation_shape_ids
    if body.comment is not None:
        row["comment"] = body.comment
    if body.confirmed is not None:
        row["confirmed"] = body.confirmed

    upserted = _first(ctx.supabase.table("mark_entries").upsert(row).execute())
    if not upserted:
        raise HTTPException(status_code=500, detail="Failed to upsert mark")

    # A marked student is, by definition, not absent — advance the submission out of the
    # no-submission states so results/queue reflect that marking has started.
    if submission.get("status") in ("absent", "unmatched"):
        ctx.supabase.table("student_submissions").update({"status": "marking"}).eq("id", body.submission_id).execute()
    return upserted


# ─── scans (R2.3 / E3) ───────────────────────────────────────────────────────

@router.post("/batches/{batch_id}/scans")
async def upload_scan(
    batch_id: str,
    file: UploadFile = File(...),
    student_id: Optional[str] = Form(default=None),
    matching_method: str = Form(default="manual"),
    ctx: ExamContext = Depends(get_exam_context),
) -> Dict[str, Any]:
    """Store an uploaded PDF scan and attach it to a submission of the batch.

    Matching:
        * ``manual`` with ``student_id`` — attach to that student's submission in this batch.
        * ``ordered`` — attach to the first pending (``absent``/``unmatched``) submission that
          does not already hold a scan.
        * otherwise, or when nothing matches — create a new ``unmatched`` submission.

    Returns:
        The updated or newly created ``student_submissions`` row.

    Raises:
        HTTPException: 404 batch not readable; 403 caller is not the batch owner; 415 wrong
            declared type or missing PDF header; 413 body larger than ``MAX_SCAN_BYTES``;
            502 storage upload failed; 500 the new submission row could not be recorded.
    """
    batch = _fetch_batch_or_404(ctx, batch_id)
    _require_owner(ctx, batch)

    # E3: validate MIME (client-declared) before reading the body.
    if (file.content_type or "").lower() not in ("application/pdf", "application/x-pdf"):
        raise HTTPException(status_code=415, detail="Only application/pdf scans are accepted")

    # E3: read with a hard size ceiling instead of buffering an unbounded upload.
    chunks: List[bytes] = []
    total = 0
    while True:
        chunk = await file.read(1024 * 1024)
        if not chunk:
            break
        total += len(chunk)
        if total > MAX_SCAN_BYTES:
            raise HTTPException(status_code=413, detail=f"Scan exceeds max size ({MAX_SCAN_BYTES} bytes)")
        chunks.append(chunk)
    data = b"".join(chunks)

    # E3: content-sniff — declared type can be spoofed; require the PDF magic header.
    if not data.startswith(b"%PDF-"):
        raise HTTPException(status_code=415, detail="Uploaded file is not a valid PDF")

    # Store via service role (documented): no submissions-bucket storage RLS exists yet; the
    # endpoint already authorised the caller as the batch owner above.
    storage_path = f"{SCANS_PREFIX}/{batch_id}/{uuid.uuid4()}.pdf"
    try:
        StorageAdmin().upload_file(SCANS_BUCKET, storage_path, data, "application/pdf", upsert=True)
    except Exception as exc:
        logger.error(f"scan storage upload failed (batch={batch_id}): {exc}")
        # Chain the cause so the original storage error stays visible in tracebacks.
        raise HTTPException(status_code=502, detail="Failed to store scan") from exc

    sb = ctx.supabase
    submission: Optional[Dict[str, Any]] = None
    if matching_method == "manual" and student_id:
        submission = _first(
            sb.table("student_submissions").select("*").eq("batch_id", batch_id).eq("student_id", student_id).limit(1).execute()
        )
    elif matching_method == "ordered":
        # Assign to the next not-yet-submitted roster slot.
        pending = _rows(
            sb.table("student_submissions").select("*").eq("batch_id", batch_id).in_("status", ["absent", "unmatched"]).execute()
        )
        # Rows created below for unmatched uploads have status 'unmatched' AND already carry a
        # scan_url; picking one of those would overwrite the earlier scan's reference. Only a
        # slot without a scan counts as "not yet submitted".
        # NOTE(review): the query has no explicit ordering, so which pending slot is "next"
        # depends on the order the database returns rows — confirm the intended roster order.
        submission = next((row for row in pending if not row.get("scan_url")), None)

    payload = {
        "scan_url": storage_path,
        "qr_code": None,
        "matching_method": matching_method if (student_id or matching_method == "ordered") else "manual",
        "page_count": None,
        "status": "matched" if submission else "unmatched",
    }

    if submission:
        updated = _first(sb.table("student_submissions").update(payload).eq("id", submission["id"]).execute())
        return updated or submission

    # No roster slot matched → create an unmatched submission to be reconciled later.
    new_row = {"batch_id": batch_id, **payload}
    created = _first(sb.table("student_submissions").insert(new_row).execute())
    if not created:
        raise HTTPException(status_code=500, detail="Failed to record scan submission")
    return created