[verified] add exam template auto-map endpoint

This commit is contained in:
kcar 2026-06-07 20:48:08 +01:00
parent 2ebbfc1cf4
commit 2678d0be42
2 changed files with 537 additions and 40 deletions

View File

@ -12,13 +12,19 @@ join keys (spec §2).
""" """
from __future__ import annotations from __future__ import annotations
import json
import os import os
import tempfile
import time
import uuid import uuid
from typing import Any, Dict, List, Optional, Tuple from typing import Any, Dict, List, Optional, Tuple
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Request, UploadFile from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Request, UploadFile
from fastapi.responses import Response from fastapi.responses import JSONResponse, Response
from api.services.docling import AutoMapError, auto_map
from api.services.docling import extract as docling_extract
from api.services.docling.regions import detect_response_regions_from_pdf
from modules.database.services.exam_projection import project_template, project_template_safe from modules.database.services.exam_projection import project_template, project_template_safe
from modules.database.supabase.utils.client import SupabaseServiceRoleClient from modules.database.supabase.utils.client import SupabaseServiceRoleClient
from modules.database.supabase.utils.storage import StorageAdmin from modules.database.supabase.utils.storage import StorageAdmin
@ -37,6 +43,8 @@ router = APIRouter()
SOURCE_CABINET_NAME = "Exam Marker Template Sources" SOURCE_CABINET_NAME = "Exam Marker Template Sources"
SOURCE_BUCKET_FALLBACK = "cc.users" SOURCE_BUCKET_FALLBACK = "cc.users"
# Namespace prepended to a job id to form its Redis status key (see _job_key).
AUTO_MAP_JOB_PREFIX = "exam:auto-map"
# In-process copy of the latest status per job id. Written on every status
# update and read when Redis has no entry for the job or cannot be reached.
_AUTO_MAP_JOB_STATUS: Dict[str, Dict[str, Any]] = {}
# ─── helpers ───────────────────────────────────────────────────────────────── # ─── helpers ─────────────────────────────────────────────────────────────────
@ -224,6 +232,341 @@ async def _upload_template_source_file(
return file_id return file_id
def _job_key(job_id: str) -> str:
    """Return the Redis key for *job_id*: the auto-map prefix, a colon, the id."""
    return "{}:{}".format(AUTO_MAP_JOB_PREFIX, job_id)
def _redis_client() -> Any:
    """Return a pinged Redis client, or ``None`` when Redis cannot be used.

    ``None`` is returned when the ``redis`` package cannot be imported, when
    the client cannot be constructed from the environment, or when the
    initial ``ping`` fails. ``LOCAL_REDIS_URL`` takes precedence over
    ``REDIS_URL``; without either, the client is built from ``REDIS_HOST``,
    ``REDIS_PORT``, ``REDIS_DB_DEV`` (falling back to ``REDIS_DB``) and
    ``REDIS_PASSWORD``.
    """
    try:
        import redis
    except Exception:
        return None

    # Options common to both construction paths.
    shared = {"decode_responses": True, "socket_timeout": 2}
    try:
        url = os.getenv("LOCAL_REDIS_URL") or os.getenv("REDIS_URL")
        if not url:
            connection = redis.Redis(
                host=os.getenv("REDIS_HOST", "localhost"),
                port=int(os.getenv("REDIS_PORT", "6379")),
                db=int(os.getenv("REDIS_DB_DEV", os.getenv("REDIS_DB", "0"))),
                password=os.getenv("REDIS_PASSWORD") or None,
                **shared,
            )
        else:
            connection = redis.Redis.from_url(url, **shared)
        # Only hand back a client that has answered at least once.
        connection.ping()
    except Exception:
        return None
    return connection
def _set_auto_map_status(job_id: str, payload: Dict[str, Any]) -> None:
    """Record the latest status for an auto-map job.

    The status is ``payload`` plus ``job_id`` and an ``updated_at`` epoch
    second (keys in ``payload`` win on conflict). It is stored in the
    in-process dict and, when a Redis client is available, written to Redis
    with a TTL taken from ``EXAM_AUTO_MAP_JOB_TTL`` (default 3600 seconds).

    In-process entries older than the same TTL are dropped on every call so
    the fallback dict cannot grow without bound in a long-running worker.
    An unparsable TTL value falls back to the default instead of silently
    disabling the Redis write. Redis write failures are logged, not raised.
    """
    now = int(time.time())
    status = {"job_id": job_id, "updated_at": now, **payload}

    try:
        ttl = int(os.getenv("EXAM_AUTO_MAP_JOB_TTL", "3600"))
    except ValueError:
        ttl = 3600

    # Snapshot the items first: this function also runs from background
    # tasks, so the dict may be written to while we scan it.
    for stale_id, stale in list(_AUTO_MAP_JOB_STATUS.items()):
        if now - int(stale.get("updated_at") or 0) > ttl:
            _AUTO_MAP_JOB_STATUS.pop(stale_id, None)
    _AUTO_MAP_JOB_STATUS[job_id] = status

    client = _redis_client()
    if client is None:
        return
    try:
        client.setex(_job_key(job_id), ttl, json.dumps(status))
    except Exception as exc:
        logger.warning(f"auto-map redis status write failed for {job_id}: {exc}")
def _get_auto_map_status(job_id: str) -> Optional[Dict[str, Any]]:
    """Look up the latest status for *job_id*.

    Redis is consulted first; a non-empty value is JSON-decoded and returned.
    If Redis is unavailable, has no value, or the read/decode fails (logged
    as a warning), the in-process dict is used instead. Returns ``None`` when
    neither store knows the job.
    """
    connection = _redis_client()
    if connection is None:
        return _AUTO_MAP_JOB_STATUS.get(job_id)

    try:
        serialized = connection.get(_job_key(job_id))
        if serialized:
            return json.loads(serialized)
    except Exception as exc:
        logger.warning(f"auto-map redis status read failed for {job_id}: {exc}")
    return _AUTO_MAP_JOB_STATUS.get(job_id)
def _resolve_template_source(ctx: ExamContext, template: Dict[str, Any]) -> Tuple[str, str, bytes]:
    """Locate and download the source PDF behind a template.

    The location comes from the linked exam's storage location when the
    template has an ``exam_id``, otherwise from the ``files`` row named by
    ``source_file_id``.

    Args:
        ctx: Request context. Not read here; callers are expected to have
            run their access checks with it before calling.
        template: Template row; ``exam_id``, ``source_file_id`` and ``id``
            are read.

    Returns:
        ``(bucket, path, pdf_bytes)``.

    Raises:
        HTTPException: 404 "Template source not found" for every failure
            mode (no source reference, unparsable or empty location, missing
            file row, failed download), so callers expose a single outcome.
    """
    bucket: Optional[str] = None
    path: Optional[str] = None

    if template.get("exam_id"):
        storage_loc = _lookup_exam_storage_loc(template["exam_id"])
        if not storage_loc:
            raise HTTPException(status_code=404, detail="Template source not found")
        try:
            bucket, path = _parse_storage_loc(storage_loc)
        except ValueError as exc:
            raise HTTPException(status_code=404, detail="Template source not found") from exc
    elif template.get("source_file_id"):
        # Same scoped service-role exception as source-pdf: owner gate has already passed.
        file_row = _first(
            SupabaseServiceRoleClient().supabase.table("files")
            .select("bucket, path, mime_type, name")
            .eq("id", template["source_file_id"])
            .limit(1)
            .execute()
        )
        if not file_row or not file_row.get("bucket") or not file_row.get("path"):
            raise HTTPException(status_code=404, detail="Template source not found")
        bucket = file_row["bucket"]
        path = file_row["path"]
    else:
        raise HTTPException(status_code=404, detail="Template source not found")

    # A parsed storage location may still carry an empty bucket or path;
    # never hand that to the storage client.
    if not bucket or not path:
        raise HTTPException(status_code=404, detail="Template source not found")

    try:
        pdf_bytes = StorageAdmin().download_file(bucket, path)
    except Exception as exc:
        logger.warning(f"Template source download failed for template {template.get('id')}: {exc}")
        raise HTTPException(status_code=404, detail="Template source not found") from exc
    return bucket, path, pdf_bytes
def _pdf_has_text_layer(pdf_bytes: bytes) -> bool:
    """Report whether ``docling_extract.has_text_layer`` finds a text layer.

    The bytes are spooled to a temporary ``.pdf`` file because the detector
    is called with a path. The file is removed afterwards, including when
    writing it fails part-way.
    """
    fh = tempfile.NamedTemporaryFile(prefix="cc-auto-map-detect-", suffix=".pdf", delete=False)
    tmp = fh.name
    try:
        # Close the handle before the detector opens the path.
        with fh:
            fh.write(pdf_bytes)
        return bool(docling_extract.has_text_layer(tmp))
    finally:
        try:
            os.unlink(tmp)
        except OSError:
            pass
def _pdf_page_geometry(pdf_bytes: bytes) -> List[Dict[str, float]]:
    """Read per-page geometry from a PDF with PyMuPDF.

    The document is opened straight from memory, so no temporary file is
    created (and none can be leaked).

    Returns:
        One dict per page, in document order, with the MediaBox/CropBox
        origin (``media_x0``, ``crop_x0``, ``crop_y0``), the page size in
        points (``page_pt_w``/``page_pt_h``), the rendered size
        (``rendered_w``/``rendered_h``) and ``page_top``, the running sum of
        the rendered heights of all preceding pages. An empty list is
        returned when PyMuPDF is unavailable or the PDF cannot be read; the
        failure is logged as a warning.
    """
    try:
        import fitz

        doc = fitz.open(stream=pdf_bytes, filetype="pdf")
        pages: List[Dict[str, float]] = []
        page_top = 0.0
        try:
            for page in doc:
                media = page.mediabox
                crop = page.cropbox
                # Prefer the CropBox size, then the page rect, then a fallback.
                rendered_w = float(crop.width or page.rect.width or 595.0)
                rendered_h = float(crop.height or page.rect.height or 842.0)
                pages.append({
                    "media_x0": float(media.x0),
                    "crop_x0": float(crop.x0),
                    "crop_y0": float(crop.y0),
                    # Fallback of 1 keeps later divisions by the page size defined.
                    "page_pt_w": float(crop.width or page.rect.width or 1),
                    "page_pt_h": float(crop.height or page.rect.height or 1),
                    "rendered_w": rendered_w,
                    "rendered_h": rendered_h,
                    "page_top": page_top,
                })
                page_top += rendered_h
        finally:
            doc.close()
        return pages
    except Exception as exc:
        logger.warning(f"PDF geometry read failed; falling back to A4 page geometry: {exc}")
        return []
def _page_geom(pages: List[Dict[str, float]], page_number: int) -> Dict[str, float]:
    """Return the geometry entry for a 1-based *page_number*.

    When the page is not covered by *pages*, a 595 x 842 pt page with a zero
    CropBox origin is assumed, stacked below ``page_number - 1`` pages of
    that same height.
    """
    index = page_number - 1
    if 0 <= index < len(pages):
        return pages[index]

    default_w, default_h = 595.0, 842.0
    return {
        "media_x0": 0.0,
        "crop_x0": 0.0,
        "crop_y0": 0.0,
        "page_pt_w": default_w,
        "page_pt_h": default_h,
        "rendered_w": default_w,
        "rendered_h": default_h,
        "page_top": index * default_h,
    }
def _box_to_canvas(box: Optional[Dict[str, Any]], page_number: int, pages: List[Dict[str, float]]) -> Optional[Dict[str, float]]:
    """Convert a detector box into canvas ``x``/``y``/``w``/``h``, rounded to 2 dp.

    Two input shapes are understood:

    * ``coord_origin == "TOPLEFT"`` with ``x``/``y``/``w``/``h``: values are
      halved when ``unit == "px"`` and ``y`` is offset by the page's
      ``page_top``.
    * ``l``/``t``/``r``/``b``: treated as bottom-left-origin PDF coordinates,
      shifted by the CropBox origin and scaled from page points to the
      rendered size.

    Returns ``None`` for an empty box or one matching neither shape.
    """
    if not box:
        return None

    geom = _page_geom(pages, page_number)
    top_offset = geom["page_top"]

    if box.get("coord_origin") == "TOPLEFT" and {"x", "y", "w", "h"}.issubset(box):
        factor = 0.5 if box.get("unit") == "px" else 1.0
        sx, sy, sw, sh = (float(box[key]) * factor for key in ("x", "y", "w", "h"))
        return {
            "x": round(sx, 2),
            "y": round(top_offset + sy, 2),
            "w": round(sw, 2),
            "h": round(sh, 2),
        }

    if not {"l", "t", "r", "b"}.issubset(box):
        return None

    left, top, right, bottom = (float(box[key]) for key in ("l", "t", "r", "b"))
    pt_w, pt_h = geom["page_pt_w"], geom["page_pt_h"]
    out_w, out_h = geom["rendered_w"], geom["rendered_h"]
    # Canvas pages are rendered from the PDF CropBox with page_left fixed at 0.
    # Docling boxes are in PDF user-space coordinates, so subtract the CropBox
    # origin instead of adding it; otherwise cropped PDFs shift right/down.
    canvas_x = (left - geom["crop_x0"]) / pt_w * out_w
    canvas_y = top_offset + (pt_h - (top - geom["crop_y0"])) / pt_h * out_h
    canvas_w = (right - left) / pt_w * out_w
    canvas_h = (top - bottom) / pt_h * out_h
    return {
        "x": round(canvas_x, 2),
        "y": round(canvas_y, 2),
        "w": round(canvas_w, 2),
        "h": round(canvas_h, 2),
    }
def _response_form_from_region_type(region_type: Any) -> Optional[str]:
    """Map a detected region type to a response-form value.

    The three response-form values (``lines``, ``answer-box``, ``working``)
    pass through unchanged; ``answer_lines``, ``answer_box`` and
    ``working_space`` are translated to them. Anything else, including a
    falsy input, yields ``None``.
    """
    key = str(region_type or "")
    if key in ("lines", "answer-box", "working"):
        return key
    aliases = {
        "answer_lines": "lines",
        "answer_box": "answer-box",
        "working_space": "working",
    }
    return aliases.get(key)
def _y_to_canvas(y_value: float, page_number: int, pages: List[Dict[str, float]]) -> float:
    """Convert a bottom-left-origin PDF ``y`` on a page to a canvas ``y`` (2 dp).

    The value is shifted by the CropBox origin, flipped against the page
    height, scaled to the rendered height and offset by the page's
    ``page_top``.
    """
    geom = _page_geom(pages, page_number)
    page_height = geom["page_pt_h"]
    distance_from_top = page_height - (float(y_value) - geom["crop_y0"])
    return round(geom["page_top"] + distance_from_top / page_height * geom["rendered_h"], 2)
def _ai_id(template_id: str, *parts: Any) -> str:
    """Return a deterministic UUIDv5 string for an auto-mapped row.

    The name hashed under ``uuid.NAMESPACE_URL`` is ``cc-auto-map``, the
    template id and the stringified *parts*, joined with ``/``; the same
    inputs therefore always produce the same id.
    """
    segments = ["cc-auto-map", template_id]
    segments.extend(str(part) for part in parts)
    return str(uuid.uuid5(uuid.NAMESPACE_URL, "/".join(segments)))
def _safe_confidence(value: Any = None) -> float:
    """Clamp a confidence score to ``[0.0, 1.0]``, defaulting to ``0.75``.

    The default is returned for anything that is not a real number:
    non-numeric values, ``None``, booleans (a flag is not a score) and NaN.
    NaN needs the explicit check because ``min``/``max`` comparisons with it
    are always false, which would otherwise clamp it to ``1.0``.
    """
    if isinstance(value, bool) or not isinstance(value, (int, float)):
        return 0.75
    if value != value:  # NaN is the only value unequal to itself.
        return 0.75
    # Clamp before converting so very large ints cannot overflow float().
    return float(max(0.0, min(1.0, value)))
def _margin_values(first_pass: Dict[str, Any], page_number: int) -> Dict[str, Optional[float]]:
    """Collect the margin values that apply to one page.

    Entries of ``first_pass["margins"]`` are applied in list order, so a
    later matching entry overwrites an earlier one. Document-scoped entries
    only count for the ``left`` and ``right`` edges; page-scoped entries
    count for any edge when their ``page`` equals *page_number*. Edges with
    no matching entry stay ``None``.
    """
    resolved: Dict[str, Optional[float]] = dict.fromkeys(("left", "right", "top", "bottom"))
    for margin in first_pass.get("margins") or []:
        edge = margin.get("edge")
        if edge not in resolved:
            continue
        scope = margin.get("scope")
        if scope == "document":
            applies = edge in ("left", "right")
        elif scope == "page":
            applies = int(margin.get("page") or -1) == page_number
        else:
            applies = False
        if applies:
            resolved[edge] = margin.get("value")
    return resolved
def _map_first_pass_to_rows(template_id: str, first_pass: Dict[str, Any], pdf_bytes: bytes, extra_regions: Optional[List[Dict[str, Any]]] = None) -> Dict[str, List[Dict[str, Any]]]:
    """Translate a first-pass auto-map result into insertable table rows.

    Args:
        template_id: Template the rows belong to; also seeds every row id.
        first_pass: Auto-map output; ``pages``, ``margins`` and ``meta`` are read.
        pdf_bytes: Source PDF, used only to read page geometry.
        extra_regions: Optional detected response regions, each with
            ``page_index``, ``bbox``, ``region_type`` and optionally
            ``confidence`` / ``detection_method``.

    Returns:
        ``{"questions", "response_areas", "boundaries", "layout"}``, each a
        list of row dicts marked ``source="ai"`` and ``confirmed=False``.

    A part band that names no parent question gets its parent inferred from
    the text before the first ``.`` of its label, and a container row is
    emitted for that parent so the part's ``parent_id`` always points at a
    row in the same payload.
    """
    pages_geom = _pdf_page_geometry(pdf_bytes)
    questions: List[Dict[str, Any]] = []
    response_areas: List[Dict[str, Any]] = []
    boundaries: List[Dict[str, Any]] = []
    layout: List[Dict[str, Any]] = []
    q_ids: Dict[str, str] = {}
    # Ids of container rows already emitted; replaces a scan of `questions`.
    container_ids = set()
    first_part_by_page: Dict[int, str] = {}
    pages_obj = first_pass.get("pages") or {}
    page_keys = sorted(pages_obj, key=int)
    schema = (first_pass.get("meta") or {}).get("schema")

    for page_key in page_keys:
        page_number = int(page_key)
        page_index = page_number - 1
        page = pages_obj[page_key]
        margins = _margin_values(first_pass, page_number)
        layout.append({
            "id": _ai_id(template_id, "layout", page_number),
            "template_id": template_id,
            "page_index": page_index,
            "role": page.get("role"),
            "margin_left": margins["left"],
            "margin_right": margins["right"],
            "margin_top": margins["top"],
            "margin_bottom": margins["bottom"],
            "margins_enabled": bool(page.get("margins_enabled", True)),
            "source": "ai",
            "confirmed": False,
            "confidence": 0.8,
            "derivation": "docling-page-layout",
            "meta": {"role_source": page.get("role_source"), "schema": schema},
        })

        for band in page.get("main_bands") or []:
            label = str(band.get("question") or "").strip()
            if not label:
                continue
            qid = q_ids.setdefault(label, _ai_id(template_id, "question", label))
            # A question may span pages: emit its container row once only.
            if qid not in container_ids:
                container_ids.add(qid)
                questions.append({
                    "id": qid,
                    "template_id": template_id,
                    "label": label,
                    "order": len(q_ids) - 1,
                    "max_marks": 0,
                    "is_container": True,
                    "source": "ai",
                    "confirmed": False,
                    "confidence": _safe_confidence(band.get("confidence")),
                    "derivation": "docling-main-band",
                })
            for edge, yv in (("start", band.get("y_start")), ("end", band.get("y_end"))):
                if yv is not None:
                    boundaries.append({
                        "id": _ai_id(template_id, "boundary", label, edge, page_number),
                        "template_id": template_id,
                        "question_id": qid,
                        "label": f"{label}:{edge}",
                        "page_index": page_index,
                        "y": _y_to_canvas(float(yv), page_number, pages_geom),
                        "bounds": None,
                        "source": "ai",
                        "confirmed": False,
                        "confidence": _safe_confidence(band.get("confidence")),
                        "derivation": "docling-main-band",
                    })

        for band in page.get("part_bands") or []:
            label = str(band.get("label") or "").strip()
            if not label:
                continue
            # Infer the parent from the label prefix when the band does not
            # name one, and key the lookup by that inferred label so parts of
            # different questions never share a parent.
            parent_label = str(band.get("question") or "").strip() or label.split(".")[0]
            parent_id = q_ids.setdefault(parent_label, _ai_id(template_id, "question", parent_label))
            if parent_id not in container_ids:
                container_ids.add(parent_id)
                questions.append({
                    "id": parent_id,
                    "template_id": template_id,
                    "label": parent_label,
                    "order": len(q_ids) - 1,
                    "max_marks": 0,
                    "is_container": True,
                    "source": "ai",
                    "confirmed": False,
                    "confidence": 0.7,
                    "derivation": "docling-inferred-main-question",
                })
            # NOTE(review): the part id does not include the page, so a part
            # label repeated on another page yields a second row with the
            # same id — confirm whether upstream can produce that.
            pid = _ai_id(template_id, "part", label)
            first_part_by_page.setdefault(page_index, pid)
            bounds = None
            y1, y2 = band.get("y_start"), band.get("y_end")
            if margins["left"] is not None and margins["right"] is not None and y1 is not None and y2 is not None:
                top = max(float(y1), float(y2))
                bottom = min(float(y1), float(y2))
                bounds = _box_to_canvas(
                    {"l": margins["left"], "r": margins["right"], "t": top, "b": bottom, "coord_origin": "BOTTOMLEFT"},
                    page_number,
                    pages_geom,
                )
            # Without usable margins/extent, fall back to the label's own box.
            bounds = bounds or _box_to_canvas(band.get("label_box"), page_number, pages_geom)
            questions.append({
                "id": pid,
                "template_id": template_id,
                "parent_id": parent_id,
                "label": label,
                "order": len(questions),
                "max_marks": 0,
                "is_container": False,
                "bounds": bounds,
                "page": page_number,
                "source": "ai",
                "confirmed": False,
                "confidence": _safe_confidence(band.get("confidence")),
                "derivation": "docling-part-band-x-margins",
            })

    # Areas on a page are attached to that page's first part, else to the
    # first question overall.
    default_qid = questions[0]["id"] if questions else _ai_id(template_id, "question", "auto")
    area_sources = (
        ("furniture", "furniture", None, "docling-furniture"),
        ("figures", "context", "figure", "docling-context-figure"),
        ("tables", "context", "data_table", "docling-table"),
    )
    for page_key in page_keys:
        page_number = int(page_key)
        page_index = page_number - 1
        page = pages_obj[page_key]
        owner_qid = first_part_by_page.get(page_index, default_qid)
        for collection, kind, context_type, derivation in area_sources:
            for idx, item in enumerate(page.get(collection) or []):
                bounds = _box_to_canvas(item.get("box"), page_number, pages_geom)
                if not bounds:
                    continue
                row = {
                    "id": _ai_id(template_id, collection, page_number, idx),
                    "template_id": template_id,
                    "question_id": owner_qid,
                    "page": page_number,
                    "bounds": bounds,
                    "kind": kind,
                    "source": "ai",
                    "confirmed": False,
                    "confidence": 0.65,
                    "derivation": derivation,
                }
                if context_type:
                    row["context_type"] = context_type
                response_areas.append(row)

    for idx, region in enumerate(extra_regions or []):
        page_index = int(region.get("page_index", 0))
        bounds = _box_to_canvas(region.get("bbox") or {}, page_index + 1, pages_geom)
        if not bounds:
            continue
        response_form = _response_form_from_region_type(region.get("region_type"))
        if not response_form:
            continue
        response_areas.append({
            "id": _ai_id(template_id, "region", page_index, idx),
            "template_id": template_id,
            "question_id": first_part_by_page.get(page_index, default_qid),
            "page": page_index + 1,
            "bounds": bounds,
            "kind": "response",
            "response_form": response_form,
            "source": "ai",
            "confirmed": False,
            "confidence": _safe_confidence(region.get("confidence")),
            "derivation": region.get("detection_method") or "opencv-response-region",
        })

    return {"questions": questions, "response_areas": response_areas, "boundaries": boundaries, "layout": layout}
def _refresh_ai_rows(ctx: ExamContext, template_id: str, rows: Dict[str, List[Dict[str, Any]]]) -> None:
    """Replace a template's unconfirmed AI rows with a freshly mapped set.

    Unconfirmed ``source="ai"`` rows are deleted from the four structure
    tables, then the new rows are inserted. Manual rows and confirmed AI
    rows are never deleted.

    Row ids are deterministic, so a re-run regenerates the ids of rows that
    were confirmed after an earlier run. Those rows survive the delete;
    inserting them again would presumably collide on the id, so any new row
    whose id is still present for the template is skipped and the confirmed
    version is kept.

    The delete and insert are separate requests, not one transaction.
    """
    sb = ctx.supabase
    for table in ("exam_response_areas", "exam_boundaries", "exam_template_layout", "exam_questions"):
        sb.table(table).delete().eq("template_id", template_id).eq("source", "ai").eq("confirmed", False).execute()

    # Questions go first: the other rows carry question ids.
    for table, key in (("exam_questions", "questions"), ("exam_response_areas", "response_areas"), ("exam_boundaries", "boundaries"), ("exam_template_layout", "layout")):
        payload = rows.get(key) or []
        if not payload:
            continue
        surviving = sb.table(table).select("id").eq("template_id", template_id).execute()
        kept_ids = {row.get("id") for row in (getattr(surviving, "data", None) or [])}
        fresh = [row for row in payload if row.get("id") not in kept_ids]
        if fresh:
            sb.table(table).insert(fresh).execute()
def _run_auto_map_merge(ctx: ExamContext, template_id: str, pdf_bytes: bytes, source_label: str) -> Dict[str, List[Dict[str, Any]]]:
    """Run auto-map on a PDF and merge the result into the template.

    Steps: run ``auto_map``; try response-region detection (best effort — a
    failure is logged and the merge continues without regions); map both to
    rows; refresh the template's AI rows; copy ``paper_code`` / ``n_pages``
    from the first-pass meta onto the template as ``exam_code`` /
    ``page_count`` when present.

    Returns:
        The generated rows, keyed ``questions`` / ``response_areas`` /
        ``boundaries`` / ``layout``.

    Raises:
        Whatever ``auto_map``, the row mapping or the database calls raise;
        only region detection is shielded.
    """
    first_pass = auto_map(pdf_bytes, source_pdf=source_label)

    extra_regions: List[Dict[str, Any]] = []
    try:
        fh = tempfile.NamedTemporaryFile(prefix="cc-auto-map-regions-", suffix=".pdf", delete=False)
        tmp = fh.name
        try:
            # Cleanup covers the write too, so a failed write cannot leave
            # the file behind.
            with fh:
                fh.write(pdf_bytes)
            extra_regions = detect_response_regions_from_pdf(tmp)
        finally:
            try:
                os.unlink(tmp)
            except OSError:
                pass
    except Exception as exc:
        logger.info(f"auto-map response-region detection skipped for template {template_id}: {exc}")

    rows = _map_first_pass_to_rows(template_id, first_pass, pdf_bytes, extra_regions)
    _refresh_ai_rows(ctx, template_id, rows)

    meta = first_pass.get("meta") or {}
    candidates = {"exam_code": meta.get("paper_code"), "page_count": meta.get("n_pages")}
    updates = {column: value for column, value in candidates.items() if value is not None}
    # Nothing extracted means nothing to write; skip the empty UPDATE.
    if updates:
        ctx.supabase.table("exam_templates").update(updates).eq("id", template_id).execute()
    return rows
def _run_auto_map_job(job_id: str, ctx: ExamContext, template_id: str, pdf_bytes: bytes, source_label: str) -> None:
    """Run the auto-map merge as a tracked job.

    The job status moves to ``running``, then to ``completed`` (with a row
    count per result key) or ``failed`` (with the exception text). Failures
    are logged with a traceback and never propagate to the caller.
    """
    # NOTE(review): the synchronous endpoint path schedules
    # project_template_safe after a merge; nothing here does — confirm
    # whether a completed job should trigger projection as well.
    base = {"template_id": template_id}
    _set_auto_map_status(job_id, {"status": "running", **base})
    try:
        merged = _run_auto_map_merge(ctx, template_id, pdf_bytes, source_label)
        counts = {name: len(items) for name, items in merged.items()}
        _set_auto_map_status(job_id, {"status": "completed", **base, "counts": counts})
    except Exception as exc:
        logger.exception(f"auto-map job failed for template {template_id}: {exc}")
        _set_auto_map_status(job_id, {"status": "failed", **base, "error": str(exc)})
# ─── templates ─────────────────────────────────────────────────────────────── # ─── templates ───────────────────────────────────────────────────────────────
@ -339,48 +682,61 @@ async def get_template_source_pdf(
template = _fetch_template_or_404(ctx, template_id) template = _fetch_template_or_404(ctx, template_id)
_require_source_visibility_or_404(ctx, template) _require_source_visibility_or_404(ctx, template)
bucket: Optional[str] = None _, _, pdf_bytes = _resolve_template_source(ctx, template)
path: Optional[str] = None
if template.get("exam_id"):
storage_loc = _lookup_exam_storage_loc(template["exam_id"])
if not storage_loc:
raise HTTPException(status_code=404, detail="Template source not found")
try:
bucket, path = _parse_storage_loc(storage_loc)
except ValueError:
raise HTTPException(status_code=404, detail="Template source not found")
elif template.get("source_file_id"):
# Resolve the file row via service role (authz already done above: the caller proved they
# can see this template, and source_file_id is the template's own file). Reading `files`
# as-the-user trips a pre-existing broken RLS policy on cabinet_memberships
# (42P17 infinite recursion) — documented service-role exception, like the catalogue lookup.
file_row = _first(
SupabaseServiceRoleClient().supabase.table("files")
.select("bucket, path, mime_type, name")
.eq("id", template["source_file_id"])
.limit(1)
.execute()
)
if not file_row or not file_row.get("bucket") or not file_row.get("path"):
raise HTTPException(status_code=404, detail="Template source not found")
bucket = file_row["bucket"]
path = file_row["path"]
else:
raise HTTPException(status_code=404, detail="Template source not found")
if not bucket or not path:
raise HTTPException(status_code=404, detail="Template source not found")
try:
pdf_bytes = StorageAdmin().download_file(bucket, path)
except Exception as exc:
logger.warning(f"Template source download failed for template {template_id}: {exc}")
raise HTTPException(status_code=404, detail="Template source not found")
return Response(content=pdf_bytes, media_type="application/pdf") return Response(content=pdf_bytes, media_type="application/pdf")
@router.post("/templates/{template_id}/auto-map")
async def auto_map_template(
    template_id: str,
    background_tasks: BackgroundTasks,
    ctx: ExamContext = Depends(get_exam_context),
) -> Dict[str, Any] | JSONResponse:
    """Auto-map a template's source PDF into AI-suggested structure rows.

    Access checks run before anything is downloaded: the template must
    exist, the caller must pass the owner and source-visibility checks, and
    the template must have no recorded marks (409 otherwise).

    A PDF with a text layer is merged within the request and the refreshed
    template detail is returned. A PDF without one (or one whose probe
    fails) is queued as a background job and answered with 202 plus a
    ``job_id`` for the status endpoint.

    Raises:
        HTTPException: 409 when marks are recorded; 422 when auto-map
            rejects the input (``AutoMapError`` / ``ValueError``); 502 for
            any other merge failure; plus whatever the access checks and
            source resolution raise.
    """
    # The probe and the merge are blocking calls; run them in the worker
    # pool so this coroutine does not stall the event loop.
    from fastapi.concurrency import run_in_threadpool

    template = _fetch_template_or_404(ctx, template_id)
    _require_owner(ctx, template)
    _require_source_visibility_or_404(ctx, template)
    if _template_has_recorded_marks(ctx, template_id):
        raise HTTPException(status_code=409, detail="Template has recorded marks; auto-map structural refresh is blocked.")

    bucket, path, pdf_bytes = _resolve_template_source(ctx, template)
    source_label = f"{bucket}/{path}"
    try:
        fast_path = await run_in_threadpool(_pdf_has_text_layer, pdf_bytes)
    except Exception as exc:
        logger.warning(f"auto-map text-layer detection failed for template {template_id}; falling back to OCR queue: {exc}")
        fast_path = False

    if not fast_path:
        job_id = str(uuid.uuid4())
        # Record "queued" before scheduling so an immediate poll finds the job.
        _set_auto_map_status(job_id, {"status": "queued", "template_id": template_id})
        background_tasks.add_task(_run_auto_map_job, job_id, ctx, template_id, pdf_bytes, source_label)
        return JSONResponse(status_code=202, content={"status": "accepted", "job_id": job_id})

    try:
        await run_in_threadpool(_run_auto_map_merge, ctx, template_id, pdf_bytes, source_label)
    except (AutoMapError, ValueError) as exc:
        raise HTTPException(status_code=422, detail=f"Auto-map failed: {exc}") from exc
    except Exception as exc:
        logger.exception(f"auto-map failed for template {template_id}: {exc}")
        raise HTTPException(status_code=502, detail=f"Auto-map failed: {exc}") from exc
    background_tasks.add_task(project_template_safe, template_id)
    return await get_template(template_id, ctx)
@router.get("/templates/{template_id}/auto-map/{job_id}/status")
async def auto_map_status(
    template_id: str,
    job_id: str,
    ctx: ExamContext = Depends(get_exam_context),
) -> Dict[str, Any]:
    """Return the stored status of an auto-map job for a template.

    The caller must pass the template lookup and owner check. A job that is
    unknown, or that was recorded for a different template, is reported as
    404. Once the job is ``completed`` the current template detail is
    attached under ``template``.
    """
    template = _fetch_template_or_404(ctx, template_id)
    _require_owner(ctx, template)

    job_status = _get_auto_map_status(job_id)
    belongs_to_template = bool(job_status) and job_status.get("template_id") == template_id
    if not belongs_to_template:
        raise HTTPException(status_code=404, detail="Auto-map job not found")

    # Copy so the stored status is not mutated by the attachment below.
    response = {**job_status}
    if response.get("status") == "completed":
        response["template"] = await get_template(template_id, ctx)
    return response
@router.put("/templates/{template_id}") @router.put("/templates/{template_id}")
async def replace_template( async def replace_template(
template_id: str, template_id: str,

View File

@ -481,3 +481,144 @@ def test_neo4j_sync_non_owner_403():
def test_neo4j_sync_404(): def test_neo4j_sync_404():
client, _ = make_client(store={"exam_templates": []}) client, _ = make_client(store={"exam_templates": []})
assert client.post("/api/exam/templates/does-not-exist/neo4j-sync").status_code == 404 assert client.post("/api/exam/templates/does-not-exist/neo4j-sync").status_code == 404
# ─── S5 auto-map endpoint ────────────────────────────────────────────────────
def _first_pass_template():
    """Build a fresh one-page first-pass result: question 01 with part 01.1."""
    auto = {"source": "auto", "confirmed": False}

    def margin(edge, axis, value, scope, **extra):
        return {"edge": edge, "axis": axis, "value": value, "scope": scope, **extra, **auto}

    main_band = {"question": "01", "y_start": 780, "y_end": 60, **auto}
    part_band = {
        "label": "01.1",
        "question": "01",
        "y_start": 700,
        "y_end": 500,
        "label_box": {"l": 50, "t": 700, "r": 90, "b": 680, "coord_origin": "BOTTOMLEFT"},
        **auto,
    }
    page_one = {
        "role": "question",
        "role_source": "auto",
        "margins_enabled": True,
        "main_bands": [main_band],
        "part_bands": [part_band],
        "furniture": [],
        "figures": [],
        "tables": [],
    }
    return {
        "meta": {"schema": "exam-template/first-pass/v1", "paper_code": "8463/1", "n_pages": 1},
        "margins": [
            margin("left", "x", 50, "document"),
            margin("right", "x", 550, "document"),
            margin("top", "y", 780, "page", page=1),
            margin("bottom", "y", 60, "page", page=1),
        ],
        "pages": {"1": page_one},
    }
def _patch_auto_map(monkeypatch, store, *, fast=True):
    """Stub every external dependency of the auto-map endpoint.

    Storage and the service-role client are replaced with fakes backed by
    *store*; text-layer detection returns *fast*; ``auto_map`` returns the
    canned first-pass result; region detection returns nothing; page
    geometry is a single 600 x 800 page. The in-process job status dict is
    cleared so jobs from earlier tests cannot leak in.
    """
    monkeypatch.setattr(templates_mod, "StorageAdmin", _FakeStorageAdmin)
    monkeypatch.setattr(templates_mod, "SupabaseServiceRoleClient", lambda: _FakeServiceRoleClient(store))
    monkeypatch.setattr(templates_mod, "_pdf_has_text_layer", lambda _pdf: fast)
    monkeypatch.setattr(templates_mod, "auto_map", lambda *_a, **_k: _first_pass_template())
    monkeypatch.setattr(templates_mod, "detect_response_regions_from_pdf", lambda *_a, **_k: [])
    monkeypatch.setattr(templates_mod, "_pdf_page_geometry", lambda _pdf: [{"media_x0": 0.0, "crop_x0": 0.0, "crop_y0": 0.0, "page_pt_w": 600.0, "page_pt_h": 800.0, "rendered_w": 600.0, "rendered_h": 800.0, "page_top": 0.0}])
    templates_mod._AUTO_MAP_JOB_STATUS.clear()
def _template_with_source(owner=TEACHER):
    """Return a store with draft template ``t1`` (owned by *owner*) and its source file ``f1``."""
    return {
        "exam_templates": [{"id": "t1", "title": "p", "status": "draft", "institute_id": INST_A, "teacher_id": owner, "source_file_id": "f1"}],
        "files": [{"id": "f1", "bucket": "cc.users", "path": "exam-marker/i/c/f1/paper.pdf", "name": "paper.pdf"}],
    }
def test_box_to_canvas_uses_cropbox_as_page_origin():
    """A box at the CropBox's top-left corner maps to x=0 and y=page_top."""
    pages = [{
        "media_x0": 0.0, "crop_x0": 100.0, "crop_y0": 200.0,
        "page_pt_w": 400.0, "page_pt_h": 600.0,
        "rendered_w": 400.0, "rendered_h": 600.0,
        "page_top": 25.0,
    }]
    # l equals crop_x0 and t equals crop_y0 + page_pt_h, i.e. the crop's top-left.
    box = {"l": 100.0, "t": 800.0, "r": 180.0, "b": 760.0, "coord_origin": "BOTTOMLEFT"}
    assert templates_mod._box_to_canvas(box, 1, pages) == {"x": 0.0, "y": 25.0, "w": 80.0, "h": 40.0}
def test_response_region_types_are_mapped_to_response_form_enum(monkeypatch):
    """Detected region types are translated to response-form values, in input order."""
    monkeypatch.setattr(templates_mod, "_pdf_page_geometry", lambda _pdf: [{"media_x0": 0.0, "crop_x0": 0.0, "crop_y0": 0.0, "page_pt_w": 600.0, "page_pt_h": 800.0, "rendered_w": 600.0, "rendered_h": 800.0, "page_top": 0.0}])
    first_pass = _first_pass_template()
    regions = [
        {"page_index": 0, "bbox": {"l": 50, "t": 700, "r": 100, "b": 680, "coord_origin": "BOTTOMLEFT"}, "region_type": "answer_lines", "confidence": 0.9},
        {"page_index": 0, "bbox": {"l": 50, "t": 650, "r": 100, "b": 620, "coord_origin": "BOTTOMLEFT"}, "region_type": "answer_box", "confidence": 0.9},
        {"page_index": 0, "bbox": {"l": 50, "t": 600, "r": 100, "b": 560, "coord_origin": "BOTTOMLEFT"}, "region_type": "working_space", "confidence": 0.9},
    ]
    rows = templates_mod._map_first_pass_to_rows("t1", first_pass, b"%PDF", regions)
    # Regions carry no detection_method, so they get the default derivation.
    forms = [r.get("response_form") for r in rows["response_areas"] if r.get("derivation") == "opencv-response-region"]
    assert forms == ["lines", "answer-box", "working"]
def test_auto_map_fast_path_merges_ai_rows_and_returns_detail(monkeypatch):
    """With a text layer, the POST merges AI rows and returns the template detail (200)."""
    store = _template_with_source()
    client, store = make_client(store=store)
    _patch_auto_map(monkeypatch, store, fast=True)
    resp = client.post("/api/exam/templates/t1/auto-map")
    assert resp.status_code == 200
    body = resp.json()
    # exam_code is copied from the first-pass meta's paper_code.
    assert body["exam_code"] == "8463/1"
    assert body["layout"] and body["layout"][0]["source"] == "ai"
    assert any(q["label"] == "01.1" and q["source"] == "ai" and q["confirmed"] is False for q in store["exam_questions"])
    assert store["exam_boundaries"] and store["exam_boundaries"][0]["derivation"] == "docling-main-band"
def test_auto_map_preserves_manual_and_confirmed_rows_on_rerun(monkeypatch):
    """A re-run removes only unconfirmed AI rows; manual and confirmed AI rows stay."""
    store = _template_with_source()
    store.update({
        "exam_questions": [
            {"id": "manual", "template_id": "t1", "label": "manual", "order": 0, "source": "manual", "confirmed": True},
            {"id": "accepted-ai", "template_id": "t1", "label": "accepted", "order": 1, "source": "ai", "confirmed": True},
            {"id": "old-ai", "template_id": "t1", "label": "old", "order": 2, "source": "ai", "confirmed": False},
        ],
        "exam_response_areas": [], "exam_boundaries": [], "exam_template_layout": [],
    })
    client, store = make_client(store=store)
    _patch_auto_map(monkeypatch, store, fast=True)
    assert client.post("/api/exam/templates/t1/auto-map").status_code == 200
    ids = {q["id"] for q in store["exam_questions"]}
    assert {"manual", "accepted-ai"}.issubset(ids)
    assert "old-ai" not in ids
def test_auto_map_non_owner_is_403_before_download(monkeypatch):
    """A caller who is not the template's teacher gets 403 without storage being touched."""
    store = _template_with_source(owner=OTHER_TEACHER)
    client, store = make_client(user_id=TEACHER, institute_ids=(INST_A,), store=store)

    # Any attempt to construct the storage client fails the test.
    def _no_download(*_a, **_k):
        raise AssertionError("download should not run before owner gate")

    monkeypatch.setattr(templates_mod, "StorageAdmin", _no_download)
    resp = client.post("/api/exam/templates/t1/auto-map")
    assert resp.status_code == 403
def test_auto_map_owner_lost_institute_membership_is_404_before_download(monkeypatch):
    """An owner whose institutes no longer include the template's gets 404 without storage being touched."""
    store = _template_with_source(owner=TEACHER)
    # The template belongs to INST_A; the caller is only a member of INST_B.
    client, store = make_client(user_id=TEACHER, institute_ids=(INST_B,), store=store)

    # Any attempt to construct the storage client fails the test.
    def _no_download(*_a, **_k):
        raise AssertionError("download should not run before visibility gate")

    monkeypatch.setattr(templates_mod, "StorageAdmin", _no_download)
    resp = client.post("/api/exam/templates/t1/auto-map")
    assert resp.status_code == 404
def test_auto_map_blocks_when_marks_recorded(monkeypatch):
    """A template with a mark entry in one of its marking batches is rejected with 409."""
    store = _template_with_source()
    store.update({
        "marking_batches": [{"id": "b1", "template_id": "t1"}],
        "mark_entries": [{"id": "m1", "batch_id": "b1"}],
    })
    client, store = make_client(store=store)
    _patch_auto_map(monkeypatch, store, fast=True)
    resp = client.post("/api/exam/templates/t1/auto-map")
    assert resp.status_code == 409
def test_auto_map_ocr_returns_job_id_and_status_completes(monkeypatch):
    """Without a text layer, the POST answers 202 with a job id whose status then reads completed."""
    store = _template_with_source()
    client, store = make_client(store=store)
    _patch_auto_map(monkeypatch, store, fast=False)
    resp = client.post("/api/exam/templates/t1/auto-map")
    assert resp.status_code == 202
    job_id = resp.json()["job_id"]
    # NOTE(review): relies on the background job having finished by the time
    # the POST returns — presumably guaranteed by the test client; confirm.
    status = client.get(f"/api/exam/templates/t1/auto-map/{job_id}/status")
    assert status.status_code == 200
    body = status.json()
    assert body["status"] == "completed"
    # The canned first pass yields one container question plus one part.
    assert body["counts"]["questions"] >= 2
    assert body["template"]["layout"]