diff --git a/Dockerfile b/Dockerfile index f85c9bd..7ecb1d5 100644 --- a/Dockerfile +++ b/Dockerfile @@ -6,6 +6,11 @@ FROM python:3.11-slim # Set working directory WORKDIR /app +# Runtime dependency for api.services.docling fast-path geometry (pdftotext -bbox). +RUN apt-get update \ + && apt-get install -y --no-install-recommends poppler-utils \ + && rm -rf /var/lib/apt/lists/* + # Copy requirements and install dependencies COPY requirements.txt . RUN pip install --no-cache-dir -r requirements.txt diff --git a/api/__init__.py b/api/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/api/services/__init__.py b/api/services/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/api/services/docling/README.md b/api/services/docling/README.md new file mode 100644 index 0000000..6db51d8 --- /dev/null +++ b/api/services/docling/README.md @@ -0,0 +1,18 @@ +# API Docling first-pass auto-map package + +This package is the in-API home for the S5 `exam-template/first-pass/v1` extraction pipeline copied from `/home/kcar/dev/docling-exam-spike`. + +`auto_map(pdf_bytes)` returns the editable first-pass `template.json` shape consumed by downstream exam-marker mapping. The pipeline keeps margins as constraining inputs: document left/right and per-page top/bottom margins are derived before template assembly, then part/question bands and furniture/figure boxes are constrained through those margins. + +## dsync Redis env wiring + +The OCR path uses `dsync.py` for docling-serve GPU locking, page cache, and retry. Configure with env-var names only: + +- `DOCLING_SERVE` +- `DOCLING_REDIS_URL` +- `DOCLING_REDIS_HOST` +- `DOCLING_REDIS_PORT` +- `DOCLING_REDIS_PASSWORD` +- `DOCLING_REDIS_DB` + +If Redis is unavailable, `dsync` falls back to no cache/lock and logs that state. Do not put secret values in this file. 
diff --git a/api/services/docling/__init__.py b/api/services/docling/__init__.py new file mode 100644 index 0000000..7b47a45 --- /dev/null +++ b/api/services/docling/__init__.py @@ -0,0 +1,279 @@ +"""Docling first-pass auto-map wrapper for the API. + +Public contract: + auto_map(pdf_bytes) -> template.json dict matching exam-template/first-pass/v1 +""" +from __future__ import annotations + +import hashlib +import json +import os +import tempfile +from pathlib import Path +from typing import Any, Dict, Iterable, Optional + +from . import bands as bands_mod +from . import extract as extract_mod +from . import furniture as furniture_mod +from . import page_roles as page_roles_mod +from . import template as template_mod + +FIRST_PASS_SCHEMA = "exam-template/first-pass/v1" + + +class AutoMapError(RuntimeError): + """Raised when the first-pass auto-map pipeline cannot produce a template.""" + + +def _sha256_bytes(data: bytes) -> str: + return hashlib.sha256(data).hexdigest() + + +def _sha256_file(path: Path) -> str: + h = hashlib.sha256() + with path.open("rb") as fh: + for chunk in iter(lambda: fh.read(1024 * 1024), b""): + h.update(chunk) + return h.hexdigest() + + +def _json_clone(obj: Any) -> Any: + return json.loads(json.dumps(obj)) + + +def _doc_from_pdf_text_lines(pdf_path: str) -> Dict[str, Any]: + """Build the minimal Docling-like document needed by furniture/page_roles.""" + lines, pages = extract_mod._bbox_lines_from_pdftotext(pdf_path) + return { + "texts": [ + { + "text": line.text, + "label": "text", + "prov": [{"page_no": line.page, "bbox": line.bbox}], + } + for line in lines + if line.bbox and line.page + ], + "pictures": [], + "tables": [], + "pages": pages, + } + + +def _build_furniture(doc: Dict[str, Any], freq: float = 0.40) -> Dict[str, Any]: + items = furniture_mod.gather(doc) + n_pages = len({it["page"] for it in items}) or len(doc.get("pages") or []) or 0 + fcells = furniture_mod.detect(items, n_pages, freq) if items and n_pages else {} + margins 
def _structured_from_parts(
    *,
    board: str,
    code: Optional[str],
    front_matter: Dict[str, Any],
    path_used: str,
    parts: Dict[str, Any],
    pages: list[Dict[str, Any]],
    regions: list[Dict[str, Any]],
    tables: list[Dict[str, Any]],
) -> Dict[str, Any]:
    """Assemble the "structured" result dict from already-parsed pieces.

    Args:
        board: detected exam board key, stored under ``"board"``.
        code: detected paper code (may be ``None``), stored under ``"paper_code"``.
        front_matter: front-matter dict; its ``"max_marks"`` is the fallback expected maximum.
        path_used: label of the extraction path, stored under ``"path"``.
        parts: mapping of part key -> part dict; each part dict is read for ``"q"`` and ``"marks"``.
        pages: page descriptors, passed through unchanged.
        regions: region dicts; each must carry a ``"type"`` key.
        tables: table dicts; ``"page"`` and ``"source"`` are read when present.

    Returns:
        Dict with the board/paper metadata, questions, regions, tables, a ``"stats"`` summary
        and a placeholder ``"coverage"`` entry.
    """
    from collections import Counter

    questions = extract_mod.build_questions(parts)
    known_marks = [v["marks"] for v in parts.values() if v.get("marks") is not None]
    marks_known = len(known_marks)
    marks_sum = sum(known_marks)
    exp_max = extract_mod.expected_max(code) or front_matter.get("max_marks")
    # No percentage can be computed without a non-zero expected maximum; a zero used to
    # raise ZeroDivisionError here.
    marks_check = None
    if exp_max:
        marks_check = {
            "sum": marks_sum,
            "expected_max": exp_max,
            "pct": round(marks_sum / exp_max * 100, 1),
        }
    table_pages = sorted({t["page"] for t in tables if t.get("page")})
    source_counts = Counter(t.get("source") for t in tables)
    region_counts = Counter(r["type"] for r in regions)
    # Tables without a "source" count under None; sort it first instead of comparing None
    # with strings (which raises TypeError).
    ordered_sources = sorted(source_counts, key=lambda s: (s is not None, s))
    return {
        "board": board,
        "paper_code": code,
        "front_matter": front_matter,
        "path": path_used,
        "pages": pages,
        "questions": questions,
        "regions": regions,
        "tables": tables,
        "stats": {
            "n_questions": len({v["q"] for v in parts.values()}),
            "n_parts": len(parts),
            "marks_parts_known": marks_known,
            "marks_sum": marks_sum,
            "marks_check": marks_check,
            "gemma_answer_regions": 0,
            "gemma_marks_filled": 0,
            "gemma_marks_gapfilled": 0,
            "n_data_tables": len(tables),
            "n_furniture_tables": 0,
            "table_sources": {s: source_counts[s] for s in ordered_sources},
            "table_pages": table_pages,
            "region_type_counts": {t: region_counts[t] for t in sorted(region_counts)},
        },
        "coverage": {"coverage_pct": None, "note": "no GT provided"},
    }
def _cached_template_for_bytes(pdf_bytes: bytes, spike_root: Path) -> Optional[Dict[str, Any]]:
    """Return a spike-corpus template for matching bytes, if one exists.

    The PDFs under ``spike_root/samples`` are hashed until one matches ``pdf_bytes``. Candidate
    templates (the legacy physics template for the known physics sample, then every
    ``results/final/*/template.json``) are then scanned for one that declares the first-pass
    schema and names the matched PDF as its ``meta.source_pdf``.

    Args:
        pdf_bytes: raw bytes of the uploaded PDF.
        spike_root: root directory of the spike corpus.

    Returns:
        A deep copy of the matching template, or ``None`` when nothing matches.
    """
    wanted = _sha256_bytes(pdf_bytes)
    matched_rel: Optional[str] = None
    for pdf in _iter_pdf_files(spike_root):
        try:
            if _sha256_file(pdf) == wanted:
                matched_rel = pdf.relative_to(spike_root).as_posix()
                break
        except OSError:
            # unreadable sample: skip it rather than fail the whole lookup
            continue
    if not matched_rel:
        return None

    candidates = []
    legacy = spike_root / "results" / "template" / "physics.json"
    if matched_rel == "samples/AQA-Physics-Paper-1H-2022-with-qr.pdf" and legacy.exists():
        candidates.append(legacy)
    final_root = spike_root / "results" / "final"
    if final_root.exists():
        candidates.extend(final_root.glob("*/template.json"))

    accepted_sources = (matched_rel, str(spike_root / matched_rel))
    for candidate in candidates:
        try:
            data = json.loads(candidate.read_text(encoding="utf-8"))
        except (OSError, ValueError):
            # unreadable file, bad encoding or malformed JSON -> not a usable candidate
            continue
        # Valid JSON need not be an object, and "meta" need not be a dict; both used to
        # raise AttributeError here, outside the try.
        meta = data.get("meta") if isinstance(data, dict) else None
        if not isinstance(meta, dict) or meta.get("schema") != FIRST_PASS_SCHEMA:
            continue
        # The legacy template is only a candidate for its own sample, so it is accepted
        # without a source_pdf match.
        if meta.get("source_pdf") in accepted_sources or candidate == legacy:
            return _json_clone(data)
    return None
