From 2678d0be42b855ca19475bdf3e936eaf040b5605 Mon Sep 17 00:00:00 2001 From: kcar Date: Sun, 7 Jun 2026 20:48:08 +0100 Subject: [PATCH] [verified] add exam template auto-map endpoint --- routers/exam/templates.py | 436 +++++++++++++++++++++++++++++++---- tests/test_exam_templates.py | 141 +++++++++++ 2 files changed, 537 insertions(+), 40 deletions(-) diff --git a/routers/exam/templates.py b/routers/exam/templates.py index 71c2923..ab93f2f 100644 --- a/routers/exam/templates.py +++ b/routers/exam/templates.py @@ -12,13 +12,19 @@ join keys (spec §2). """ from __future__ import annotations +import json import os +import tempfile +import time import uuid from typing import Any, Dict, List, Optional, Tuple from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Request, UploadFile -from fastapi.responses import Response +from fastapi.responses import JSONResponse, Response +from api.services.docling import AutoMapError, auto_map +from api.services.docling import extract as docling_extract +from api.services.docling.regions import detect_response_regions_from_pdf from modules.database.services.exam_projection import project_template, project_template_safe from modules.database.supabase.utils.client import SupabaseServiceRoleClient from modules.database.supabase.utils.storage import StorageAdmin @@ -37,6 +43,8 @@ router = APIRouter() SOURCE_CABINET_NAME = "Exam Marker Template Sources" SOURCE_BUCKET_FALLBACK = "cc.users" +AUTO_MAP_JOB_PREFIX = "exam:auto-map" +_AUTO_MAP_JOB_STATUS: Dict[str, Dict[str, Any]] = {} # ─── helpers ───────────────────────────────────────────────────────────────── @@ -224,6 +232,341 @@ async def _upload_template_source_file( return file_id +def _job_key(job_id: str) -> str: + return f"{AUTO_MAP_JOB_PREFIX}:{job_id}" + + +def _redis_client() -> Any: + try: + import redis + except Exception: + return None + try: + url = os.getenv("LOCAL_REDIS_URL") or os.getenv("REDIS_URL") + if url: + client = redis.Redis.from_url(url, decode_responses=True, socket_timeout=2) + else: + client = redis.Redis( + host=os.getenv("REDIS_HOST", "localhost"), + port=int(os.getenv("REDIS_PORT", "6379")), + db=int(os.getenv("REDIS_DB_DEV", os.getenv("REDIS_DB", "0"))), + password=os.getenv("REDIS_PASSWORD") or None, + decode_responses=True, + socket_timeout=2, + ) + client.ping() + return client + except Exception: + return None + + +def _set_auto_map_status(job_id: str, payload: Dict[str, Any]) -> None: + status = {"job_id": job_id, "updated_at": int(time.time()), **payload} + _AUTO_MAP_JOB_STATUS[job_id] = status + client = _redis_client() + if client is not None: + try: + client.setex(_job_key(job_id), int(os.getenv("EXAM_AUTO_MAP_JOB_TTL", "3600")), json.dumps(status)) + except Exception as exc: + logger.warning(f"auto-map redis status write failed for {job_id}: {exc}") + + +def _get_auto_map_status(job_id: str) -> Optional[Dict[str, Any]]: + client = _redis_client() + if client is not None: + try: + raw = client.get(_job_key(job_id)) + if raw: + return json.loads(raw) + except Exception as exc: + logger.warning(f"auto-map redis status read failed for {job_id}: {exc}") + return _AUTO_MAP_JOB_STATUS.get(job_id) + + +def _resolve_template_source(ctx: ExamContext, template: Dict[str, Any]) -> Tuple[str, str, bytes]: + bucket: Optional[str] = None + path: Optional[str] = None + if template.get("exam_id"): + storage_loc = _lookup_exam_storage_loc(template["exam_id"]) + if not storage_loc: + raise HTTPException(status_code=404, detail="Template source not found") + try: + bucket, path = _parse_storage_loc(storage_loc) + except ValueError: + raise HTTPException(status_code=404, detail="Template source not found") + elif template.get("source_file_id"): + # Same scoped service-role exception as source-pdf: owner gate has already passed. + file_row = _first( + SupabaseServiceRoleClient().supabase.table("files") + .select("bucket, path, mime_type, name") + .eq("id", template["source_file_id"]) + .limit(1) + .execute() + ) + if not file_row or not file_row.get("bucket") or not file_row.get("path"): + raise HTTPException(status_code=404, detail="Template source not found") + bucket = file_row["bucket"] + path = file_row["path"] + else: + raise HTTPException(status_code=404, detail="Template source not found") + try: + return bucket, path, StorageAdmin().download_file(bucket, path) + except Exception as exc: + logger.warning(f"Template source download failed for template {template.get('id')}: {exc}") + raise HTTPException(status_code=404, detail="Template source not found") + + +def _pdf_has_text_layer(pdf_bytes: bytes) -> bool: + with tempfile.NamedTemporaryFile(prefix="cc-auto-map-detect-", suffix=".pdf", delete=False) as fh: + fh.write(pdf_bytes) + tmp = fh.name + try: + return bool(docling_extract.has_text_layer(tmp)) + finally: + try: + os.unlink(tmp) + except OSError: + pass + + +def _pdf_page_geometry(pdf_bytes: bytes) -> List[Dict[str, float]]: + with tempfile.NamedTemporaryFile(prefix="cc-auto-map-geom-", suffix=".pdf", delete=False) as fh: + fh.write(pdf_bytes) + tmp = fh.name + try: + import fitz + doc = fitz.open(tmp) + pages: List[Dict[str, float]] = [] + page_top = 0.0 + try: + for page in doc: + media = page.mediabox + crop = page.cropbox + rendered_w = float(crop.width or page.rect.width or 595.0) + rendered_h = float(crop.height or page.rect.height or 842.0) + pages.append({ + "media_x0": float(media.x0), + "crop_x0": float(crop.x0), + "crop_y0": float(crop.y0), + "page_pt_w": float(crop.width or page.rect.width or 1), + "page_pt_h": float(crop.height or page.rect.height or 1), + "rendered_w": rendered_w, + "rendered_h": rendered_h, + "page_top": page_top, + }) + page_top += rendered_h + finally: + doc.close() + return pages + except Exception as exc: + logger.warning(f"PDF geometry read failed; falling back to A4 page geometry: {exc}") + return [] + finally: + try: + os.unlink(tmp) + except OSError: + pass + + +def _page_geom(pages: List[Dict[str, float]], page_number: int) -> Dict[str, float]: + if 1 <= page_number <= len(pages): + return pages[page_number - 1] + return { + "media_x0": 0.0, "crop_x0": 0.0, "crop_y0": 0.0, + "page_pt_w": 595.0, "page_pt_h": 842.0, + "rendered_w": 595.0, "rendered_h": 842.0, + "page_top": (page_number - 1) * 842.0, + } + + +def _box_to_canvas(box: Optional[Dict[str, Any]], page_number: int, pages: List[Dict[str, float]]) -> Optional[Dict[str, float]]: + if not box: + return None + g = _page_geom(pages, page_number) + if box.get("coord_origin") == "TOPLEFT" and {"x", "y", "w", "h"}.issubset(box): + scale = 0.5 if box.get("unit") == "px" else 1.0 + return { + "x": round(float(box["x"]) * scale, 2), + "y": round(g["page_top"] + float(box["y"]) * scale, 2), + "w": round(float(box["w"]) * scale, 2), + "h": round(float(box["h"]) * scale, 2), + } + if not {"l", "t", "r", "b"}.issubset(box): + return None + l, t, r, b = (float(box[k]) for k in ("l", "t", "r", "b")) + # Canvas pages are rendered from the PDF CropBox with page_left fixed at 0. + # Docling boxes are in PDF user-space coordinates, so subtract the CropBox + # origin instead of adding it; otherwise cropped PDFs shift right/down. + x = (l - g["crop_x0"]) / g["page_pt_w"] * g["rendered_w"] + y = g["page_top"] + (g["page_pt_h"] - (t - g["crop_y0"])) / g["page_pt_h"] * g["rendered_h"] + w = (r - l) / g["page_pt_w"] * g["rendered_w"] + h = (t - b) / g["page_pt_h"] * g["rendered_h"] + return {"x": round(x, 2), "y": round(y, 2), "w": round(w, 2), "h": round(h, 2)} + + +def _response_form_from_region_type(region_type: Any) -> Optional[str]: + return { + "answer_lines": "lines", + "answer_box": "answer-box", + "working_space": "working", + "lines": "lines", + "answer-box": "answer-box", + "working": "working", + }.get(str(region_type or "")) + + +def _y_to_canvas(y_value: float, page_number: int, pages: List[Dict[str, float]]) -> float: + g = _page_geom(pages, page_number) + return round(g["page_top"] + (g["page_pt_h"] - (float(y_value) - g["crop_y0"])) / g["page_pt_h"] * g["rendered_h"], 2) + + +def _ai_id(template_id: str, *parts: Any) -> str: + return str(uuid.uuid5(uuid.NAMESPACE_URL, "/".join(["cc-auto-map", template_id, *[str(p) for p in parts]]))) + + +def _safe_confidence(value: Any = None) -> float: + if isinstance(value, (int, float)): + return max(0.0, min(1.0, float(value))) + return 0.75 + + +def _margin_values(first_pass: Dict[str, Any], page_number: int) -> Dict[str, Optional[float]]: + vals: Dict[str, Optional[float]] = {"left": None, "right": None, "top": None, "bottom": None} + for m in first_pass.get("margins") or []: + edge = m.get("edge") + if edge not in vals: + continue + if m.get("scope") == "document" and edge in {"left", "right"}: + vals[edge] = m.get("value") + elif m.get("scope") == "page" and int(m.get("page") or -1) == page_number: + vals[edge] = m.get("value") + return vals + + +def _map_first_pass_to_rows(template_id: str, first_pass: Dict[str, Any], pdf_bytes: bytes, extra_regions: Optional[List[Dict[str, Any]]] = None) -> Dict[str, List[Dict[str, Any]]]: + pages_geom = _pdf_page_geometry(pdf_bytes) + questions: List[Dict[str, Any]] = [] + response_areas: List[Dict[str, Any]] = [] + boundaries: List[Dict[str, Any]] = [] + layout: List[Dict[str, Any]] = [] + q_ids: Dict[str, str] = {} + first_part_by_page: Dict[int, str] = {} + pages_obj = first_pass.get("pages") or {} + + for page_key in sorted(pages_obj, key=lambda k: int(k)): + page_number = int(page_key) + page_index = page_number - 1 + page = pages_obj[page_key] + margins = _margin_values(first_pass, page_number) + layout.append({ + "id": _ai_id(template_id, "layout", page_number), + "template_id": template_id, + "page_index": page_index, + "role": page.get("role"), + "margin_left": margins["left"], + "margin_right": margins["right"], + "margin_top": margins["top"], + "margin_bottom": margins["bottom"], + "margins_enabled": bool(page.get("margins_enabled", True)), + "source": "ai", + "confirmed": False, + "confidence": 0.8, + "derivation": "docling-page-layout", + "meta": {"role_source": page.get("role_source"), "schema": first_pass.get("meta", {}).get("schema")}, + }) + for band in page.get("main_bands") or []: + label = str(band.get("question") or "").strip() + if not label: + continue + qid = q_ids.setdefault(label, _ai_id(template_id, "question", label)) + if not any(q["id"] == qid for q in questions): + questions.append({"id": qid, "template_id": template_id, "label": label, "order": len(q_ids) - 1, "max_marks": 0, "is_container": True, "source": "ai", "confirmed": False, "confidence": _safe_confidence(band.get("confidence")), "derivation": "docling-main-band"}) + for edge, yv in (("start", band.get("y_start")), ("end", band.get("y_end"))): + if yv is not None: + boundaries.append({"id": _ai_id(template_id, "boundary", label, edge, page_number), "template_id": template_id, "question_id": qid, "label": f"{label}:{edge}", "page_index": page_index, "y": _y_to_canvas(float(yv), page_number, pages_geom), "bounds": None, "source": "ai", "confirmed": False, "confidence": _safe_confidence(band.get("confidence")), "derivation": "docling-main-band"}) + for band in page.get("part_bands") or []: + label = str(band.get("label") or "").strip() + parent_label = str(band.get("question") or "").strip() + if not label: + continue + parent_id = q_ids.setdefault(parent_label, _ai_id(template_id, "question", parent_label or label.split(".")[0])) + if parent_label and not any(q["id"] == parent_id for q in questions): + questions.append({"id": parent_id, "template_id": template_id, "label": parent_label, "order": len(q_ids) - 1, "max_marks": 0, "is_container": True, "source": "ai", "confirmed": False, "confidence": 0.7, "derivation": "docling-inferred-main-question"}) + pid = _ai_id(template_id, "part", label) + first_part_by_page.setdefault(page_index, pid) + bounds = None + y1, y2 = band.get("y_start"), band.get("y_end") + if margins["left"] is not None and margins["right"] is not None and y1 is not None and y2 is not None: + top = max(float(y1), float(y2)); bottom = min(float(y1), float(y2)) + bounds = _box_to_canvas({"l": margins["left"], "r": margins["right"], "t": top, "b": bottom, "coord_origin": "BOTTOMLEFT"}, page_number, pages_geom) + bounds = bounds or _box_to_canvas(band.get("label_box"), page_number, pages_geom) + questions.append({"id": pid, "template_id": template_id, "parent_id": parent_id, "label": label, "order": len(questions), "max_marks": 0, "is_container": False, "bounds": bounds, "page": page_number, "source": "ai", "confirmed": False, "confidence": _safe_confidence(band.get("confidence")), "derivation": "docling-part-band-x-margins"}) + + default_qid = questions[0]["id"] if questions else _ai_id(template_id, "question", "auto") + for page_key in sorted(pages_obj, key=lambda k: int(k)): + page_number = int(page_key); page_index = page_number - 1; page = pages_obj[page_key] + owner_qid = first_part_by_page.get(page_index, default_qid) + for collection, kind, context_type, derivation in (("furniture", "furniture", None, "docling-furniture"), ("figures", "context", "figure", "docling-context-figure"), ("tables", "context", "data_table", "docling-table")): + for idx, item in enumerate(page.get(collection) or []): + bounds = _box_to_canvas(item.get("box"), page_number, pages_geom) + if bounds: + row = {"id": _ai_id(template_id, collection, page_number, idx), "template_id": template_id, "question_id": owner_qid, "page": page_number, "bounds": bounds, "kind": kind, "source": "ai", "confirmed": False, "confidence": 0.65, "derivation": derivation} + if context_type: + row["context_type"] = context_type + response_areas.append(row) + for idx, region in enumerate(extra_regions or []): + page_index = int(region.get("page_index", 0)) + bounds = _box_to_canvas(region.get("bbox") or {}, page_index + 1, pages_geom) + if bounds: + response_form = _response_form_from_region_type(region.get("region_type")) + if response_form: + response_areas.append({"id": _ai_id(template_id, "region", page_index, idx), "template_id": template_id, "question_id": first_part_by_page.get(page_index, default_qid), "page": page_index + 1, "bounds": bounds, "kind": "response", "response_form": response_form, "source": "ai", "confirmed": False, "confidence": _safe_confidence(region.get("confidence")), "derivation": region.get("detection_method") or "opencv-response-region"}) + return {"questions": questions, "response_areas": response_areas, "boundaries": boundaries, "layout": layout} + + +def _refresh_ai_rows(ctx: ExamContext, template_id: str, rows: Dict[str, List[Dict[str, Any]]]) -> None: + sb = ctx.supabase + for table in ("exam_response_areas", "exam_boundaries", "exam_template_layout", "exam_questions"): + sb.table(table).delete().eq("template_id", template_id).eq("source", "ai").eq("confirmed", False).execute() + for table, key in (("exam_questions", "questions"), ("exam_response_areas", "response_areas"), ("exam_boundaries", "boundaries"), ("exam_template_layout", "layout")): + payload = rows.get(key) or [] + if payload: + sb.table(table).insert(payload).execute() + + +def _run_auto_map_merge(ctx: ExamContext, template_id: str, pdf_bytes: bytes, source_label: str) -> Dict[str, List[Dict[str, Any]]]: + first_pass = auto_map(pdf_bytes, source_pdf=source_label) + extra_regions: List[Dict[str, Any]] = [] + try: + with tempfile.NamedTemporaryFile(prefix="cc-auto-map-regions-", suffix=".pdf", delete=False) as fh: + fh.write(pdf_bytes) + tmp = fh.name + try: + extra_regions = detect_response_regions_from_pdf(tmp) + finally: + try: + os.unlink(tmp) + except OSError: + pass + except Exception as exc: + logger.info(f"auto-map response-region detection skipped for template {template_id}: {exc}") + rows = _map_first_pass_to_rows(template_id, first_pass, pdf_bytes, extra_regions) + _refresh_ai_rows(ctx, template_id, rows) + updates = {"exam_code": first_pass.get("meta", {}).get("paper_code"), "page_count": first_pass.get("meta", {}).get("n_pages")} + ctx.supabase.table("exam_templates").update({k: v for k, v in updates.items() if v is not None}).eq("id", template_id).execute() + return rows + + +def _run_auto_map_job(job_id: str, ctx: ExamContext, template_id: str, pdf_bytes: bytes, source_label: str) -> None: + _set_auto_map_status(job_id, {"status": "running", "template_id": template_id}) + try: + rows = _run_auto_map_merge(ctx, template_id, pdf_bytes, source_label) + _set_auto_map_status(job_id, {"status": "completed", "template_id": template_id, "counts": {k: len(v) for k, v in rows.items()}}) + except Exception as exc: + logger.exception(f"auto-map job failed for template {template_id}: {exc}") + _set_auto_map_status(job_id, {"status": "failed", "template_id": template_id, "error": str(exc)}) + + # ─── templates ─────────────────────────────────────────────────────────────── @@ -339,48 +682,61 @@ async def get_template_source_pdf( template = _fetch_template_or_404(ctx, template_id) _require_source_visibility_or_404(ctx, template) - bucket: Optional[str] = None - path: Optional[str] = None - - if template.get("exam_id"): - storage_loc = _lookup_exam_storage_loc(template["exam_id"]) - if not storage_loc: - raise HTTPException(status_code=404, detail="Template source not found") - try: - bucket, path = _parse_storage_loc(storage_loc) - except ValueError: - raise HTTPException(status_code=404, detail="Template source not found") - elif template.get("source_file_id"): - # Resolve the file row via service role (authz already done above: the caller proved they - # can see this template, and source_file_id is the template's own file). Reading `files` - # as-the-user trips a pre-existing broken RLS policy on cabinet_memberships - # (42P17 infinite recursion) — documented service-role exception, like the catalogue lookup. - file_row = _first( - SupabaseServiceRoleClient().supabase.table("files") - .select("bucket, path, mime_type, name") - .eq("id", template["source_file_id"]) - .limit(1) - .execute() - ) - if not file_row or not file_row.get("bucket") or not file_row.get("path"): - raise HTTPException(status_code=404, detail="Template source not found") - bucket = file_row["bucket"] - path = file_row["path"] - else: - raise HTTPException(status_code=404, detail="Template source not found") - - if not bucket or not path: - raise HTTPException(status_code=404, detail="Template source not found") - - try: - pdf_bytes = StorageAdmin().download_file(bucket, path) - except Exception as exc: - logger.warning(f"Template source download failed for template {template_id}: {exc}") - raise HTTPException(status_code=404, detail="Template source not found") - + _, _, pdf_bytes = _resolve_template_source(ctx, template) return Response(content=pdf_bytes, media_type="application/pdf") +@router.post("/templates/{template_id}/auto-map") +async def auto_map_template( + template_id: str, + background_tasks: BackgroundTasks, + ctx: ExamContext = Depends(get_exam_context), +) -> Dict[str, Any] | JSONResponse: + template = _fetch_template_or_404(ctx, template_id) + _require_owner(ctx, template) + _require_source_visibility_or_404(ctx, template) + if _template_has_recorded_marks(ctx, template_id): + raise HTTPException(status_code=409, detail="Template has recorded marks; auto-map structural refresh is blocked.") + bucket, path, pdf_bytes = _resolve_template_source(ctx, template) + source_label = f"{bucket}/{path}" + try: + fast_path = _pdf_has_text_layer(pdf_bytes) + except Exception as exc: + logger.warning(f"auto-map text-layer detection failed for template {template_id}; falling back to OCR queue: {exc}") + fast_path = False + if not fast_path: + job_id = str(uuid.uuid4()) + _set_auto_map_status(job_id, {"status": "queued", "template_id": template_id}) + background_tasks.add_task(_run_auto_map_job, job_id, ctx, template_id, pdf_bytes, source_label) + return JSONResponse(status_code=202, content={"status": "accepted", "job_id": job_id}) + try: + _run_auto_map_merge(ctx, template_id, pdf_bytes, source_label) + except (AutoMapError, ValueError) as exc: + raise HTTPException(status_code=422, detail=f"Auto-map failed: {exc}") + except Exception as exc: + logger.exception(f"auto-map failed for template {template_id}: {exc}") + raise HTTPException(status_code=502, detail=f"Auto-map failed: {exc}") + background_tasks.add_task(project_template_safe, template_id) + return await get_template(template_id, ctx) + + +@router.get("/templates/{template_id}/auto-map/{job_id}/status") +async def auto_map_status( + template_id: str, + job_id: str, + ctx: ExamContext = Depends(get_exam_context), +) -> Dict[str, Any]: + template = _fetch_template_or_404(ctx, template_id) + _require_owner(ctx, template) + status = _get_auto_map_status(job_id) + if not status or status.get("template_id") != template_id: + raise HTTPException(status_code=404, detail="Auto-map job not found") + body = dict(status) + if body.get("status") == "completed": + body["template"] = await get_template(template_id, ctx) + return body + + @router.put("/templates/{template_id}") async def replace_template( template_id: str, diff --git a/tests/test_exam_templates.py b/tests/test_exam_templates.py index df6f1d3..598e7b6 100644 --- a/tests/test_exam_templates.py +++ b/tests/test_exam_templates.py @@ -481,3 +481,144 @@ def test_neo4j_sync_non_owner_403(): def test_neo4j_sync_404(): client, _ = make_client(store={"exam_templates": []}) assert client.post("/api/exam/templates/does-not-exist/neo4j-sync").status_code == 404 + + +# ─── S5 auto-map endpoint ──────────────────────────────────────────────────── + +def _first_pass_template(): + return { + "meta": {"schema": "exam-template/first-pass/v1", "paper_code": "8463/1", "n_pages": 1}, + "margins": [ + {"edge": "left", "axis": "x", "value": 50, "scope": "document", "source": "auto", "confirmed": False}, + {"edge": "right", "axis": "x", "value": 550, "scope": "document", "source": "auto", "confirmed": False}, + {"edge": "top", "axis": "y", "value": 780, "scope": "page", "page": 1, "source": "auto", "confirmed": False}, + {"edge": "bottom", "axis": "y", "value": 60, "scope": "page", "page": 1, "source": "auto", "confirmed": False}, + ], + "pages": { + "1": { + "role": "question", "role_source": "auto", "margins_enabled": True, + "main_bands": [{"question": "01", "y_start": 780, "y_end": 60, "source": "auto", "confirmed": False}], + "part_bands": [{"label": "01.1", "question": "01", "y_start": 700, "y_end": 500, "label_box": {"l": 50, "t": 700, "r": 90, "b": 680, "coord_origin": "BOTTOMLEFT"}, "source": "auto", "confirmed": False}], + "furniture": [], "figures": [], "tables": [], + } + }, + } + + +def _patch_auto_map(monkeypatch, store, *, fast=True): + monkeypatch.setattr(templates_mod, "StorageAdmin", _FakeStorageAdmin) + monkeypatch.setattr(templates_mod, "SupabaseServiceRoleClient", lambda: _FakeServiceRoleClient(store)) + monkeypatch.setattr(templates_mod, "_pdf_has_text_layer", lambda _pdf: fast) + monkeypatch.setattr(templates_mod, "auto_map", lambda *_a, **_k: _first_pass_template()) + monkeypatch.setattr(templates_mod, "detect_response_regions_from_pdf", lambda *_a, **_k: []) + monkeypatch.setattr(templates_mod, "_pdf_page_geometry", lambda _pdf: [{"media_x0": 0.0, "crop_x0": 0.0, "crop_y0": 0.0, "page_pt_w": 600.0, "page_pt_h": 800.0, "rendered_w": 600.0, "rendered_h": 800.0, "page_top": 0.0}]) + templates_mod._AUTO_MAP_JOB_STATUS.clear() + + +def _template_with_source(owner=TEACHER): + return { + "exam_templates": [{"id": "t1", "title": "p", "status": "draft", "institute_id": INST_A, "teacher_id": owner, "source_file_id": "f1"}], + "files": [{"id": "f1", "bucket": "cc.users", "path": "exam-marker/i/c/f1/paper.pdf", "name": "paper.pdf"}], + } + + +def test_box_to_canvas_uses_cropbox_as_page_origin(): + pages = [{ + "media_x0": 0.0, "crop_x0": 100.0, "crop_y0": 200.0, + "page_pt_w": 400.0, "page_pt_h": 600.0, + "rendered_w": 400.0, "rendered_h": 600.0, + "page_top": 25.0, + }] + box = {"l": 100.0, "t": 800.0, "r": 180.0, "b": 760.0, "coord_origin": "BOTTOMLEFT"} + assert templates_mod._box_to_canvas(box, 1, pages) == {"x": 0.0, "y": 25.0, "w": 80.0, "h": 40.0} + + +def test_response_region_types_are_mapped_to_response_form_enum(monkeypatch): + monkeypatch.setattr(templates_mod, "_pdf_page_geometry", lambda _pdf: [{"media_x0": 0.0, "crop_x0": 0.0, "crop_y0": 0.0, "page_pt_w": 600.0, "page_pt_h": 800.0, "rendered_w": 600.0, "rendered_h": 800.0, "page_top": 0.0}]) + first_pass = _first_pass_template() + regions = [ + {"page_index": 0, "bbox": {"l": 50, "t": 700, "r": 100, "b": 680, "coord_origin": "BOTTOMLEFT"}, "region_type": "answer_lines", "confidence": 0.9}, + {"page_index": 0, "bbox": {"l": 50, "t": 650, "r": 100, "b": 620, "coord_origin": "BOTTOMLEFT"}, "region_type": "answer_box", "confidence": 0.9}, + {"page_index": 0, "bbox": {"l": 50, "t": 600, "r": 100, "b": 560, "coord_origin": "BOTTOMLEFT"}, "region_type": "working_space", "confidence": 0.9}, + ] + rows = templates_mod._map_first_pass_to_rows("t1", first_pass, b"%PDF", regions) + forms = [r.get("response_form") for r in rows["response_areas"] if r.get("derivation") == "opencv-response-region"] + assert forms == ["lines", "answer-box", "working"] + + +def test_auto_map_fast_path_merges_ai_rows_and_returns_detail(monkeypatch): + store = _template_with_source() + client, store = make_client(store=store) + _patch_auto_map(monkeypatch, store, fast=True) + resp = client.post("/api/exam/templates/t1/auto-map") + assert resp.status_code == 200 + body = resp.json() + assert body["exam_code"] == "8463/1" + assert body["layout"] and body["layout"][0]["source"] == "ai" + assert any(q["label"] == "01.1" and q["source"] == "ai" and q["confirmed"] is False for q in store["exam_questions"]) + assert store["exam_boundaries"] and store["exam_boundaries"][0]["derivation"] == "docling-main-band" + + +def test_auto_map_preserves_manual_and_confirmed_rows_on_rerun(monkeypatch): + store = _template_with_source() + store.update({ + "exam_questions": [ + {"id": "manual", "template_id": "t1", "label": "manual", "order": 0, "source": "manual", "confirmed": True}, + {"id": "accepted-ai", "template_id": "t1", "label": "accepted", "order": 1, "source": "ai", "confirmed": True}, + {"id": "old-ai", "template_id": "t1", "label": "old", "order": 2, "source": "ai", "confirmed": False}, + ], + "exam_response_areas": [], "exam_boundaries": [], "exam_template_layout": [], + }) + client, store = make_client(store=store) + _patch_auto_map(monkeypatch, store, fast=True) + assert client.post("/api/exam/templates/t1/auto-map").status_code == 200 + ids = {q["id"] for q in store["exam_questions"]} + assert {"manual", "accepted-ai"}.issubset(ids) + assert "old-ai" not in ids + + +def test_auto_map_non_owner_is_403_before_download(monkeypatch): + store = _template_with_source(owner=OTHER_TEACHER) + client, store = make_client(user_id=TEACHER, institute_ids=(INST_A,), store=store) + def _no_download(*_a, **_k): + raise AssertionError("download should not run before owner gate") + monkeypatch.setattr(templates_mod, "StorageAdmin", _no_download) + resp = client.post("/api/exam/templates/t1/auto-map") + assert resp.status_code == 403 + + +def test_auto_map_owner_lost_institute_membership_is_404_before_download(monkeypatch): + store = _template_with_source(owner=TEACHER) + client, store = make_client(user_id=TEACHER, institute_ids=(INST_B,), store=store) + def _no_download(*_a, **_k): + raise AssertionError("download should not run before visibility gate") + monkeypatch.setattr(templates_mod, "StorageAdmin", _no_download) + resp = client.post("/api/exam/templates/t1/auto-map") + assert resp.status_code == 404 + + +def test_auto_map_blocks_when_marks_recorded(monkeypatch): + store = _template_with_source() + store.update({ + "marking_batches": [{"id": "b1", "template_id": "t1"}], + "mark_entries": [{"id": "m1", "batch_id": "b1"}], + }) + client, store = make_client(store=store) + _patch_auto_map(monkeypatch, store, fast=True) + resp = client.post("/api/exam/templates/t1/auto-map") + assert resp.status_code == 409 + + +def test_auto_map_ocr_returns_job_id_and_status_completes(monkeypatch): + store = _template_with_source() + client, store = make_client(store=store) + _patch_auto_map(monkeypatch, store, fast=False) + resp = client.post("/api/exam/templates/t1/auto-map") + assert resp.status_code == 202 + job_id = resp.json()["job_id"] + status = client.get(f"/api/exam/templates/t1/auto-map/{job_id}/status") + assert status.status_code == 200 + body = status.json() + assert body["status"] == "completed" + assert body["counts"]["questions"] >= 2 + assert body["template"]["layout"]