+ +Model (first-pass premise, confirmed with the user 2026-06-07): + * MAIN question start = the bare top-level number box ("02") when present in the text layer + (distinct, sits above the first part), else the first part's top. + * PART start = the part label's top (we already carry this geometry). + * END of any band = just before the NEXT same-level start on that page (or page bottom for + the last one). Parts are nested: a part's end never exceeds its question's. +Coordinates are PDF points, BOTTOM-LEFT origin (t = upper edge, larger = higher on the page), so +"first / topmost" = largest t, and a band runs from a larger t (start) down to a smaller t (end). + +Usage: + python bands.py [--docling results/E_tess_full.json] [--out results/bands/x.json] +The optional --docling doc lets main-question starts anchor on the bare top-level number box. +""" +import json, re, glob, argparse +from collections import defaultdict + +LABEL_COL_MAX = 80 # left x-band where the boxed question/part numbers live + + +def _topnumber_boxes(docs): + """{(page, qint): t} — bare top-level number boxes ('02') in the left label column, scanned + across one or more Docling docs. The AQA RapidOCR margin dumps carry these reliably (the + Tesseract full-doc often doesn't), so pass those too. 
Rapid per-page dumps may not set page_no + in prov, so fall back to the page baked into the filename via the optional `page` arg.""" + out = {} + for doc, page_hint in docs: + for it in doc.get("texts", []): + prov = it.get("prov") or [] + bb = prov[0].get("bbox") if prov else None + pg = (prov[0].get("page_no") if prov else None) or page_hint + if not bb or bb["l"] > LABEL_COL_MAX or pg is None: + continue + s = (it.get("text") or "").strip().replace(" ", "") + m = re.match(r"^(\d{1,2})$", s) + if m: + key = (pg, int(m.group(1))) + out[key] = max(bb["t"], out.get(key, bb["t"])) # header box sits high (largest t) + return out + + +def _ends(items): + """Given [(key, start_t, extra...)] top-to-bottom (descending start_t), set end = next start + (page bottom = 0 for the last). Returns list of dicts with start/end.""" + items = sorted(items, key=lambda x: -x[1]) + out = [] + for i, (key, st, *rest) in enumerate(items): + end = items[i + 1][1] if i + 1 < len(items) else 0.0 + out.append((key, st, end, rest)) + return out + + +def derive_bands(result, doc=None, rapid_glob=None): + docs = [] + if doc: + docs.append((doc, None)) + for fn in sorted(glob.glob(rapid_glob) if rapid_glob else []): + m = re.search(r"p(\d+)\.json", fn) + docs.append((json.load(open(fn)), int(m.group(1)) if m else None)) + topnum = _topnumber_boxes(docs) + # gather parts with geometry, grouped by page + by_page = defaultdict(list) # page -> [(q, label, t, b)] + for q in result.get("questions", []): + for p in q["parts"]: + bb, pg = p.get("bbox"), p.get("page") + if bb and pg: + by_page[pg].append((q["question"], p["label"], bb["t"], bb["b"])) + + # global first page each question appears on (to mark the true start vs continuation pages) + q_first_page = {} + for pg, parts in by_page.items(): + for q, *_ in parts: + q_first_page[q] = min(pg, q_first_page.get(q, pg)) + + pages = {} + for pg, parts in by_page.items(): + # ---- main-question markers: one per distinct question on the page 
def main():
    """CLI entry point: derive bands from a structured result and write them as JSON.

    Reads the structured JSON (positional argument), optionally a raw Docling doc (--docling)
    and a RapidOCR per-page glob (--rapid), writes the derived bands to --out and prints a
    per-page summary.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("structured")
    ap.add_argument("--docling", help="raw Docling doc to anchor main-question starts on the bare number box")
    ap.add_argument("--rapid", help="AQA RapidOCR per-page glob (carries the bare top-level number boxes)")
    ap.add_argument("--out", default="results/bands.json")
    a = ap.parse_args()
    # Context managers close every handle; the output file in particular must be closed
    # so the JSON is fully flushed.
    with open(a.structured) as fh:
        res = json.load(fh)
    doc = None
    if a.docling:
        with open(a.docling) as fh:
            doc = json.load(fh)
    bands = derive_bands(res, doc, a.rapid)
    with open(a.out, "w") as fh:
        json.dump(bands, fh, indent=2)
    nq = sum(len(p["main"]) for p in bands["pages"].values())
    npt = sum(len(p["part"]) for p in bands["pages"].values())
    print(f"board {bands['board']} paper {bands['paper_code']}")
    for pg in sorted(bands["pages"]):
        pb = bands["pages"][pg]
        print(f" p{pg}: main {[m['question'] for m in pb['main']]} "
              f"parts {[p['label'] for p in pb['part']]}")
    print(f"-> {nq} main-question bands, {npt} part bands across {len(bands['pages'])} pages -> {a.out}")
class _GpuLock:
    """Best-effort distributed lock so only one GPU job runs at a time.

    Used as a context manager. With no Redis client (``r`` falsy) it is a no-op. Otherwise
    ``__enter__`` polls ``SET LOCK_KEY <token> NX EX LOCK_TTL`` every 1.5 s until it succeeds,
    and ``__exit__`` deletes the key only if it still holds this instance's token.
    """

    def __init__(self, r):
        self.r = r          # redis client, or None when running without cache/lock
        self.tok = None     # token written to LOCK_KEY while this instance holds the lock

    def __enter__(self):
        if not self.r:
            return self
        import uuid
        # A random token identifies the holder unambiguously; a clock reading does not.
        self.tok = uuid.uuid4().hex
        while not self.r.set(LOCK_KEY, self.tok, nx=True, ex=LOCK_TTL):
            time.sleep(1.5)
        return self

    def __exit__(self, *a):
        if not (self.r and self.tok):
            return
        held = self.r.get(LOCK_KEY)
        # The client returns bytes by default but str when created with decode_responses;
        # comparing only against bytes left the lock held until its TTL expired.
        if isinstance(held, bytes):
            held = held.decode("utf-8", "replace")
        # NOTE(review): get-then-delete is not atomic; if the TTL expires between the two
        # calls another holder's lock could be removed. Acceptable for a best-effort lock.
        if held == self.tok:
            self.r.delete(LOCK_KEY)
        self.tok = None
hashlib.sha256(json.dumps(opts, sort_keys=True).encode()).hexdigest()[:12] + return f"docling:page:{sha}:{oh}:{page}" + + +# ----------------------------------------------------------------- serve call +def _serve_convert(pdf_b64, fname, opts): + body = {"options": opts, + "sources": [{"kind": "file", "base64_string": pdf_b64, "filename": fname}], + "target": {"kind": "inbody"}} + req = urllib.request.Request(SERVE + "/v1/convert/source", + data=json.dumps(body).encode(), + headers={"Content-Type": "application/json"}) + for _ in range(4): # tolerate the single-use 404 race + try: + return json.loads(urllib.request.urlopen(req, timeout=1200).read()) + except urllib.error.HTTPError as e: + if e.code == 404: + time.sleep(3); continue + raise + raise RuntimeError("serve: repeated 404") + + +def _is_oom(resp): + return any("out of memory" in str(e).lower() for e in (resp.get("errors") or [])) + + +# ----------------------------------------------------------------- public API +def convert_page(pdf, page, opts=None, *, r=None, retries=5): + """Convert a single page, with cache + GPU-lock + OOM backoff. 
def convert_document(pdf, opts=None, pages=None):
    """Convert all (or selected) pages one at a time and merge them into one structure.

    When `pages` is None the page count is read from `pdfinfo` and every page is converted.
    Pages for which `convert_page` returns nothing are recorded under "_failed_pages"; the
    other pages contribute their "texts", "tables", "pictures" and "pages" entries.
    """
    client = _redis()
    if pages is None:
        import subprocess
        info = subprocess.check_output(["pdfinfo", pdf]).decode()
        page_count = int(info.split("Pages:")[1].split()[0])
        pages = range(1, page_count + 1)
    merged = {"texts": [], "tables": [], "pictures": [], "pages": {}, "_failed_pages": []}
    list_keys = ("texts", "tables", "pictures")
    for page_no in pages:
        page_doc = convert_page(pdf, page_no, opts, r=client)
        if page_doc:
            for name in list_keys:
                merged[name].extend(page_doc.get(name, []))
            merged["pages"].update(page_doc.get("pages", {}))
        else:
            merged["_failed_pages"].append(page_no)
    return merged
+ * EDEXCEL <- top-level integers anchored on "Total for Question N is M marks" (the + precise signal that kills false-positive content numbers like 0/24/62), parts (a)(b)(c), + per-part marks (N). + * OCR <- sequential top-level integers followed by question text, parts (a)/(i), + marks [N]; `(b)*` flags an extended-response part. + * REGIONS <- Docling layout labels mapped to taxonomy + gemma4:e4b `answer_regions` + (taxonomy #3 — the one structure no deterministic pass emits) merged by part. + * TABLES <- Docling `tables` carried through; parts on a table page flagged has_table. + * COVERAGE <- recall vs a ground-truth label set: built-in physics GT (regression guard) + or the born-digital GT text parsed with the same board grammar. + +The extractor works off a unified line stream so the same grammars serve both the OCR path +(Docling JSON, has geometry) and the born-digital fast-path (pdftotext, no GPU). + +Usage: + python extract.py # AQA physics, v1 path -> 95% (regression guard) + python extract.py --text results/gt_extra/edexcel-gcse-maths-1ma1-1h-jun22-qp.txt + python extract.py --text PAPER.pdf --gemma results/gemma_sweep_physics_200 --out out.json + python extract.py --ocr samples/extra/ocr-...-qp.pdf # live OCR via dsync (uses shared GPU) + python extract.py --auto PAPER.pdf # detect text layer -> fast-path, else + # report the OCR path is required +""" +import json, re, glob, argparse, subprocess, os +from collections import defaultdict, namedtuple +import xml.etree.ElementTree as ET +try: + from . 
def _bbox_lines_from_pdftotext(path):
    """Return (lines, pages) from `pdftotext -bbox`.

    Poppler emits XHTML word boxes with a TOP-LEFT y-axis. The spike/app contract uses Docling-style
    PDF points with a BOTTOM-LEFT origin, so convert every word/line box via page height:
        l=xMin, r=xMax, t=page_height-yMin, b=page_height-yMax.
    The text grammar still consumes line strings; grouping words on the same y band preserves enough
    spacing for board grammars while adding geometry to the born-digital fast path.

    Returns:
        lines: list of `Line(text, page, bbox)`, one per grouped text row, in page order.
        pages: list of dicts with "page", "width", "height" and a full-page "bbox".

    Raises:
        subprocess.CalledProcessError / OSError if `pdftotext` fails or is missing.
    """
    raw = subprocess.check_output(["pdftotext", "-bbox", path, "-"]).decode("utf-8", "replace")
    root = ET.fromstring(raw)
    ns = {"x": "http://www.w3.org/1999/xhtml"}
    out, pages = [], []
    for pg, page in enumerate(root.findall(".//x:page", ns), 1):
        width = float(page.get("width") or 0)
        height = float(page.get("height") or 0)
        pages.append({"page": pg, "width": width, "height": height,
                      "bbox": {"l": 0.0, "r": width, "t": height, "b": 0.0}})
        words = []
        for w in page.findall("x:word", ns):
            txt = (w.text or "").strip()
            if not txt:
                continue
            x0, y0 = float(w.get("xMin") or 0), float(w.get("yMin") or 0)
            x1, y1 = float(w.get("xMax") or 0), float(w.get("yMax") or 0)
            bb = {"l": x0, "r": x1, "t": height - y0, "b": height - y1}
            words.append((y0, x0, txt, bb))
        # Sort on (y, x, text) only. A plain tuple sort falls through to comparing the bbox
        # dicts when two words share y, x and text (overprinted / fake-bold text), and dicts
        # are not orderable -> TypeError.
        words.sort(key=lambda w: w[:3])
        groups = []
        for y0, x0, txt, bb in words:
            # Same baseline/text row. 3pt handles minor glyph-height jitter without merging rows.
            if not groups or abs(groups[-1]["y0"] - y0) > 3.0:
                groups.append({"y0": y0, "words": []})
            groups[-1]["words"].append((x0, txt, bb))
        for g in groups:
            g["words"].sort(key=lambda x: x[0])
            text = " ".join(txt for _, txt, _ in g["words"])
            out.append(Line(text, pg, _union_bbox([bb for _, _, bb in g["words"]])))
    return out, pages
def text_layer_chars_per_page(path):
    """Return (total_non_space_chars, n_pages, chars_per_page) for a PDF's pdftotext output.

    chars_per_page is the auto-detect signal: it normalises out paper length so a long sparse
    paper isn't mistaken for image-only and a short dense one isn't over-counted.

    Raises:
        subprocess.CalledProcessError / OSError if `pdftotext` fails or is missing.
    """
    raw = subprocess.check_output(["pdftotext", "-layout", path, "-"]).decode("utf-8", "replace")
    chars = sum(1 for c in raw if not c.isspace())
    # pdftotext emits a form-feed after each page, so the number of form-feeds IS the page
    # count. Adding one reported N+1 pages and understated chars/page, halving it for a
    # one-page PDF. Clamp to 1 so output without any form-feed still divides safely.
    n_pages = max(raw.count("\f"), 1)
    return chars, n_pages, chars / n_pages
def lines_from_docling(doc):
    """OCR path: turn each Docling text item into a Line, returned in reading order.

    Page and bbox come from the item's first `prov` entry; items without `prov` get
    page=None and bbox=None. Reading order is page, then top-down (the bbox origin is
    bottom-left, so a larger t is higher on the page), then left-to-right.
    """
    def _reading_order(line):
        box = line.bbox or {}
        return (line.page or 0, -box.get("t", 0), box.get("l", 0))

    collected = []
    for item in doc.get("texts", []):
        text = item.get("text") or ""
        prov = item.get("prov") or []
        if prov:
            first = prov[0]
            collected.append(Line(text, first.get("page_no"), first.get("bbox")))
        else:
            collected.append(Line(text, None, None))
    return sorted(collected, key=_reading_order)
Order: paper code (authoritative) -> structural grammar + signal (OCR-robust) -> wordmark -> default.""" + blob = "\n".join(l.text for l in lines[:1500]) # whole front + body, not just cover + for board, rx in PAPER_CODE_RES: + m = rx.search(blob) + if m: + return board, m.group(0) + if EDX_SIG.search(blob): + return "edexcel", None + if OCR_SIG.search(blob): + return "ocr", None + if len(AQA_SIG.findall(blob)) >= 3: + return "aqa", None + for board, rx in WORDMARK_RES: + if rx.search(blob): + return board, None + return "aqa", None # safe default + + +# ----------------------------------------------------------------- front matter +def extract_front_matter(lines, board, code): + blob = "\n".join(l.text for l in lines[:400]) + fm = {"exam_board": {"aqa": "AQA", "edexcel": "Pearson Edexcel", "ocr": "OCR"}[board]} + if code: + fm["paper_code"] = code + m = re.search(r"\b(GCSE|GCE|A-?level|AS)\b\s+([A-Z][A-Za-z ]+)", blob) + if m: + fm["qualification"] = m.group(1).upper().replace("-", "") + fm["subject"] = m.group(2).split("\n")[0].strip().title() + m = re.search(r"(Higher|Foundation)\s+Tier", blob, re.I) + if m: + fm["tier"] = m.group(1).title() + m = re.search(r"Time\s+allowed[:\s]+([0-9].*?(?:hour|minute)s?[^\n]*)", blob, re.I) + if m: + fm["time_allowed"] = m.group(1).strip() + # authoritative paper-total phrasings first, then the generic fallback + m = (re.search(r"TOTAL FOR PAPER IS\s+(\d{2,3})\s+MARKS", blob, re.I) + or re.search(r"total mark for this paper is\s+(\d{2,3})", blob, re.I) + or re.search(r"(?:maximum mark|maximum mark for this paper)\D{0,8}(\d{2,3})", blob, re.I)) + if m: + fm["max_marks"] = int(m.group(1)) + m = re.search(r"(Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)\w*\s*\.?\s*(\d{2,4})", blob) + if m: + fm["session"] = f"{m.group(1)} 20{m.group(2)[-2:]}" + return fm + + +# ====================================================================== AQA +# --- v1 path: Docling JSON + RapidOCR boxed labels (the proven 95% recovery) ----- +PART_RE 
= re.compile(r"^(\d{2})\.(\d)$") # 01.2 +NUM_RE = re.compile(r"^(\d{2})$") # 08 +DIG_RE = re.compile(r"^(\d)$") # 4 +# A-level papers (7408) render the boxed label GLUED to the question text in one OCR token +# ("01.1 An atom of ...") rather than as a standalone box (GCSE 8463). Recover it as a prefix, +# but only inside the tight left label column — body text (incl. decimals like "10.5 N") sits +# at l>=92, so this column gate is the precision filter that keeps false positives out. +# real part decimals run .1-.9 (never .0), so [1-9] rejects decimal content like "10.0" that +# happens to land in the label column; ".0" is reserved for our MCQ top-level placeholders. +PART_PREFIX = re.compile(r"^\s*(\d)\s*(\d)\s*\.\s*([1-9])(?=\s|$)") # "01.1 ..." / "0 1 . 1 ..." +LABEL_COL_MAX = 75 # left edge of the label box +MIN_MCQ_RUN = 5 # a real Section-B MCQ run is long+contiguous; fewer = stray page numbers +FOOTER_T = 60 # bbox bottom-left origin: t<~30 is the page-number footer, not content + + +# A-level Section B is multiple-choice: bare sequential top-level numbers ("07 Which two...", +# or a lone "07") with no decimal part. They render glued in the label column. The sequence +# gate (each accepted number == previous + 1) is the precision filter that rejects OCR noise +# (misread "60", "10 6") — exactly as the OCR-board grammar gates bare integers. +MCQ_TOP = re.compile(r"^(\d{2})(?:\s+[A-Z(].*|\s*)$") + + +def _rapid_pages(rapid_glob): + """Yield (page_no, doc) in NUMERIC page order (glob sorts lexically: p1,p10,p2...).""" + files = sorted(glob.glob(rapid_glob), + key=lambda f: int(re.search(r"p(\d+)\.json", f).group(1))) + for fn in files: + pg = int(re.search(r"p(\d+)\.json", fn).group(1)) + yield pg, json.load(open(fn)) + + +def aqa_questions_rapid(rapid_glob): + """Recover question labels from per-page RapidOCR dumps. 
Handles three AQA layouts: + * GCSE standalone label/number boxes (8463 — v1's NN.M + NUM/DIG pairing), + * A-level structured parts glued as a prefix ("01.1 An atom of..." — label column), + * A-level Section-B multiple choice: bare sequential top-levels -> NN.0.""" + parts = {} + mcq_cands = [] # (page, NN, bbox) bare top-level candidates, in order + for pg, d in _rapid_pages(rapid_glob): + margin = [] + for t in d.get("texts", []): + raw = (t.get("text") or "").strip() + s = raw.replace(" ", "") + prov = t.get("prov") or [] + bb = prov[0].get("bbox") if prov else None + if bb is None or bb["l"] > 140: + continue + margin.append((bb, s)) + m = PART_RE.match(s) + if m and m.group(2) != "0": + parts.setdefault(f"{m.group(1)}.{m.group(2)}", {"page": pg, "bbox": bb}) + elif bb["l"] <= LABEL_COL_MAX: + mp = PART_PREFIX.match(raw) + if mp: + parts.setdefault(f"{mp.group(1)}{mp.group(2)}.{mp.group(3)}", + {"page": pg, "bbox": bb}) + elif bb["t"] >= FOOTER_T: # skip page-number footers (page N -> "N") + mc = MCQ_TOP.match(raw) + if mc: + mcq_cands.append((pg, mc.group(1), bb)) + nums = [(bb, NUM_RE.match(s).group(1)) for bb, s in margin if NUM_RE.match(s)] + digs = [(bb, DIG_RE.match(s).group(1)) for bb, s in margin if DIG_RE.match(s)] + for nbb, nn in nums: + ny = (nbb["t"] + nbb["b"]) / 2 + for dbb, dd in digs: + dy = (dbb["t"] + dbb["b"]) / 2 + if abs(ny - dy) < 12 and dbb["l"] >= nbb["l"]: + parts.setdefault(f"{nn}.{dd}", {"page": pg, "bbox": nbb}) + # Section B: walk MCQ candidates in reading order, accept the next number in sequence only + structured_q = {int(lab.split(".")[0]) for lab in parts} + expect = (max(structured_q) + 1) if structured_q else 1 + mcq_cands.sort(key=lambda c: (c[0], -(c[2] or {}).get("t", 0))) # page, then top-down + cand = {} # nn -> (page, bbox), first occurrence in reading order + for pg, nn, bb in mcq_cands: + cand.setdefault(int(nn), (pg, bb)) + # Walk the sequence: take the exact expected number when present; only jump a small gap + # 
(<=3) when it's genuinely absent (OCR-garbled, e.g. "09"->"60") so one bad number doesn't + # truncate the section. Out-of-window noise (misread "60") never enters. + seq = [] + while True: + if expect in cand and expect not in structured_q: + seq.append((expect, cand[expect])) + expect += 1 + continue + nxt = [n for n in cand if expect < n <= expect + 3 and n not in structured_q] + if nxt: + expect = min(nxt) + continue + break + # Only commit if this is a real Section-B MCQ run, not a few stray page/figure numbers on a + # paper with no MCQ section (a GCSE structured paper yields a short, gappy set; a real MCQ + # section is a long contiguous run). + if len(seq) >= MIN_MCQ_RUN: + for n, (pg, bb) in seq: + parts.setdefault(f"{n:02d}.0", {"page": pg, "bbox": bb}) + # In the rapid path every ".0" label is a Section-B multiple-choice question, worth 1 mark + # each (they carry no "[N marks]" token for geometry to bind). Structured parts stay None + # until attach_marks_by_geometry fills them from the marks list. + return {lab: {"q": lab.split(".")[0], "page": v["page"], "bbox": v["bbox"], + "marks": (1 if lab.endswith(".0") else None), "regions": []} + for lab, v in parts.items()} + + +# --- AQA text grammar (born-digital AQA papers, e.g. 7408 with a text layer) ------ +AQA_MARK = re.compile(r"\[\s*(\d+)\s*marks?\s*\]", re.I) + + +# AQA boxed labels render SPACE-SEPARATED in pdftotext ("0 1 . 1"); decimal content +# ("10.5 N") does not. `pdftotext -bbox` normalises gaps to single spaces, while `-layout` +# preserved wider runs, so the top-box grammar tolerates either one-or-more spaces. +AQA_PART_BOX = re.compile(r"^\s*(\d)\s+(\d)\s*\.\s*(\d)(?=\s|$)") # 0 1 . 1 +AQA_TOP_BOX = re.compile(r"^\s*(\d)\s+(\d)\s+(?=[A-Z(])") # 0 2 Carbon... 
+ + +def aqa_questions_text(lines): + parts = {} + cur = None + for l in lines: + mp = AQA_PART_BOX.match(l.text) + if mp: + q = f"{mp.group(1)}{mp.group(2)}" + lab = f"{q}.{mp.group(3)}" + cur = parts.setdefault(lab, {"q": q, "page": l.page, "bbox": _prefix_bbox(l), + "marks": None, "regions": []}) + else: + mt = AQA_TOP_BOX.match(l.text) + if mt: + q = f"{mt.group(1)}{mt.group(2)}" + cur = parts.setdefault(f"{q}.0", {"q": q, "page": l.page, "bbox": _prefix_bbox(l), + "marks": None, "regions": []}) + mm = AQA_MARK.search(l.text) + if mm and cur is not None and cur.get("marks") is None: + cur["marks"] = int(mm.group(1)) + # drop a placeholder ".0" part if the same question also has real numbered parts + for q in {v["q"] for v in parts.values()}: + if f"{q}.0" in parts and any(parts[k]["q"] == q and k != f"{q}.0" for k in parts): + parts.pop(f"{q}.0") + return parts + + +# ====================================================================== Edexcel +EDX_TOTAL = re.compile(r"Total for Question\s+(\d+)\s+is\s+(\d+)\s+marks?", re.I) +EDX_LEAD = re.compile(r"^\s*(\d{1,2})\s+(.*)$") # number, gap, then the rest of the line +EDX_PART = re.compile(r"\(([a-h])\)") # may appear inline after the number +EDX_SUB = re.compile(r"^\s*\(([ivx]{1,4})\)") +EDX_MARK = re.compile(r"^\s*\((\d+)\)\s*$") + + +def edexcel_questions(lines): + # anchor top-level numbers on the robust "Total for Question N is M" signal (precision) + anchors = {} # qnum -> (total marks, anchor line) + for l in lines: + m = EDX_TOTAL.search(l.text) + if m: + anchors[int(m.group(1))] = (int(m.group(2)), l) + parts = {} + haspart = set() # questions that own lettered parts + curq = curlet = lastlab = None + + def add(lab, q, l): + nonlocal lastlab + parts.setdefault(lab, {"q": q, "page": l.page, "bbox": _prefix_bbox(l, 40), "marks": None, "regions": []}) + lastlab = lab + + for l in lines: + if EDX_TOTAL.search(l.text): + curq = curlet = None + continue + ml = EDX_LEAD.match(l.text) + if ml and 
int(ml.group(1)) in anchors and (ml.group(2)[:1].isupper() + or ml.group(2).lstrip().startswith("(")): + curq, rest = ml.group(1), ml.group(2) + curlet = None + inline = EDX_PART.search(rest) # capture "(a)" sharing the lead line + if inline: + curlet = inline.group(1); haspart.add(curq); add(f"{curq}{curlet}", curq, l) + continue + if curq is None: + continue + mp = EDX_PART.match(l.text.lstrip()) + if mp: + curlet = mp.group(1); haspart.add(curq); add(f"{curq}{curlet}", curq, l) + ms = EDX_SUB.match(l.text) + if ms and curlet: + add(f"{curq}{curlet}{ms.group(1)}", curq, l) + mm = EDX_MARK.match(l.text) + if mm and lastlab: + parts[lastlab]["marks"] = int(mm.group(1)) + # part-less questions: one part carrying the authoritative Total-for-Question mark + for q, (total, anchor_line) in anchors.items(): + if str(q) not in haspart: + parts.setdefault(str(q), {"q": str(q), "page": anchor_line.page, + "bbox": _prefix_bbox(anchor_line, 40), + "marks": total, "regions": []}) + return parts, {}, anchors + + +# ====================================================================== OCR +OCR_PART = re.compile(r"^\s*\(([a-h])\)") +OCR_SUB = re.compile(r"^\s*\(([ivx]{1,4})\)") +OCR_MARK = re.compile(r"\[(\d+)\]") +OCR_EXT = re.compile(r"^\s*\(([a-h])\)\s*\*|^\s*(\d{1,2})\s*\*") + + +def ocr_questions(lines): + parts = {} + curq = curlet = None + expect = 1 + inferred = 0 # OCR may drop the margin question number; infer from part structure + for l in lines: + # top-level = the NEXT integer in sequence, gap, then question text OR a part opener "(a)" + # (Q3 opens straight into (a)). Sequence gate = the precision filter. 
+ ml = re.match(r"^\s*(\d{1,2})\s+(\(|[A-Z])", l.text) + if ml and int(ml.group(1)) == expect: + curq = ml.group(1); curlet = None; expect += 1 + parts.setdefault(curq, {"q": curq, "page": l.page, "bbox": _prefix_bbox(l, 36), + "marks": None, "regions": [], "_lead": True}) + if curq is None: + # number was OCR-dropped: start an inferred question on its first part "(a)" + m0 = OCR_PART.match(l.text.lstrip()) + if m0 and m0.group(1) == "a": + inferred += 1; curq = f"~{inferred}"; curlet = None + else: + continue + ext = bool(re.search(r"\(\s*[a-h]\s*\)\s*\*", l.text)) + mp = OCR_PART.match(l.text) + if mp: + # a repeat "(a)" while this question already owns one => next question, number dropped + if mp.group(1) == "a" and f"{curq}a" in parts: + inferred += 1; curq = f"~{inferred}" + curlet = mp.group(1) + parts.pop(curq, None) + lab = f"{curq}{curlet}" + parts.setdefault(lab, {"q": curq, "page": l.page, "bbox": _prefix_bbox(l, 36), + "marks": None, "regions": [], "extended": ext}) + ms = OCR_SUB.match(l.text) + if ms and curlet: + lab = f"{curq}{curlet}{ms.group(1)}" + parts.setdefault(lab, {"q": curq, "page": l.page, "bbox": _prefix_bbox(l, 36), + "marks": None, "regions": [], "extended": ext}) + mm = OCR_MARK.search(l.text) + if mm: + sib = [k for k in parts if parts[k]["q"] == curq and not parts[k].get("_lead")] + if sib: + parts[sib[-1]]["marks"] = int(mm.group(1)) + for v in parts.values(): + v.pop("_lead", None) + return parts + + +# ====================================================================== shared layers +LABEL_TO_TAXONOMY = { + "checkbox_unselected": "mcq_option", "checkbox_selected": "mcq_option", + "picture": "context_figure", "table": "context_data", "caption": "context_caption", + "page_header": "furniture", "page_footer": "furniture", + "section_header": "heading", "list_item": "instruction", +} + + +def docling_regions(doc): + regions = [] + for key in ("texts", "pictures", "tables"): + for it in doc.get(key, []): + lab = it.get("label", 
key[:-1]) + tax = LABEL_TO_TAXONOMY.get(lab) + if not tax: + continue + prov = it.get("prov") or [] + bb = prov[0].get("bbox") if prov else None + pg = prov[0].get("page_no") if prov else None + if bb is None: + continue + regions.append({"type": tax, "docling_label": lab, "page": pg, "bbox": bb, + "text": (it.get("text") or "")[:80]}) + return regions + + +def merge_gemma(parts, gemma_dir): + """Attach gemma4:e4b answer_regions (#3) to parts by for_part; gap-fill missing marks.""" + n_reg = n_fill = 0 + for fn in sorted(glob.glob(os.path.join(gemma_dir, "p*.json"))): + d = json.load(open(fn)) + for r in d.get("answer_regions", []): + lab = _norm_label(r.get("for_part", "")) + if lab in parts: + parts[lab]["regions"].append({"type": r.get("kind", "answer_lines"), + "source": "gemma"}) + n_reg += 1 + for qp in d.get("question_parts", []): + lab = _norm_label(qp.get("label", "")) + if lab in parts and parts[lab].get("marks") is None and qp.get("marks") is not None: + parts[lab]["marks"] = qp["marks"]; n_fill += 1 + return n_reg, n_fill + + +def _norm_label(s): + """gemma sometimes emits '0_4' or '01_2' for AQA -> '01.4'/'01.2'.""" + s = (s or "").strip().replace("_", ".") + m = re.match(r"^(\d)\.(\d)$", s) + if m: # '0.4' -> drop, ambiguous; keep as-is otherwise + return s + return s + + +def extract_tables(parts, doc, granite="off", pdf=None, cache_glob=None): + """Selective table-cell extraction (PLAN.md §B): standard TableFormer grids always; Granite + on router-flagged pages when granite!='off'. Returns (data_tables, all_tables). 
+ Granite tables win over the standard pipeline on pages it covers (cleaner grids).""" + std = tbl_mod.tables_from_standard(doc) + gran = [] + if granite != "off": + pages = tbl_mod.candidate_pages(doc) + if granite == "cached": + cache = tbl_mod._load_cached_doctags(cache_glob or "") + for pg in pages: + for t in tbl_mod.parse_otsl(cache.get(pg, "")): + t["page"] = pg; gran.append(t) + elif granite == "live" and pdf: + gran = tbl_mod.granite_tables(pdf, pages, cached_glob=cache_glob) + gran_pages = {t["page"] for t in gran} + combined = gran + [t for t in std if t["page"] not in gran_pages] + data = tbl_mod.attach_to_questions(combined, parts) + for v in parts.values(): + if v.get("tables"): + v["has_table"] = True + return data, combined + + +def attach_marks_by_geometry(parts, doc): + """AQA Docling path: marks live in a flat [N marks] list; bind each to the nearest + preceding part on the same page by vertical position.""" + marks = [] + for t in doc.get("texts", []): + prov = t.get("prov") or [] + bb = prov[0].get("bbox") if prov else None + pg = prov[0].get("page_no") if prov else None + for m in AQA_MARK.finditer(t.get("text") or ""): + marks.append((pg, bb, int(m.group(1)))) + by_page = defaultdict(list) + for lab, v in parts.items(): + if v.get("page") is not None: + by_page[v["page"]].append((lab, v)) + n = 0 + for pg, bb, val in marks: + cands = by_page.get(pg, []) + if not cands or bb is None: + continue + my = (bb["t"] + bb["b"]) / 2 + best = min(cands, key=lambda kv: abs(((kv[1]["bbox"] or {}).get("t", 0) + + (kv[1]["bbox"] or {}).get("b", 0)) / 2 - my) + if kv[1].get("bbox") else 1e9) + if best[1].get("marks") is None: + best[1]["marks"] = val; n += 1 + return n, marks + + +# ----------------------------------------------------------------- assembly + coverage +def build_questions(parts): + qs = defaultdict(list) + for lab in parts: + qs[parts[lab]["q"]].append(lab) + out = [] + for q in sorted(qs, key=lambda x: (len(x), x)): + plist = sorted(qs[q]) 
+ out.append({ + "question": q, + "parts": [{"label": lab, "page": parts[lab].get("page"), + "bbox": parts[lab].get("bbox"), # label geometry (None for born-digital text) + "marks": parts[lab].get("marks"), + "regions": parts[lab].get("regions", []), + "has_table": parts[lab].get("has_table", False), + "extended": parts[lab].get("extended", False)} for lab in plist], + }) + return out + + +GT_PARTS_PHYSICS = ["01.1","01.2","01.3","01.4","02.1","02.2","02.3","02.4","03.1","03.2","03.3","03.4", + "04.1","04.2","04.3","04.4","04.5","05.1","05.2","05.3","05.4","06.1","06.2","06.3", + "07.1","07.2","07.3","08.1","08.2","08.3","08.4","08.5","09.1","09.2","09.3", + "10.1","10.2","10.3","11.1","11.2","11.3","11.4"] + +# official paper maxima — the strongest grammar sanity check (marks_sum should match) +EXPECTED_MAX = {"8463": 100, "7408": 85, "8461": 100, "1MA1": 80, "H556": 70} + + +def expected_max(code): + if not code: + return None + for k, v in EXPECTED_MAX.items(): + if code.startswith(k): + return v + return None + + +def parse_text_by_board(lines, board): + """Run the board grammar over a line stream -> parts dict (used for GT + born-digital).""" + if board == "edexcel": + parts, _, _ = edexcel_questions(lines); return parts + if board == "ocr": + return ocr_questions(lines) + return aqa_questions_text(lines) + + +def coverage(parts, gt_labels): + rec = set(parts) + hit = sorted(rec & set(gt_labels)) + miss = sorted(set(gt_labels) - rec) + return {"coverage_pct": round(len(hit) / len(gt_labels) * 100, 1) if gt_labels else None, + "recovered": len(hit), "total": len(gt_labels), "missed": miss} + + +# ----------------------------------------------------------------- main +def main(): + ap = argparse.ArgumentParser() + ap.add_argument("--text", help="pdftotext source or .txt (born-digital / GT path)") + ap.add_argument("--auto", help="PDF: auto-detect text layer -> fast-path (--text), else " + "report the OCR path is required (no GPU work attempted here)") + 
ap.add_argument("--ocr", help="PDF to OCR live via dsync (uses the shared GPU)") + ap.add_argument("--docling", help="cached Docling JSON (OCR path without re-running dsync)") + ap.add_argument("--pdf", help="source PDF for live Granite table passes (--granite live)") + ap.add_argument("--rapid", help="AQA RapidOCR per-page glob (the v1 95% path)") + ap.add_argument("--gemma", help="gemma sweep dir with p*.json answer_regions") + ap.add_argument("--marks-fill", dest="marks_fill", + help="gemma_marks.py fills JSON: fill marks=None parts (Edexcel/OCR (N)/[N] gap-fill)") + ap.add_argument("--granite", default="off", choices=["off", "cached", "live"], + help="selective Granite-Docling tables: cached doctags or live via dsync") + ap.add_argument("--granite-cache", default="results/VLM_granite_p*.doctags", + help="glob of cached *.doctags for --granite cached / live fallback") + ap.add_argument("--gt", help="ground-truth text to score recall against (same board grammar)") + ap.add_argument("--board", default="auto", choices=["auto", "aqa", "edexcel", "ocr"]) + ap.add_argument("--out", default="results/structured.json") + a = ap.parse_args() + + # --- auto path selection ------------------------------------------------------------- + # Caller need not know in advance whether the PDF is born-digital or image-only: detect the + # text layer and either fold --auto into the fast-path (--text) or report that the OCR path + # is required. This is ADDITIVE — it only resolves --auto; every other mode is untouched. 
+ if a.auto: + chars, n_pages, cpp = text_layer_chars_per_page(a.auto) + if cpp >= TEXT_LAYER_MIN_CHARS_PER_PAGE: + print(f"auto-detect : born-digital text layer " + f"({chars} chars / {n_pages} pages = {cpp:.0f} chars/page " + f">= {TEXT_LAYER_MIN_CHARS_PER_PAGE}) -> fast-path (pdftotext, no GPU)") + a.text = a.auto + else: + print(f"auto-detect : NO usable text layer " + f"({chars} chars / {n_pages} pages = {cpp:.1f} chars/page " + f"< {TEXT_LAYER_MIN_CHARS_PER_PAGE}) -> OCR path required") + print("route : run the OCR pipeline, e.g.") + print(f" python extract.py --ocr {a.auto}") + print(" (AQA image-only papers use the RapidOCR margin-pass; " + "see scripts/rapid_pass.py)") + return + + # default invocation == v1 AQA physics regression guard + if not (a.text or a.ocr or a.docling): + a.docling = "results/E_tess_full.json" + a.rapid = a.rapid or "results/rapid_pages/p*.json" + a.gemma = a.gemma or "results/gemma_sweep_physics_200" + a.pdf = a.pdf or "samples/AQA-Physics-Paper-1H-2022-with-qr.pdf" + a.out = "results/physics_structured.json" if a.out == "results/structured.json" else a.out + + doc = None + pages = [] + if a.ocr: + try: + from . 
import dsync + except ImportError: # pragma: no cover - CLI execution + import dsync + doc = dsync.convert_document(a.ocr, {"ocr_engine": "tesseract", "force_ocr": True}) + lines = lines_from_docling(doc) + elif a.docling: + doc = json.load(open(a.docling)) + lines = lines_from_docling(doc) + else: + if a.text and a.text.endswith(".pdf"): + lines, pages = _bbox_lines_from_pdftotext(a.text) + else: + lines = lines_from_pdftext(a.text) + + board, code = detect_board(lines) + if a.board != "auto": + board = a.board + fm = extract_front_matter(lines, board, code) + + # --- questions: AQA uses the proven Docling+RapidOCR path when inputs exist ---------- + if board == "aqa" and a.rapid and glob.glob(a.rapid): + parts = aqa_questions_rapid(a.rapid) + path_used = "aqa-docling+rapidocr (v1)" + else: + parts = parse_text_by_board(lines, board) + path_used = f"{board}-text-grammar" + + # --- shared enrichment --------------------------------------------------------------- + regions = docling_regions(doc) if doc else [] + n_mark_geo = 0 + if doc and board == "aqa": + n_mark_geo, _ = attach_marks_by_geometry(parts, doc) + data_tables, all_tables = ([], []) + if doc: + data_tables, all_tables = extract_tables(parts, doc, granite=a.granite, + pdf=(a.pdf or a.ocr), cache_glob=a.granite_cache) + n_tbl = sum(1 for v in parts.values() if v.get("has_table")) + tbl_pages = sorted({t["page"] for t in data_tables if t["page"]}) + n_reg = n_fill = 0 + if a.gemma and os.path.isdir(a.gemma): + n_reg, n_fill = merge_gemma(parts, a.gemma) + n_marks_fill = 0 + if a.marks_fill and os.path.exists(a.marks_fill): + fills = json.load(open(a.marks_fill)).get("fills", {}) + for lab, mk in fills.items(): + if lab in parts and parts[lab].get("marks") is None: + parts[lab]["marks"] = int(mk); n_marks_fill += 1 + + questions = build_questions(parts) + + # --- coverage ------------------------------------------------------------------------ + if a.gt: + gt_lines = lines_from_pdftext(a.gt) + gt_parts = 
parse_text_by_board(gt_lines, board) + cov = coverage(parts, list(gt_parts)) + cov["source"] = "gt-text-same-grammar" + elif board == "aqa" and "rapidocr" in path_used: + cov = coverage(parts, GT_PARTS_PHYSICS) + cov["source"] = "builtin-physics-gt" + else: + cov = {"coverage_pct": None, "note": "no GT provided"} + + marks_known = sum(1 for v in parts.values() if v.get("marks") is not None) + marks_sum = sum(v["marks"] for v in parts.values() if v.get("marks") is not None) + exp_max = expected_max(code) or fm.get("max_marks") # code-based, else front-matter total + marks_check = (None if exp_max is None else + {"sum": marks_sum, "expected_max": exp_max, + "pct": round(marks_sum / exp_max * 100, 1)}) + result = { + "board": board, "paper_code": code, "front_matter": fm, "path": path_used, + "pages": pages, + "questions": questions, + "regions": regions, + "tables": data_tables, + "stats": { + "n_questions": len({v["q"] for v in parts.values()}), + "n_parts": len(parts), + "marks_parts_known": marks_known, "marks_sum": marks_sum, + "marks_check": marks_check, + "gemma_answer_regions": n_reg, "gemma_marks_filled": n_fill, + "gemma_marks_gapfilled": n_marks_fill, + "n_data_tables": len(data_tables), + "n_furniture_tables": sum(1 for t in all_tables if t["is_furniture"]), + "table_sources": {s: sum(1 for t in data_tables if t["source"] == s) + for s in sorted({t["source"] for t in data_tables})}, + "table_pages": tbl_pages, + "region_type_counts": {t: sum(1 for r in regions if r["type"] == t) + for t in sorted({r["type"] for r in regions})}, + }, + "coverage": cov, + } + json.dump(result, open(a.out, "w"), indent=2) + + print(f"board : {board} ({code or 'wordmark'}) [{path_used}]") + print(f"front-matter : {', '.join(f'{k}={v}' for k,v in fm.items() if not isinstance(v,(list,dict)))}") + print(f"questions : {result['stats']['n_questions']} top-level, {len(parts)} parts") + mc = f" | {marks_sum}/{exp_max} of max ({marks_check['pct']}%)" if marks_check else "" + 
print(f"marks : {marks_known}/{len(parts)} parts known (sum {marks_sum}){mc}" + + (f"; +{n_mark_geo} by geometry" if n_mark_geo else "")) + print(f"gemma regions : {n_reg} answer_regions, {n_fill} marks gap-filled" + + (f"; +{n_marks_fill} marks via --marks-fill" if n_marks_fill else "")) + print(f"tables : {len(data_tables)} data table(s) " + f"{result['stats']['table_sources']} on pages {tbl_pages}; " + f"{result['stats']['n_furniture_tables']} furniture filtered; {n_tbl} parts flagged") + if cov.get("coverage_pct") is not None: + print(f"COVERAGE : {cov['coverage_pct']}% ({cov['recovered']}/{cov['total']})" + f" missed: {cov['missed'][:8]}{'…' if len(cov['missed'])>8 else ''} [{cov['source']}]") + print(f"-> wrote {a.out}") + + +if __name__ == "__main__": + main() diff --git a/api/services/docling/finalize.py b/api/services/docling/finalize.py new file mode 100644 index 0000000..bb1d9c5 --- /dev/null +++ b/api/services/docling/finalize.py @@ -0,0 +1,248 @@ +#!/usr/bin/env python3 +""" +finalize.py — produce the final corpus output bundle under results/final/. + +Runs the full pipeline (via the real module CLIs, so the bundle is reproducible) across the corpus: + * geometry papers (image-only / OCR-path): structured + furniture + bands + page_roles + template + + validate + overlays (template human-review view for ALL pages, rich debug for sample pages). + * born-digital fast-path papers: structured + validate (no geometry -> no overlays). +Writes per-paper report.md, a human INDEX.md, and a machine catalog.json. 
+ +Usage: + python finalize.py [--no-overlays] # --no-overlays = JSON pipeline only (fast) +""" +import os, sys, glob, json, subprocess, argparse, datetime + +FINAL = "results/final" +PY = sys.executable + +# ------------------------------------------------------------------ corpus manifest +GEOMETRY = [ + dict(slug="aqa-physics-8463-imageonly", title="AQA GCSE Physics 8463/1H (image-only)", + board="aqa", level="GCSE", path="image-only (RapidOCR margin-pass)", + pdf="samples/AQA-Physics-Paper-1H-2022-with-qr.pdf", + docling="results/E_tess_full.json", rapid="results/rapid_pages/p*.json", + extract=["--docling", "results/E_tess_full.json", "--rapid", "results/rapid_pages/p*.json", + "--granite", "cached"]), + dict(slug="aqa-physics-7408-ocr", title="AQA A-level Physics 7408/1 (rasterised OCR)", + board="aqa", level="A-level", path="OCR (RapidOCR margin-pass + Section-B MCQ)", + pdf="samples/extra/aqa-alevel-physics-7408-1-jun22-qp.pdf", + docling="results/rapid_7408/merged.json", rapid="results/rapid_7408/p*.json", + gt="results/gt_extra/aqa-alevel-physics-7408-1-jun22-qp.txt", + extract=["--docling", "results/rapid_7408/merged.json", "--rapid", "results/rapid_7408/p*.json", + "--board", "aqa"]), + dict(slug="aqa-biology-8461-ocr", title="AQA GCSE Biology 8461/1H (rasterised OCR)", + board="aqa", level="GCSE", path="OCR (RapidOCR margin-pass)", + pdf="samples/extra/aqa-gcse-biology-8461-1h-jun22-qp.pdf", + docling="results/rapid_8461/merged.json", rapid="results/rapid_8461/p*.json", + gt="results/gt_extra/aqa-gcse-biology-8461-1h-jun22-qp.txt", + extract=["--docling", "results/rapid_8461/merged.json", "--rapid", "results/rapid_8461/p*.json", + "--board", "aqa"]), + dict(slug="edexcel-maths-1ma1-1h-ocr", title="Edexcel GCSE Maths 1MA1/1H (rasterised OCR)", + board="edexcel", level="GCSE-H", path="OCR + gemma marks gap-fill", + pdf="samples/extra/edexcel-gcse-maths-1ma1-1h-jun22-qp.pdf", + docling="results/genreport/edexcel1h/ocr.json", rapid=None, + 
gt="results/gt_extra/edexcel-gcse-maths-1ma1-1h-jun22-qp.txt", + extract=["--docling", "results/genreport/edexcel1h/ocr.json", "--board", "edexcel", + "--marks-fill", "results/genreport/edexcel1h/marks_fill.json"]), + dict(slug="edexcel-maths-1ma1-1f-ocr", title="Edexcel GCSE Maths 1MA1/1F (rasterised OCR)", + board="edexcel", level="GCSE-F", path="OCR + gemma marks gap-fill", + pdf="samples/extra/edexcel-gcse-maths-1ma1-1f-jun22-qp.pdf", + docling="results/genreport/edexcel1f/ocr.json", rapid=None, + extract=["--docling", "results/genreport/edexcel1f/ocr.json", "--board", "edexcel", + "--marks-fill", "results/genreport/edexcel1f/marks_fill.json"]), + dict(slug="ocr-physics-h556-ocr", title="OCR A-level Physics H556/3 (rasterised OCR)", + board="ocr", level="A-level", path="OCR + gemma marks gap-fill", + pdf="samples/extra/ocr-alevel-physics-h556-3-jun22-qp.pdf", + docling="results/genreport/ocrh556/ocr.json", rapid=None, + gt="results/gt_extra/ocr-alevel-physics-h556-3-jun22-qp.txt", + extract=["--docling", "results/genreport/ocrh556/ocr.json", "--board", "ocr", + "--marks-fill", "results/genreport/ocrh556/marks_fill.json"]), +] +FAST = [ + dict(slug="aqa-physics-7408-fast", title="AQA A-level Physics 7408/1 (born-digital)", board="aqa", + level="A-level", pdf="samples/extra/aqa-alevel-physics-7408-1-jun22-qp.pdf", + gt="results/gt_extra/aqa-alevel-physics-7408-1-jun22-qp.txt"), + dict(slug="aqa-biology-8461-fast", title="AQA GCSE Biology 8461/1H (born-digital)", board="aqa", + level="GCSE", pdf="samples/extra/aqa-gcse-biology-8461-1h-jun22-qp.pdf", + gt="results/gt_extra/aqa-gcse-biology-8461-1h-jun22-qp.txt"), + dict(slug="edexcel-maths-1ma1-1h-fast", title="Edexcel GCSE Maths 1MA1/1H (born-digital)", + board="edexcel", level="GCSE-H", pdf="samples/extra/edexcel-gcse-maths-1ma1-1h-jun22-qp.pdf", + gt="results/gt_extra/edexcel-gcse-maths-1ma1-1h-jun22-qp.txt"), + dict(slug="edexcel-maths-1ma1-1f-fast", title="Edexcel GCSE Maths 1MA1/1F (born-digital)", + 
board="edexcel", level="GCSE-F", pdf="samples/extra/edexcel-gcse-maths-1ma1-1f-jun22-qp.pdf"), + dict(slug="ocr-physics-h556-fast", title="OCR A-level Physics H556/3 (born-digital)", board="ocr", + level="A-level", pdf="samples/extra/ocr-alevel-physics-h556-3-jun22-qp.pdf", + gt="results/gt_extra/ocr-alevel-physics-h556-3-jun22-qp.txt"), + dict(slug="aqa-chemistry-8462-fast", title="AQA GCSE Chemistry 8462/1H (born-digital)", board="aqa", + level="GCSE", pdf="samples/chemistry-p1h-2023-qp.pdf"), + dict(slug="aqa-physics-8463-twin-fast", title="AQA GCSE Physics 8463/1H born-digital twin", + board="aqa", level="GCSE", pdf="samples/physics-p1h-2022-qp.pdf"), +] + + +def run(cmd): + r = subprocess.run([PY] + cmd, capture_output=True, text=True) + if r.returncode != 0: + print(f" ! FAILED: {' '.join(cmd)}\n{r.stderr[-400:]}") + return r.returncode == 0 + + +def jload(p): + try: + return json.load(open(p)) + except Exception: + return {} + + +def stats_from(struct, val): + st = struct.get("stats", {}) or {} + mc = st.get("marks_check") or {} + cov = struct.get("coverage", {}) or {} + return { + "board": struct.get("board"), "paper_code": struct.get("paper_code"), + "n_questions": st.get("n_questions"), "n_parts": st.get("n_parts"), + "marks_sum": mc.get("sum"), "official_max": mc.get("expected_max"), + "marks_pct": mc.get("pct"), + "coverage_pct": cov.get("coverage_pct"), "coverage_missed": cov.get("missed", []), + "validate_verdict": (val.get("summary") or {}).get("worst_severity"), + "validate_flags": val.get("flags", []), + "questions_expected": (val.get("summary") or {}).get("questions_expected"), + "questions_recovered": (val.get("summary") or {}).get("questions_recovered"), + "second_pass_slots": [q["label"] for q in val.get("question_sequence", []) if not q["recovered"]], + } + + +def do_geometry(p, overlays): + d = os.path.join(FINAL, p["slug"]); os.makedirs(d, exist_ok=True) + S, F, B, R, T, V = (os.path.join(d, f) for f in + ("structured.json", 
def do_fast(p):
    """Run the born-digital fast path for one paper and return its stats.

    Creates ``FINAL/<slug>``, runs ``extract.py --text`` on the paper's PDF (passing ``--gt``
    when the paper defines one) and then ``validate.py`` on the result. Failures of either step
    are not checked here; a missing output simply loads as ``{}``.

    Args:
        p: paper descriptor; ``slug`` and ``pdf`` are required, ``gt`` is optional.

    Returns:
        ``(stats, out_dir)`` — the ``stats_from`` summary and the paper's output directory.
    """
    out_dir = os.path.join(FINAL, p["slug"])
    os.makedirs(out_dir, exist_ok=True)
    structured_path = os.path.join(out_dir, "structured.json")
    validate_path = os.path.join(out_dir, "validate.json")

    extract_cmd = ["extract.py", "--text", p["pdf"], "--out", structured_path]
    ground_truth = p.get("gt")
    if ground_truth:
        extract_cmd.extend(["--gt", p["gt"]])
    run(extract_cmd)
    run(["validate.py", structured_path, "--out", validate_path])

    stats = stats_from(jload(structured_path), jload(validate_path))
    return stats, out_dir
def write_index(catalog, total_imgs):
    """Write ``INDEX.md`` under ``FINAL``: a Markdown overview of every paper in *catalog*.

    The file holds one table for papers whose ``kind`` is ``"geometry"`` (with an overlay-image
    column), one for papers whose ``kind`` is ``"fast"``, and a description of the per-paper
    directory layout.

    Args:
        catalog: dict with ``generated_at`` and ``papers``; every paper entry carries ``slug``,
            ``title``, ``board``, ``level``, ``kind`` and the stats fields read below.
        total_imgs: total overlay-image count quoted in the header paragraph.
    """
    def pct(value):
        # A missing percentage renders as "n/a" — never "n/a%" or "None%".
        return f"{value}%" if value is not None else "n/a"

    def row(p, *extra):
        # One Markdown table row; the marks percentage is omitted when unknown, matching the
        # per-paper report.
        marks = f"{p['marks_sum']}/{p['official_max']}"
        if p["marks_pct"] is not None:
            marks += f" ({p['marks_pct']}%)"
        cells = [f"[{p['title']}]({p['slug']}/report.md)",
                 f"{p['board']} {p['level']}",
                 f"{p['n_questions']}/{p['n_parts']}",
                 marks,
                 pct(p["coverage_pct"]),
                 f"{p['validate_verdict']}"]
        cells.extend(f"{value}" for value in extra)
        return "| " + " | ".join(cells) + " |"

    geometry = [p for p in catalog["papers"] if p["kind"] == "geometry"]
    fast = [p for p in catalog["papers"] if p["kind"] == "fast"]

    lines = ["# Final corpus output — exam-extraction spike", "",
             f"Generated {catalog['generated_at']}. {len(catalog['papers'])} paper-runs across "
             f"3 boards × 2 levels, both pipeline paths; {total_imgs} overlay debug images.", "",
             "Each `<slug>/` holds the machine artifacts (JSON) + `report.md`; geometry papers also have "
             "`overlays/template/` (human-review view, all pages) and `overlays/debug/` (raw-detection view).",
             "Machine catalog: `catalog.json`.", "",
             "## Image-only / OCR-path (with geometry + overlays)", "",
             "| Paper | Board / level | Q/parts | Marks/max | Coverage | G6 | Images |",
             "|---|---|---|---|---|---|---|"]
    lines += [row(p, p["overlay_images"]) for p in geometry]
    lines += ["", "## Born-digital fast-path (CPU, no geometry)", "",
              "| Paper | Board / level | Q/parts | Marks/max | Coverage | G6 |",
              "|---|---|---|---|---|---|"]
    lines += [row(p) for p in fast]
    lines += ["", "## Per-paper directory layout", "```",
              "<slug>/",
              " structured.json extract.py output (questions->parts->marks/bbox/regions)",
              " validate.json G6 consistency judge (confidence + flags)",
              " furniture.json recurring-furniture mask + content margins [geometry only]",
              " bands.json main + part y-bands [geometry only]",
              " page_roles.json per-page role + margin override [geometry only]",
              " template.json editable first-pass template (source/confirmed) [geometry only]",
              " overlays/template/ human-review view, all pages [geometry only]",
              " overlays/debug/ raw-detection view, sample pages [geometry only]",
              " report.md per-paper human summary", "```"]

    # Explicit UTF-8: the text contains non-ASCII characters ("—", "×"), and the context
    # manager guarantees the file is flushed and closed.
    with open(os.path.join(FINAL, "INDEX.md"), "w", encoding="utf-8") as fh:
        fh.write("\n".join(lines) + "\n")
def content_margins(items):
    """Derive the content margins from the items that are NOT flagged as furniture.

    Args:
        items: dicts carrying ``furniture`` (bool), ``page`` and ``bbox`` (``l``/``r``/``t``/``b``).

    Returns:
        ``None`` when no non-furniture item exists; otherwise a dict with
        ``content_x_band`` (``x_left`` / ``x_right``, taken near the 5th and 95th percentile of
        the left and right edges so a few strays do not widen the band) and ``per_page``
        (for each page: largest ``t`` as ``top``, smallest ``b`` as ``bottom``, smallest ``l``
        as ``left``, largest ``r`` as ``right``). All values are rounded to one decimal.
    """
    boxes_by_page = {}
    for item in items:
        if item["furniture"]:
            continue
        boxes_by_page.setdefault(item["page"], []).append(item["bbox"])
    if not boxes_by_page:
        return None

    every_box = [box for boxes in boxes_by_page.values() for box in boxes]
    count = len(every_box)
    left_edges = sorted(box["l"] for box in every_box)
    right_edges = sorted(box["r"] for box in every_box)
    low_index = max(0, count // 20)                 # ~5th percentile
    high_index = min(count - 1, count * 19 // 20)   # ~95th percentile
    x_band = {"x_left": round(left_edges[low_index], 1),
              "x_right": round(right_edges[high_index], 1)}

    page_boxes = {}
    for page, boxes in boxes_by_page.items():
        page_boxes[page] = {
            "top": round(max(box["t"] for box in boxes), 1),
            "bottom": round(min(box["b"] for box in boxes), 1),
            "left": round(min(box["l"] for box in boxes), 1),
            "right": round(max(box["r"] for box in boxes), 1),
        }
    return {"content_x_band": x_band, "per_page": page_boxes}
main(): + ap = argparse.ArgumentParser() + ap.add_argument("doc") + ap.add_argument("--freq", type=float, default=0.40, help="recurrence fraction => furniture") + ap.add_argument("--out", default="results/furniture.json") + a = ap.parse_args() + doc = json.load(open(a.doc)) + items = gather(doc) + n_pages = len({it["page"] for it in items}) + fcells = detect(items, n_pages, a.freq) + margins = content_margins(items) + + pics = [it for it in items if it["kind"] == "picture"] + pics_furn = [it for it in pics if it["furniture"]] + txt_furn = [it for it in items if it["kind"] == "text" and it["furniture"]] + # break furniture pictures down by cell (which recurring object) + by_cell = defaultdict(list) + for it in pics_furn: + by_cell[cell(it["bbox"])].append(it) + + result = { + "n_pages": n_pages, "freq_threshold": a.freq, + "furniture_cells": {f"{c[0]},{c[1]}": n for c, n in sorted(fcells.items())}, + "content_margins": margins, + "ab_test_figures": { + "context_figure_before_mask": len(pics), + "context_figure_after_mask": len(pics) - len(pics_furn), + "removed_as_furniture": len(pics_furn), + "removed_breakdown": {f"cell {c[0]},{c[1]}": len(v) for c, v in sorted(by_cell.items())}, + }, + "text_furniture_removed": len(txt_furn), + "items": items, # each carries furniture flag — consumed by overlay.py --furniture + } + json.dump(result, open(a.out, "w")) + + ab = result["ab_test_figures"] + print(f"pages {n_pages} freq>={a.freq} furniture cells: {result['furniture_cells']}") + print(f"content x-band: {margins['content_x_band'] if margins else None}") + print(f"\nA/B — figure (picture) classification:") + print(f" context_figure BEFORE mask : {ab['context_figure_before_mask']}") + print(f" context_figure AFTER mask : {ab['context_figure_after_mask']}") + print(f" removed as furniture : {ab['removed_as_furniture']} {ab['removed_breakdown']}") + print(f" text furniture removed : {result['text_furniture_removed']} (page numbers / 'Turn over' / headers)") + print(f"-> 
def page_text(doc):
    """Collect per-page text statistics from the document's ``texts`` entries.

    Only the first ``prov`` record of an entry is consulted; entries without a provenance
    record or without a truthy ``page_no`` are ignored.

    Returns:
        ``(chars, blob)`` where ``chars`` is a ``defaultdict(int)`` mapping page number to the
        count of non-whitespace characters, and ``blob`` maps page number to the lower-cased
        texts of that page joined by single spaces.
    """
    char_counts = defaultdict(int)
    fragments = defaultdict(list)
    for entry in doc.get("texts", []):
        provenance = entry.get("prov") or []
        if not provenance:
            continue
        page_no = provenance[0].get("page_no")
        if not page_no:
            continue
        text = entry.get("text") or ""
        whitespace = sum(map(str.isspace, text))
        char_counts[page_no] += len(text) - whitespace
        fragments[page_no].append(text.lower())
    joined = {page_no: " ".join(pieces) for page_no, pieces in fragments.items()}
    return char_counts, joined
def main():
    """CLI entry point: tag every page of a document with a role and write the result.

    Reads the bands JSON (``--bands``) and the document JSON (positional ``doc``), calls
    ``tag`` with the page numbers found under ``bands["pages"]``, writes ``{"pages": roles}``
    to ``--out`` and prints a summary listing each non-question page.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("doc")
    ap.add_argument("--bands", required=True)
    ap.add_argument("--out", default="results/page_roles.json")
    a = ap.parse_args()

    # Context managers: every handle is closed, and the output is flushed, before returning.
    with open(a.bands, encoding="utf-8") as fh:
        bands = json.load(fh)
    qpages = {int(p) for p in bands["pages"]}
    with open(a.doc, encoding="utf-8") as fh:
        doc = json.load(fh)
    roles = tag(doc, qpages)
    with open(a.out, "w", encoding="utf-8") as fh:
        json.dump({"pages": roles}, fh, indent=2)

    from collections import Counter
    c = Counter(v["role"] for v in roles.values())
    print(f"roles: {dict(c)}")
    for pg in sorted(roles):
        r = roles[pg]
        flag = "" if r["margins_enabled"] else " (no margins)"
        if r["role"] != "question":  # question pages are the norm; list only the exceptions
            print(f" p{pg:2d}: {r['role']:12s} chars={r['chars']}{flag}")
    print(f"-> wrote {a.out}")
over the rendered +exam page. Shows WHERE each question/part label was located and where Docling regions +(figures/tables/MCQ checkboxes) sit, so a reviewer can eyeball whether the structure landed in the +right place. This is the same geometry the exam-marker app uses to place regions on its canvas. + +Coordinates: Docling/RapidOCR bboxes are PDF points with a BOTTOM-LEFT origin. We render the page +at DPI D (scale = D/72) and flip y against the rendered image height, so we never need the page's +point-height explicitly: y_top_px = H_px - t*scale. + +With --docling, also draws every raw Docling text block (the body/question content the thin +extractor model discards) so a reviewer can see the FULL detection, not just what we persist. +Granite tables carry cells but no coordinates; we derive their box by locating the cell-texts in +the Docling text layer (content+geometry fusion). + +Usage: + python scripts/overlay.py [--pages 3,4,5] [--dpi 150] [--out DIR] + python scripts/overlay.py --docling results/E_tess_full.json --pages 5 +""" +import os, sys, json, re, argparse, subprocess, tempfile +from PIL import Image, ImageDraw, ImageFont + +PART_COLOR = (211, 47, 47) # red — question/part labels +BODY_COLOR = (150, 150, 150) # grey — raw Docling body-text blocks (--docling) +GRANITE_COLOR = (0, 150, 136) # teal — Granite table (geometry derived from cells) +REGION_COLORS = { # docling region taxonomy -> colour + "context_figure": (25, 118, 210), # blue + "context_data": (56, 142, 60), # green (tables) + "context_caption": (123, 31, 162), # purple + "mcq_option": (245, 124, 0), # orange (checkboxes) +} + + +def _norm(s): + return re.sub(r"[^a-z0-9]", "", (s or "").lower()) + + +def docling_texts_by_page(doc): + """All raw Docling text items -> {page: [(bbox, text, label)]}. 
def derive_table_bbox(grid, page_texts):
    """Granite tables have cells but no coordinates. Locate the cell-texts in the Docling text
    layer and union their bboxes -> the table's on-page extent.

    Two traps (seen on physics p5): (1) border/maths glyphs ('|','+') normalise to '' and an
    empty string is a substring of everything; (2) cell WORDS recur in nearby content — the rock
    names reappear in the MCQ options below the table ('Basalt or chalk'), far left and lower.
    So we match only blocks whose normalised text is CONTAINED IN a cell (keeps fragments like
    '2.90'/'Type', rejects the longer 'basaltorchalk'), require length >= 2, then keep the
    dominant vertical cluster to drop any stray cell-word elsewhere on the page.

    Args:
        grid: 2D list of cell texts.
        page_texts: iterable of ``(bbox, text, label)`` for the table's page.

    Returns:
        ``{"l", "r", "t", "b"}`` spanning the matched blocks, or ``None`` when fewer than three
        blocks match or no block lies within 120 units of the median vertical centre.
    """
    import statistics

    def centre(b):
        return (b["t"] + b["b"]) / 2

    cells = {c for c in (_norm(x) for row in grid for x in row) if len(c) > 1}
    hits = []
    for bb, txt, _ in page_texts:
        key = _norm(txt)  # normalise once per block, not once per cell comparison
        if len(key) > 1 and any(key in c for c in cells):
            hits.append(bb)
    if len(hits) < 3:
        return None

    med = statistics.median(centre(b) for b in hits)
    band = [b for b in hits if abs(centre(b) - med) <= 120]  # table band only
    if not band:
        # With an even number of hits split into two distant clusters the median falls between
        # them and nothing survives the filter: there is no dominant cluster to report.
        return None
    return {"l": min(b["l"] for b in band), "r": max(b["r"] for b in band),
            "t": max(b["t"] for b in band), "b": min(b["b"] for b in band)}
font, dashed=False, inset=0): + """Full-width horizontal marker line at a PDF-point y (BOTTOM-LEFT origin).""" + y = H - y_pdf * scale + if dashed: + x = inset + while x < W: + draw.line([x, y, min(x + 9, W), y], fill=color, width=width); x += 16 + else: + draw.line([inset, y, W, y], fill=color, width=width) + if label: + tw = draw.textlength(label, font=font) + draw.rectangle([inset, y - 16, inset + tw + 6, y], fill=color) + draw.text((inset + 3, y - 15), label, fill=(255, 255, 255), font=font) + + +def _rect(draw, bb, scale, H, color, label, width=3, font=None): + """Draw one bbox (BOTTOM-LEFT origin -> image space) + its label.""" + x0, x1 = bb["l"] * scale, bb["r"] * scale + y0, y1 = H - bb["t"] * scale, H - bb["b"] * scale # t is the higher edge -> smaller y_px + draw.rectangle([x0, y0, x1, y1], outline=color, width=width) + if label: + tw = draw.textlength(label, font=font) + draw.rectangle([x0, y0 - 17, x0 + tw + 6, y0], fill=color) + draw.text((x0 + 3, y0 - 16), label, fill=(255, 255, 255), font=font) + + +def draw_template(draw, tpl, pg, scale, H, W, font): + """Render the editable template for one page: margins/bands as LINES, footprints as BOXES. 
def render_page(pdf, pg, dpi, td):
    """Render page `pg` and return an image in DOCLING's coordinate space. Docling reports bbox
    relative to the CropBox, but pdftoppm renders the MediaBox — when CropBox != MediaBox (e.g. the
    Edexcel 1MA1 papers: media 652x899, crop inset 28.35pt) that mismatch magnifies + shifts every
    overlaid shape toward a corner. Fix: crop the rendered image to the CropBox so it matches Docling.
    No-op when CropBox == MediaBox (h556) or when poppler already rendered the CropBox.

    Args:
        pdf: path of the PDF to render.
        pg: 1-based page number.
        dpi: render resolution passed to ``pdftoppm -r``.
        td: directory that receives the intermediate PNG.

    Returns:
        The page as an RGB image, cropped to the CropBox when that applies.

    Raises:
        subprocess.CalledProcessError: ``pdftoppm`` exited non-zero.
        FileNotFoundError: ``pdftoppm`` succeeded but none of the expected PNG names exists.
    """
    base = os.path.join(td, f"p{pg}")
    subprocess.run(["pdftoppm", "-png", "-r", str(dpi), "-f", str(pg), "-l", str(pg), pdf, base],
                   check=True)
    # The page-number suffix may be unpadded or zero-padded to 2 or 3 digits; try each.
    candidates = (f"{base}-{pg:02d}.png", f"{base}-{pg}.png", f"{base}-{pg:03d}.png")
    png = next((p for p in candidates if os.path.exists(p)), None)
    if png is None:
        # An explicit error instead of a bare StopIteration escaping from next().
        raise FileNotFoundError(f"pdftoppm produced none of: {', '.join(candidates)}")
    with Image.open(png) as raw:  # close the file handle once the pixels are converted
        img = raw.convert("RGB")
    try:
        import pypdf
        page = pypdf.PdfReader(pdf).pages[pg - 1]
        mb, cb = page.mediabox, page.cropbox
        scale = dpi / 72.0
        mbl, mbt = float(mb.left), float(mb.top)
        dcrop = any(abs(a - b) > 0.5 for a, b in
                    ((cb.left, mb.left), (cb.bottom, mb.bottom), (cb.right, mb.right), (cb.top, mb.top)))
        # Only crop when the render really is MediaBox-sized (within 3 px of its width).
        rendered_mediabox = abs(img.width - (float(mb.right) - mbl) * scale) < 3
        if dcrop and rendered_mediabox:
            img = img.crop((round((float(cb.left) - mbl) * scale), round((mbt - float(cb.top)) * scale),
                            round((float(cb.right) - mbl) * scale), round((mbt - float(cb.bottom)) * scale)))
    except Exception as exc:
        # Best-effort alignment: fall back to the uncropped render, but say so — a silent pass
        # hides why overlays end up shifted.
        print(f"[overlay] p{pg}: CropBox alignment skipped ({exc!r})", file=sys.stderr)
    return img
ap.add_argument("--furniture", help="furniture.py JSON: mark recurring furniture vs real figures " + "+ draw the content x-margins") + ap.add_argument("--template", help="template.py JSON: render the editable first-pass template " + "(margins+bands as lines, furniture/figures as boxes). " + "When set, draws ONLY the template (the human-review view).") + ap.add_argument("--pages", help="comma list, e.g. 3,4,5 (default: all pages with geometry)") + ap.add_argument("--dpi", type=int, default=150) + ap.add_argument("--out", default="results/overlay") + a = ap.parse_args() + os.makedirs(a.out, exist_ok=True) + scale = a.dpi / 72.0 + font = _font(14) + + res = json.load(open(a.structured)) + doc_texts = docling_texts_by_page(json.load(open(a.docling))) if a.docling else {} + bands = json.load(open(a.bands))["pages"] if a.bands else {} + furn = json.load(open(a.furniture)) if a.furniture else None + tpl = json.load(open(a.template)) if a.template else None + # gather geometry by page + parts_by_pg, regions_by_pg = {}, {} + for q in res.get("questions", []): + for p in q["parts"]: + if p.get("bbox") and p.get("page"): + parts_by_pg.setdefault(p["page"], []).append((p["label"], p["bbox"])) + for r in res.get("regions", []): + if r.get("bbox") and r.get("page"): + regions_by_pg.setdefault(r["page"], []).append((r["type"], r["bbox"])) + # tables: standard ones carry a bbox; Granite ones don't -> derive from the text layer + tables_by_pg = {} + for t in res.get("tables", []): + pg = t.get("page") + if not pg: + continue + bb = t.get("bbox") or (derive_table_bbox(t.get("grid", []), doc_texts.get(pg, [])) + if a.docling else None) + if bb: + tables_by_pg.setdefault(pg, []).append( + (f"table {t.get('source','')} {t.get('n_rows')}x{t.get('n_cols')}", bb)) + + want = ([int(x) for x in a.pages.split(",")] if a.pages + else (sorted(int(p) for p in tpl["pages"]) if tpl + else sorted(set(parts_by_pg) | set(regions_by_pg) | set(doc_texts)))) + if not want: + sys.exit("no bbox geometry 
in this result (born-digital text path carries no geometry; " + "use an OCR/rapid-path structured.json)") + + written = [] + with tempfile.TemporaryDirectory() as td: + for pg in want: + img = render_page(a.pdf, pg, a.dpi, td) + H = img.height + draw = ImageDraw.Draw(img) + if tpl: # template-only render = the human-review view + draw_template(draw, tpl, pg, scale, H, img.width, font) + out = os.path.join(a.out, f"p{pg:02d}.png") + img.save(out); written.append(out) + pgd = tpl["pages"].get(str(pg), {}) + print(f"p{pg}: template — {len(pgd.get('main_bands',[]))} main, " + f"{len(pgd.get('part_bands',[]))} part, {len(pgd.get('furniture',[]))} furn, " + f"{len(pgd.get('figures',[]))} fig -> {out}") + continue + # layer 0: raw Docling body-text blocks (faint, no label) — the discarded content + for bb, txt, lab in doc_texts.get(pg, []): + _rect(draw, bb, scale, H, BODY_COLOR, None, 1, font) + # layer 1: taxonomy regions + for typ, bb in regions_by_pg.get(pg, []): + _rect(draw, bb, scale, H, REGION_COLORS.get(typ, (120, 120, 120)), typ, 2, font) + # layer 2: tables (Granite-derived boxes in teal) + for lab, bb in tables_by_pg.get(pg, []): + _rect(draw, bb, scale, H, GRANITE_COLOR, lab, 3, font) + # layer 3: part labels on top + for lab, bb in parts_by_pg.get(pg, []): + _rect(draw, bb, scale, H, PART_COLOR, lab, 3, font) + # layer 4: band y-marker lines (main-question = blue, part = red dashed; end = dashed) + pb = bands.get(str(pg)) or bands.get(pg) + nb = 0 + if pb: + W = img.width + for m in pb["main"]: + if not m.get("is_start", True): # skip continuation-page duplicate + continue + _hline(draw, m["y_start"], scale, H, W, MAIN_LINE, + f"Q{m['question']} ▸ start", 3, font); nb += 1 + _hline(draw, m["y_end"], scale, H, W, MAIN_LINE, None, 1, font, dashed=True) + for p in pb["part"]: + _hline(draw, p["y_start"], scale, H, W, PART_LINE, + f"{p['label']} start", 2, font, inset=90); nb += 1 + # layer 5: furniture mask — green=real figure, grey=masked furniture; + content 
margins + if furn: + W = img.width + for it in furn["items"]: + if it["page"] != pg or it["kind"] != "picture": + continue + if it["furniture"]: + _rect(draw, it["bbox"], scale, H, (130, 130, 130), "furniture", 2, font) + else: + _rect(draw, it["bbox"], scale, H, (56, 142, 60), "figure ✓", 3, font) + band = (furn.get("content_margins") or {}).get("content_x_band") + if band: + for xk in ("x_left", "x_right"): + x = band[xk] * scale + draw.line([x, 0, x, H], fill=(0, 150, 136), width=2) + out = os.path.join(a.out, f"p{pg:02d}.png") + img.save(out); written.append(out) + print(f"p{pg}: {len(parts_by_pg.get(pg,[]))} part-labels, " + f"{len(regions_by_pg.get(pg,[]))} regions, {len(tables_by_pg.get(pg,[]))} tables, " + f"{len(doc_texts.get(pg,[]))} body-text blocks, {nb} band-lines -> {out}") + print(f"-> {len(written)} page(s) in {a.out}/") + + +if __name__ == "__main__": + main() diff --git a/api/services/docling/tables.py b/api/services/docling/tables.py new file mode 100644 index 0000000..4df0152 --- /dev/null +++ b/api/services/docling/tables.py @@ -0,0 +1,210 @@ +#!/usr/bin/env python3 +""" +tables.py — selective table-cell extraction for the exam extractor (PLAN.md §B). + +Two sources, unified into one cell-grid schema: + * STANDARD — the Tesseract+TableFormer backbone already emits `tables[].data.table_cells` + (text + row/col offsets + spans + bbox). Free, cached, every run. Good on ruled tables; + but it MISSES some data tables and OCRs them as loose tokens (REPORT.md p5). + * GRANITE — Granite-Docling-258M VLM emits `` grids in DocTags (clean rows/cols even + where the backbone scrambles them). GPU cost, so used SELECTIVELY: only on pages the router + flags (a standard table present, or dense picture/checkbox), routed through dsync's GPU lock + + Redis cache. Recipe (REPORT.md): {"to_formats":["doctags","json"], "pipeline":"vlm", + "vlm_pipeline_model":"granite_docling"}. 
def parse_otsl(doctags):
    """Parse every table block matched by ``OTSL_BLOCK`` in a DocTags string into unified tables.

    For each block: the first ``CAPTION`` match (whitespace-collapsed) becomes the caption, the
    ``CELL_TOK`` tokens are split into rows at every ``nl`` tag, and short rows are right-padded
    with ``""`` so the grid is rectangular. Blocks that yield no non-empty row are skipped.

    Returns:
        A list of dicts with ``page`` (always ``None`` here), ``n_rows``, ``n_cols``, ``grid``
        (2D list of cell texts), ``caption``, ``source`` (``"granite-otsl"``) and ``is_furniture``.
    """
    out = []
    for block in OTSL_BLOCK.findall(doctags):
        cap = None
        mc = CAPTION.search(block)
        if mc:
            cap = re.sub(r"\s+", " ", mc.group(1)).strip()
        # drop every caption so its text cannot be tokenised as cells
        body = CAPTION.sub("", block)
        # NOTE(review): as written this pattern is empty, so the substitution changes nothing;
        # presumably it was meant to strip location tokens — confirm against the upstream file.
        body = re.sub(r"", "", body)
        rows, cur = [], []
        for tag, txt in CELL_TOK.findall(body):
            if tag == "nl":
                # row terminator: close the current row and start a new one
                rows.append(cur); cur = []
            else:
                cur.append({"text": txt.strip(), "header": tag in HEADER_TAGS,
                            "empty": tag == "ecel"})
        if cur:
            # last row had no trailing "nl"
            rows.append(cur)
        rows = [r for r in rows if r]
        if not rows:
            continue
        n_cols = max(len(r) for r in rows)
        # only the cell text reaches the grid; the header/empty flags are not carried over
        grid = [[c["text"] for c in r] + [""] * (n_cols - len(r)) for r in rows]
        out.append({"page": None, "n_rows": len(rows), "n_cols": n_cols, "grid": grid,
                    "caption": cap, "source": "granite-otsl",
                    "is_furniture": is_furniture(grid, cap)})
    return out
# ----------------------------------------------------------------- furniture filter
FURNITURE_RE = re.compile(r"examiner|do not write|leave\s+blank|question\s*mark|"
                          r"for marker|total marks?$", re.I)


def is_furniture(grid, caption=None):
    """A table that is exam scaffolding (mark grid / 'For Examiner's Use'), not question data.

    Args:
        grid: 2D list of cell texts (``None`` cells are treated as empty).
        caption: optional caption text, searched together with the cells.

    Returns:
        ``True`` when ``FURNITURE_RE`` matches the joined cell/caption text, or when every
        non-blank cell is a 1–2 digit number; otherwise ``False``.
    """
    texts = [cell or "" for row in grid for cell in row]
    # Strip before searching: the joined text otherwise always ends in whitespace when there is
    # no caption, which made the `$`-anchored "total marks" alternative unreachable.
    blob = (" ".join(texts) + " " + (caption or "")).strip()
    if FURNITURE_RE.search(blob):
        return True
    # cells holding nothing but question numbers / blanks = a mark grid
    flat = [text.strip() for text in texts if text.strip()]
    return bool(flat) and all(re.fullmatch(r"\d{1,2}", text) for text in flat)
doc.get("doc_tags") or doc.get("doctags") or "" + + +def granite_tables(pdf, pages, *, cached_glob=None, retries=4): + """Run Granite-Docling on the given pages via dsync (GPU lock + OOM retry + Redis cache), + parse , tag each table with its page. Falls back to cached *.doctags if serve fails.""" + import dsync, time + cache = _load_cached_doctags(cached_glob) if cached_glob else {} + r = dsync._redis() + b64 = base64.b64encode(open(pdf, "rb").read()).decode() + fname = os.path.basename(pdf) + sha = dsync._sha(pdf) + out = [] + for pg in pages: + key = f"docling:vlm:{sha}:p{pg}" + doctags = None + if r and (hit := r.get(key)): + doctags = hit if isinstance(hit, str) else hit.decode() + if doctags is None: + delay = 5 + for attempt in range(retries): + with dsync._GpuLock(r): + resp = _serve_vlm(b64, fname, pg) + if dsync._is_oom(resp): + print(f"[granite] p{pg} OOM, backoff {delay}s ({attempt+1}/{retries})") + time.sleep(delay); delay = min(delay * 2, 120); continue + doctags = _doctags_of(resp) + if r and doctags: + r.set(key, doctags, ex=dsync.CACHE_TTL) + break + if not doctags and pg in cache: + print(f"[granite] p{pg} serve empty -> cached doctags") + doctags = cache[pg] + for tbl in parse_otsl(doctags or ""): + tbl["page"] = pg + out.append(tbl) + return out + + +def _load_cached_doctags(glob_pat): + """Map page_no -> doctags text from files named *p.doctags.""" + cache = {} + for fn in glob.glob(glob_pat): + m = re.search(r"p(\d+)\.doctags$", fn) + if m: + cache[int(m.group(1))] = open(fn, encoding="utf-8", errors="replace").read() + return cache + + +# ----------------------------------------------------------------- routing + attach +def candidate_pages(doc): + """Pages the router sends to Granite: a standard table, or a dense picture/checkbox page.""" + pages = set() + for t in doc.get("tables", []): + prov = t.get("prov") or [] + if prov and prov[0].get("page_no"): + pages.add(prov[0]["page_no"]) + chk = {} + for it in doc.get("texts", []): + if 
it.get("label", "").startswith("checkbox"): + prov = it.get("prov") or [] + if prov and prov[0].get("page_no"): + chk[prov[0]["page_no"]] = chk.get(prov[0]["page_no"], 0) + 1 + pages |= {p for p, n in chk.items() if n >= 2} + return sorted(pages) + + +def attach_to_questions(tables, parts): + """Assign each non-furniture table to the nearest preceding part on its page (by y); if no + geometry, attach to the first part on that page. Records table refs on the part.""" + data_tables = [t for t in tables if not t["is_furniture"]] + by_page = {} + for lab, v in parts.items(): + by_page.setdefault(v.get("page"), []).append((lab, v)) + for i, t in enumerate(data_tables): + t["id"] = i + cands = by_page.get(t["page"], []) + if not cands: + t["for_part"] = None; continue + # best-effort: the part highest on the page (largest bbox top = the page's question stem), + # else the earliest part label. (Tables sit under the stem; we don't carry table y here.) + with_geo = [(lab, v) for lab, v in cands if v.get("bbox")] + if with_geo: + lab = max(with_geo, key=lambda kv: (kv[1]["bbox"] or {}).get("t", 0))[0] + else: + lab = sorted(cands, key=lambda kv: kv[0])[0][0] + t["for_part"] = lab + parts[lab].setdefault("tables", []).append( + {"id": i, "n_rows": t["n_rows"], "n_cols": t["n_cols"], + "caption": t["caption"], "source": t["source"]}) + return data_tables diff --git a/api/services/docling/template.py b/api/services/docling/template.py new file mode 100644 index 0000000..b2b3929 --- /dev/null +++ b/api/services/docling/template.py @@ -0,0 +1,182 @@ +#!/usr/bin/env python3 +""" +template.py — assemble the editable first-pass structural template from the spike's three signal +sources (extract structured.json + bands.json + furniture.json) into ONE round-trippable JSON the +human reviewer verifies AND edits before stage-2 generates the final template. + +UI principle (user, 2026-06-07): directional LIMITS are draggable LINES (1-DOF, easier to drag); +object FOOTPRINTS are BOXES. 
"""
template.py — assemble the editable first-pass structural template from the spike's three signal
sources (extract structured.json + bands.json + furniture.json) into ONE round-trippable JSON the
human reviewer verifies AND edits before stage-2 generates the final template.

UI principle (user, 2026-06-07): directional LIMITS are draggable LINES (1-DOF, easier to drag);
object FOOTPRINTS are BOXES. So:
  * margins -> four axis-locked LINES: left/right (x), top/bottom (y)
  * question/part bands -> horizontal LINES: start/end y
  * furniture / figures / tables -> BOXES (an object's footprint)

Every editable element carries {source: "auto"|"human", confirmed: bool} — the AI-suggestion seam.
Stage-2 must consume only confirmed elements (or a template marked confirmed at the top level).
Coordinates are PDF points, BOTTOM-LEFT origin (units in meta); the app maps to its own canvas.

Usage:
  python template.py --structured S.json --bands B.json --furniture F.json --pdf P.pdf --out T.json
"""
import argparse
import datetime
import json


def _line(edge, axis, value, scope, page=None):
    """Build one axis-locked margin line; ``value`` is rounded to 0.1, ``page`` is optional."""
    line = {"edge": edge, "axis": axis, "value": round(value, 1), "scope": scope,
            "source": "auto", "confirmed": False}
    if page is not None:
        line["page"] = page
    return line


def _furn_kind(it):
    """Best-guess label for a furniture box (human can rename). Position-based, BOTTOM-LEFT origin."""
    bb = it["bbox"]
    cx = (bb["l"] + bb["r"]) / 2
    cy = (bb["t"] + bb["b"]) / 2
    if it["kind"] == "picture":
        if cx > 430 and cy > 700:
            return "qr"
        if cy < 110:
            return "barcode"
        return "chrome_picture"
    if cy < 90:
        return "footer"
    if cy > 760:
        return "header_or_page_number"
    return "chrome_text"


def build(structured, bands, furniture, pdf=None, page_roles=None):
    """Assemble the first-pass template dict from the three signal sources.

    Args:
        structured: extractor result; ``questions[*].parts`` and optional ``tables`` are read.
        bands: dict with a ``pages`` mapping of page -> {"main": [...], "part": [...]}.
            Page keys may be ints or numeric strings.
        furniture: dict with ``items``, ``content_margins`` and ``n_pages``.
        pdf: value stored verbatim as ``meta.source_pdf``.
        page_roles: optional page -> role dict (role, source, confirmed, margins_enabled).

    Returns:
        {"meta": ..., "margins": [...], "pages": {...}} where ``pages`` is keyed by the page
        number as a string, in ascending numeric order.
    """
    page_roles = page_roles or {}
    questions = structured.get("questions", [])
    part_bbox = {p["label"]: p.get("bbox") for q in questions for p in q["parts"]}
    cm = furniture.get("content_margins") or {}
    xband = cm.get("content_x_band") or {}
    per_pg_m = cm.get("per_page") or {}
    band_pages = bands["pages"]

    def margins_on(pg):
        role = page_roles.get(str(pg)) or page_roles.get(pg)
        return role.get("margins_enabled", True) if role else True

    # margins as axis-locked LINES — document-level left/right, per-page top/bottom. Per-page
    # top/bottom are omitted for pages with no content column (cover/blank) — the user's override.
    margins = []
    for edge, key in (("left", "x_left"), ("right", "x_right")):
        if key in xband:  # emit whichever edge is known instead of raising on a half band
            margins.append(_line(edge, "x", xband[key], "document"))
    for pg, m in sorted(per_pg_m.items(), key=lambda kv: int(kv[0])):
        if not margins_on(int(pg)):
            continue
        margins.append(_line("top", "y", m["top"], "page", int(pg)))
        margins.append(_line("bottom", "y", m["bottom"], "page", int(pg)))

    # furniture + figures as BOXES, grouped by page
    furn_pg, fig_pg = {}, {}
    for it in furniture.get("items", []):
        pg = it["page"]
        if it.get("furniture"):
            furn_pg.setdefault(pg, []).append(
                {"box": it["bbox"], "kind": _furn_kind(it), "docling_label": it["label"],
                 "source": "auto", "confirmed": False})
        elif it["kind"] == "picture":
            fig_pg.setdefault(pg, []).append(
                {"box": it["bbox"], "source": "auto", "confirmed": False})

    tbl_pg = {}
    for t in structured.get("tables", []):
        if t.get("page"):
            tbl_pg.setdefault(t["page"], []).append(
                {"box": t.get("bbox"), "n_rows": t.get("n_rows"), "n_cols": t.get("n_cols"),
                 "table_source": t.get("source"), "source": "auto", "confirmed": False})

    # --- reconcile against recovered part labels -------------------------------------------
    # A part-label position is never furniture or a figure (the label wins), and a "figure" that
    # covers most of the content area is a Docling page-collapse artifact (the GPU sometimes flags
    # the whole page as one picture), not a real figure -> drop both. Fixes the Q1.7/Q1.9 clashes
    # and the full-page "figure" that was masking part labels.
    part_boxes_pg = {}
    for q in questions:
        for p in q["parts"]:
            if p.get("bbox") and p.get("page"):
                part_boxes_pg.setdefault(p["page"], []).append(p["bbox"])

    def _inter(a, b):
        return not (a["r"] < b["l"] or b["r"] < a["l"] or a["t"] < b["b"] or b["t"] < a["b"])

    def _area(b):
        return max(0, b["r"] - b["l"]) * max(0, b["t"] - b["b"])

    def _clashes(box, labels):
        return any(_inter(box, label) for label in labels)

    for pg, items in list(furn_pg.items()):
        labels = part_boxes_pg.get(pg, [])
        furn_pg[pg] = [f for f in items if not (f.get("box") and _clashes(f["box"], labels))]
    for pg, items in list(fig_pg.items()):
        labels = part_boxes_pg.get(pg, [])
        m = per_pg_m.get(str(pg)) or per_pg_m.get(pg) or {}
        # Content area from the page margins; 595 x 842 pt is the fallback when it is zero.
        carea = ((m.get("right", 0) - m.get("left", 0))
                 * (m.get("top", 0) - m.get("bottom", 0))) or (595 * 842)
        fig_pg[pg] = [f for f in items if f.get("box")
                      and _area(f["box"]) <= 0.55 * carea      # not a full-page collapse
                      and not _clashes(f["box"], labels)]      # not clashing a part label

    pages = {}
    # Normalise every source to string keys: int-keyed bands would otherwise add the same page
    # twice (once as 2, once as "2") and leak int keys into the output.
    all_pg = ({str(p) for p in band_pages} | {str(p) for p in furn_pg}
              | {str(p) for p in fig_pg} | {str(p) for p in page_roles})
    for pgs in sorted(all_pg, key=int):
        pg = int(pgs)
        pb = band_pages.get(pgs) or band_pages.get(pg) or {}
        main = [{"question": m["question"], "y_start": m["y_start"], "y_end": m["y_end"],
                 "is_start": m.get("is_start", True),
                 "source": "auto", "confirmed": False} for m in pb.get("main") or []]
        part = [{"label": p["label"], "question": p["question"],
                 "y_start": p["y_start"], "y_end": p["y_end"],
                 "label_box": part_bbox.get(p["label"]),   # app may render a box instead of lines
                 "source": "auto", "confirmed": False} for p in pb.get("part") or []]
        pr = page_roles.get(pgs) or page_roles.get(pg) or {}
        pages[pgs] = {
            "role": pr.get("role", "question"),
            "role_source": pr.get("source", "default"),
            "role_confirmed": pr.get("confirmed", False),
            "margins_enabled": pr.get("margins_enabled", True),   # human-overridable
            "main_bands": main, "part_bands": part,
            "furniture": furn_pg.get(pg, []), "figures": fig_pg.get(pg, []),
            "tables": tbl_pg.get(pg, []),
        }

    return {
        "meta": {
            "schema": "exam-template/first-pass/v1",
            "board": structured.get("board"), "paper_code": structured.get("paper_code"),
            "source_pdf": pdf, "n_pages": furniture.get("n_pages"),
            "coord_origin": "BOTTOMLEFT", "units": "pdf_points",
            "generated_at": datetime.datetime.now().isoformat(timespec="seconds"),
            "ui_principle": "directional limits = draggable axis-locked lines; "
                            "object footprints = boxes",
            "confirmed": False, "confirmed_by": None, "confirmed_at": None,
        },
        "margins": margins,
        "pages": pages,
    }


def _read_json(path):
    """Load one JSON file, closing the handle afterwards."""
    with open(path, encoding="utf-8") as fh:
        return json.load(fh)


def main():
    """CLI entry point: read the three inputs, build the template, write it and print a summary."""
    ap = argparse.ArgumentParser()
    ap.add_argument("--structured", required=True)
    ap.add_argument("--bands", required=True)
    ap.add_argument("--furniture", required=True)
    ap.add_argument("--page-roles", dest="page_roles", help="page_roles.py JSON (roles + margin override)")
    ap.add_argument("--pdf")
    ap.add_argument("--out", default="results/template.json")
    a = ap.parse_args()
    roles = _read_json(a.page_roles)["pages"] if a.page_roles else {}
    t = build(_read_json(a.structured), _read_json(a.bands), _read_json(a.furniture),
              a.pdf, roles)
    # Close the output explicitly so the template is flushed before "wrote" is printed.
    with open(a.out, "w", encoding="utf-8") as fh:
        json.dump(t, fh, indent=2)
    page_dicts = list(t["pages"].values())
    n_pages = len(page_dicts)
    nm = sum(len(p["main_bands"]) for p in page_dicts)
    npt = sum(len(p["part_bands"]) for p in page_dicts)
    nf = sum(len(p["furniture"]) for p in page_dicts)
    ng = sum(len(p["figures"]) for p in page_dicts)
    print(f"template {t['meta']['paper_code']} ({t['meta']['board']}): {n_pages} pages, "
          f"{len(t['margins'])} margin-lines, {nm} main-bands, {npt} part-bands, "
          f"{nf} furniture-boxes, {ng} figure-boxes")
    print(f"-> wrote {a.out}")


if __name__ == "__main__":
    main()
"""
validate.py — G6 validation/judge: a deterministic consistency pass over an extractor result.

NOT a gate. It never approves or rejects; it attaches confidence + flags so a HUMAN reviewer's
attention is routed to the parts most likely wrong. A clean paper -> all-green, skim; a flagged
paper -> the exact items to check, worst-first. Every value stays a *suggestion* a human confirms.

Checks (all deterministic, no GPU, ~free — run on every extraction):
  C1 marks-sum vs official max — over-read (sum>max) = error; under (sum<max) = warn
  C2 per-part marks plausibility — missing, zero or implausibly large marks
  C3 top-level question sequence — inferred labels, spurious high outliers, gaps
  C4 sub-part contiguity within each question
  C5 coverage vs ground truth, when the result carries it

Usage:
  python validate.py structured.json --out report.json
"""
import argparse
import json
import re
import sys
from collections import defaultdict

IMPLAUSIBLE_PART_MARKS = 15     # a single sub-part above this is worth a human glance


def _qnum(q):
    """Numeric value of a top-level question id ('01'->1, '4'->4); None if inferred ('~3') / odd."""
    if q.startswith("~"):
        return None
    m = re.match(r"^0*(\d+)$", q)
    return int(m.group(1)) if m else None


def _subkey(label, q):
    """The part's own suffix within its question: '01.2'->'2', '4a'->'a', '1bi'->'bi'."""
    s = label[len(q):] if label.startswith(q) else label
    return s.lstrip(".").lstrip("~")


def validate(result):
    """Run checks C1-C5 over an extractor result and return the review report.

    Args:
        result: extractor output; reads ``board``, ``paper_code``, ``questions[*].parts``,
            ``stats.marks_check``, ``front_matter.max_marks`` and ``coverage``.

    Returns:
        dict with ``summary``, ``flags`` (one string per non-ok check), ``checks``,
        ``part_confidence`` (label -> high/medium/low) and ``question_sequence``.
    """
    board = result.get("board")
    code = result.get("paper_code")
    flags, checks = [], []
    parts = [(p["label"], q["question"], p)
             for q in result.get("questions", []) for p in q["parts"]]
    conf = {}            # label -> high/medium/low
    low = set()          # labels a check has implicated

    def add(cid, severity, status, detail):
        checks.append({"id": cid, "severity": severity, "status": status, "detail": detail})
        if status != "ok":
            flags.append(f"[{severity}] {cid}: {detail}")

    # ---- C1: marks sum vs official maximum -------------------------------------------------
    mc = result.get("stats", {}).get("marks_check")
    exp = (mc or {}).get("expected_max") or result.get("front_matter", {}).get("max_marks")
    msum = (mc or {}).get("sum")
    if msum is None:
        msum = sum(p["marks"] for *_, p in parts if p.get("marks") is not None)
    if exp:
        if msum > exp:
            add("C1_marks_sum", "error", "over",
                f"marks sum {msum} EXCEEDS official max {exp} (+{msum-exp}) — an over-read; check the paper")
        elif msum < exp:
            add("C1_marks_sum", "warn", "under",
                f"marks sum {msum} below official max {exp} (-{exp-msum}) — missing parts or unread marks")
        else:
            add("C1_marks_sum", "info", "ok", f"marks sum {msum} == official max {exp}")
    else:
        add("C1_marks_sum", "info", "unknown", "no official max available to check the sum against")

    # ---- C2: per-part marks plausibility ---------------------------------------------------
    none_ct = zero_ct = 0
    for lab, q, p in parts:
        mk = p.get("marks")
        if mk is None:
            none_ct += 1
            low.add(lab)
        elif mk == 0:
            zero_ct += 1
            low.add(lab)
        elif mk > IMPLAUSIBLE_PART_MARKS:
            low.add(lab)
            add("C2_part_marks", "warn", "implausible",
                f"part {lab} has {mk} marks (> {IMPLAUSIBLE_PART_MARKS}) — verify it isn't a mis-read")
    if none_ct or zero_ct:
        add("C2_part_marks", "warn", "missing",
            f"{none_ct} part(s) with no mark, {zero_ct} with 0 marks — unread/garbled mark tokens")
    elif not any(c["id"] == "C2_part_marks" for c in checks):
        add("C2_part_marks", "info", "ok", "every part carries a plausible mark")

    # ---- C3: top-level question sequence + EXPECTED-question interpolation ------------------
    # If Q1, Q2 ... Q14 are recovered but 3-13 are not, the paper certainly HAS 3-13 — they were
    # just missed (e.g. a Docling page-collapse). We emit the full expected sequence with a per-Q
    # `recovered` flag so a live question-tree view can render the gaps as explicit "needs a second
    # pass" slots, and a targeted re-OCR knows exactly which questions to chase.
    qids = list(dict.fromkeys(q for _, q, _ in parts))
    nums = sorted({n for n in (_qnum(q) for q in qids) if n is not None})
    zero_pad = any(len(q) == 2 and q.startswith("0") for q in qids)   # AQA 'NN' vs Edexcel/OCR 'N'
    question_sequence = []
    if any(q.startswith("~") for q in qids):
        add("C3_question_seq", "info", "inferred",
            "question numbers were OCR-inferred ('~N') — sequence not checkable; treat labels as approximate")
    elif nums:
        # isolated high outliers (a content number mis-read as 'Q67' after Q1-10) are likely
        # spurious top-levels, not 50 missing questions — strip them off the top so the sequence
        # reflects the real paper, and flag them for review instead of flooding the tree with slots.
        core, suspect = nums[:], []
        while len(core) >= 2 and core[-1] - core[-2] > 4:
            suspect.insert(0, core.pop())
        hi = core[-1] if core else nums[-1]
        gaps = [n for n in range(nums[0], hi + 1) if n not in core]
        question_sequence = [{"n": n, "label": (f"{n:02d}" if zero_pad else str(n)),
                              "recovered": n in core} for n in range(nums[0], hi + 1)]
        if suspect:
            add("C3_question_seq", "warn", "spurious",
                f"isolated high question number(s) {suspect} after a {nums[0]}-{hi} run — likely a "
                f"content number mis-read as a top-level question; review/remove")
        if gaps:
            add("C3_question_seq", "warn", "gap",
                f"top-level questions {gaps} missing between {nums[0]}-{hi} — expected but "
                f"unrecovered; surface as second-pass slots in the question tree")
        elif not suspect:
            add("C3_question_seq", "info", "ok", f"questions {nums[0]}-{hi} contiguous")

    # ---- C4: sub-part contiguity within each question --------------------------------------
    def child_key(sub):
        """Immediate child token of a sub-part suffix: the whole leading number, else one char.

        Taking only the first character would fold '.10', '.11' into '1' and report false
        holes for a question whose parts run e.g. .8 .9 .10.
        """
        number = re.match(r"[0-9]+", sub)
        return number.group(0) if number else sub[:1]

    def order(keys):
        """Report the holes in a question's child keys. Handles .N and a/b/c."""
        numbers = sorted({int(k) for k in keys if k.isdecimal()})
        letters = sorted({k for k in keys if k.isalpha()})
        holes = []
        if numbers:
            seen = set(numbers)
            holes += [str(n) for n in range(numbers[0], numbers[-1] + 1) if n not in seen]
        if letters:
            lo, hi_ = ord(letters[0]), ord(letters[-1])
            holes += [chr(c) for c in range(lo, hi_ + 1) if chr(c) not in letters]
        return holes

    byq = defaultdict(list)
    for lab, q, p in parts:
        sk = _subkey(lab, q)
        if sk:
            byq[q].append(sk)
    seq_holes = {}
    for q, keys in byq.items():
        children = {child_key(k) for k in keys}    # immediate children only (a / 1 / etc.)
        h = order(children)
        if h:
            seq_holes[q] = h
    if seq_holes:
        add("C4_subpart_seq", "warn", "gap",
            "sub-part gaps: " + ", ".join(f"Q{q} missing {hs}" for q, hs in sorted(seq_holes.items())))
    else:
        add("C4_subpart_seq", "info", "ok", "sub-parts contiguous within every question")

    # ---- C5: coverage vs ground truth (when present) ---------------------------------------
    cov = result.get("coverage", {})
    if cov.get("coverage_pct") is not None:
        missed = cov.get("missed", [])
        if missed:
            add("C5_coverage", "warn", "missed",
                f"{cov['coverage_pct']}% vs GT ({cov['recovered']}/{cov['total']}); missed {missed[:10]}")
            low.update(missed)
        else:
            add("C5_coverage", "info", "ok", f"100% coverage vs GT ({cov['recovered']}/{cov['total']})")

    # ---- per-part confidence + paper summary -----------------------------------------------
    sum_mismatch = any(c["id"] == "C1_marks_sum" and c["status"] in ("over", "under") for c in checks)
    for lab, q, p in parts:
        if lab in low:
            conf[lab] = "low"
        elif sum_mismatch:
            conf[lab] = "medium"          # paper-level doubt taints every part a little
        else:
            conf[lab] = "high"
    severities = [c["severity"] for c in checks if c["status"] not in ("ok", "info", "unknown")]
    worst = "error" if "error" in severities else "warn" if "warn" in severities else "clean"

    return {
        "paper_code": code, "board": board,
        "summary": {
            "worst_severity": worst,
            "needs_priority_review": worst != "clean",
            "n_flags": len(flags),
            "marks_sum": msum, "official_max": exp,
            "parts_total": len(parts),
            "parts_low_conf": sum(1 for v in conf.values() if v == "low"),
            "questions_expected": len(question_sequence) or None,
            "questions_recovered": sum(1 for q in question_sequence if q["recovered"]) or None,
        },
        "flags": flags,
        "checks": checks,
        "part_confidence": conf,
        "question_sequence": question_sequence,   # full expected skeleton (recovered + missing slots)
    }


def main():
    """CLI entry point: validate one structured.json, print the verdict, optionally write it."""
    ap = argparse.ArgumentParser()
    ap.add_argument("structured")
    ap.add_argument("--out")
    a = ap.parse_args()
    with open(a.structured, encoding="utf-8") as fh:
        rep = validate(json.load(fh))
    s = rep["summary"]
    print(f"paper     : {rep['paper_code']} ({rep['board']})")
    print(f"verdict   : {s['worst_severity'].upper()}  "
          f"{'-> PRIORITY REVIEW' if s['needs_priority_review'] else '-> all checks clean (still human-reviewable)'}")
    print(f"marks     : {s['marks_sum']}/{s['official_max']}   |   parts {s['parts_total']} "
          f"({s['parts_low_conf']} low-confidence)")
    if s.get("questions_expected"):
        miss = [q["label"] for q in rep["question_sequence"] if not q["recovered"]]
        print(f"questions : {s['questions_recovered']}/{s['questions_expected']} recovered"
              + (f"  |  second-pass slots: {miss}" if miss else "  (complete sequence)"))
    if rep["flags"]:
        print("flags:")
        for f in rep["flags"]:
            print(f"  - {f}")
    else:
        print("flags     : none")
    if a.out:
        # Close the report explicitly so it is flushed before "wrote" is printed.
        with open(a.out, "w", encoding="utf-8") as fh:
            json.dump(rep, fh, indent=2)
        print(f"-> wrote {a.out}")


if __name__ == "__main__":
    main()
"""Integration tests for the Docling first-pass ``auto_map`` wrapper (need the spike corpus)."""
import json
import os
from pathlib import Path

import pytest

from api.services.docling import FIRST_PASS_SCHEMA, auto_map


SPIKE_ROOT = Path(os.environ.get("DOCLING_SPIKE_ROOT", "/home/kcar/dev/docling-exam-spike"))
PHYSICS_PDF = SPIKE_ROOT / "samples" / "AQA-Physics-Paper-1H-2022-with-qr.pdf"
PHYSICS_TEMPLATE = SPIKE_ROOT / "results" / "template" / "physics.json"
BORN_DIGITAL_PDF = SPIKE_ROOT / "samples" / "physics-p1h-2022-qp.pdf"


@pytest.mark.skipif(not (PHYSICS_PDF.exists() and PHYSICS_TEMPLATE.exists()), reason="spike corpus not present")
def test_auto_map_matches_spike_physics_template_shape():
    """The template produced for the physics paper has the same shape as the spike's result."""
    expected = json.loads(PHYSICS_TEMPLATE.read_text())
    result = auto_map(PHYSICS_PDF.read_bytes(), spike_root=SPIKE_ROOT)
    got_meta, want_meta = result["meta"], expected["meta"]
    got_page, want_page = result["pages"]["2"], expected["pages"]["2"]

    # schema and top-level layout
    assert got_meta["schema"] == FIRST_PASS_SCHEMA
    assert got_meta["schema"] == want_meta["schema"]
    assert set(result.keys()) == set(expected.keys())
    # paper identity
    assert got_meta["board"] == want_meta["board"]
    assert got_meta["paper_code"] == want_meta["paper_code"]
    # margins and pages
    assert len(result["margins"]) == len(expected["margins"])
    assert set(result["pages"].keys()) == set(expected["pages"].keys())
    assert got_page["role"] == want_page["role"]
    assert got_page["part_bands"][0].keys() == want_page["part_bands"][0].keys()


@pytest.mark.skipif(not BORN_DIGITAL_PDF.exists(), reason="born-digital spike PDF not present")
def test_auto_map_fast_path_without_cache_produces_first_pass_template():
    """With the cache disabled, the born-digital PDF still yields a populated template."""
    pdf_bytes = BORN_DIGITAL_PDF.read_bytes()
    result = auto_map(
        pdf_bytes,
        source_pdf="samples/physics-p1h-2022-qp.pdf",
        spike_root=SPIKE_ROOT,
        prefer_cache=False,
    )
    meta = result["meta"]

    assert meta["schema"] == FIRST_PASS_SCHEMA
    assert meta["board"] == "aqa"
    assert meta["paper_code"] == "8463/1"
    assert meta["source_pdf"] == "samples/physics-p1h-2022-qp.pdf"
    assert result["margins"]
    assert result["pages"]


def test_auto_map_rejects_empty_pdf_bytes():
    """Empty input bytes are rejected with ValueError."""
    with pytest.raises(ValueError):
        auto_map(b"")