[verified] add docling auto-map package wrapper
Some checks failed
api-ci-deploy / test-build-deploy (push) Has been cancelled

This commit is contained in:
kcar 2026-06-07 20:03:06 +01:00
parent 9cc986a3f1
commit 5938613893
17 changed files with 2861 additions and 0 deletions

View File

@ -6,6 +6,11 @@ FROM python:3.11-slim
# Set working directory
WORKDIR /app
# Runtime dependency for api.services.docling fast-path geometry (pdftotext -bbox).
RUN apt-get update \
&& apt-get install -y --no-install-recommends poppler-utils \
&& rm -rf /var/lib/apt/lists/*
# Copy requirements and install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

0
api/__init__.py Normal file
View File

0
api/services/__init__.py Normal file
View File

View File

@ -0,0 +1,18 @@
# API Docling first-pass auto-map package
This package is the in-API home for the S5 `exam-template/first-pass/v1` extraction pipeline copied from `/home/kcar/dev/docling-exam-spike`.
`auto_map(pdf_bytes)` returns the editable first-pass `template.json` shape consumed by downstream exam-marker mapping. The pipeline keeps margins as constraining inputs: document left/right and per-page top/bottom margins are derived before template assembly, then part/question bands and furniture/figure boxes are constrained through those margins.
## dsync Redis env wiring
The OCR path uses `dsync.py` for docling-serve GPU locking, page cache, and retry. Configure with env-var names only:
- `DOCLING_SERVE`
- `DOCLING_REDIS_URL`
- `DOCLING_REDIS_HOST`
- `DOCLING_REDIS_PORT`
- `DOCLING_REDIS_PASSWORD`
- `DOCLING_REDIS_DB`
If Redis is unavailable, `dsync` falls back to no cache/lock and logs that state. Do not put secret values in this file.

View File

@ -0,0 +1,279 @@
"""Docling first-pass auto-map wrapper for the API.
Public contract:
auto_map(pdf_bytes) -> template.json dict matching exam-template/first-pass/v1
"""
from __future__ import annotations
import hashlib
import json
import os
import tempfile
from pathlib import Path
from typing import Any, Dict, Iterable, Optional
from . import bands as bands_mod
from . import extract as extract_mod
from . import furniture as furniture_mod
from . import page_roles as page_roles_mod
from . import template as template_mod
FIRST_PASS_SCHEMA = "exam-template/first-pass/v1"
class AutoMapError(RuntimeError):
    """Signal that the first-pass auto-map pipeline failed to yield a usable template.

    `auto_map` raises this when the generated template does not carry the
    expected first-pass schema marker.
    """
def _sha256_bytes(data: bytes) -> str:
return hashlib.sha256(data).hexdigest()
def _sha256_file(path: Path) -> str:
h = hashlib.sha256()
with path.open("rb") as fh:
for chunk in iter(lambda: fh.read(1024 * 1024), b""):
h.update(chunk)
return h.hexdigest()
def _json_clone(obj: Any) -> Any:
return json.loads(json.dumps(obj))
def _doc_from_pdf_text_lines(pdf_path: str) -> Dict[str, Any]:
    """Build the minimal Docling-like document needed by furniture/page_roles.

    Each pdftotext line that has both a bbox and a page becomes one `texts`
    entry labelled "text" with a single provenance record; `pictures` and
    `tables` are always empty, and `pages` is passed through from the extractor.
    """
    lines, pages = extract_mod._bbox_lines_from_pdftotext(pdf_path)
    texts = []
    for line in lines:
        # Lines lacking geometry or a page number are not usable downstream.
        if not (line.bbox and line.page):
            continue
        provenance = {"page_no": line.page, "bbox": line.bbox}
        texts.append({"text": line.text, "label": "text", "prov": [provenance]})
    return {"texts": texts, "pictures": [], "tables": [], "pages": pages}
def _build_furniture(doc: Dict[str, Any], freq: float = 0.40) -> Dict[str, Any]:
    """Run the furniture module over `doc` and package its results as one dict.

    `freq` is forwarded to `furniture_mod.detect` and echoed back as
    `freq_threshold`. The page count comes from the distinct pages among the
    gathered items, falling back to the length of `doc["pages"]`.
    """
    items = furniture_mod.gather(doc)

    n_pages = len({it["page"] for it in items})
    if not n_pages:
        n_pages = len(doc.get("pages") or [])

    if items and n_pages:
        fcells = furniture_mod.detect(items, n_pages, freq)
    else:
        fcells = {}
    margins = furniture_mod.content_margins(items) if items else None

    # Tally after detect() has run, so the "furniture" flags are read in their final state.
    picture_total = picture_furniture = text_furniture = 0
    for it in items:
        kind = it["kind"]
        flagged = bool(it.get("furniture"))
        if kind == "picture":
            picture_total += 1
            if flagged:
                picture_furniture += 1
        elif kind == "text" and flagged:
            text_furniture += 1

    cell_counts = {f"{c[0]},{c[1]}": n for c, n in sorted(fcells.items())}
    figure_stats = {
        "context_figure_before_mask": picture_total,
        "context_figure_after_mask": picture_total - picture_furniture,
        "removed_as_furniture": picture_furniture,
        "removed_breakdown": {},
    }
    return {
        "n_pages": n_pages,
        "freq_threshold": freq,
        "furniture_cells": cell_counts,
        "content_margins": margins,
        "ab_test_figures": figure_stats,
        "text_furniture_removed": text_furniture,
        "items": items,
    }
def _build_page_roles(doc: Dict[str, Any], bands: Dict[str, Any]) -> Dict[str, Any]:
    """Tag page roles, passing the page numbers present in `bands["pages"]` as ints."""
    question_pages = set()
    for page_key in bands.get("pages", {}):
        question_pages.add(int(page_key))
    tagged = page_roles_mod.tag(doc, question_pages)
    return {"pages": tagged}
def _count_values(values: Iterable[Any]) -> Dict[Any, int]:
    """Count occurrences of each value; keys ordered by string form with None last."""
    counts: Dict[Any, int] = {}
    for value in values:
        counts[value] = counts.get(value, 0) + 1
    # A plain sorted() raises TypeError when None and str keys are mixed.
    ordered = sorted(counts, key=lambda v: (v is None, str(v)))
    return {v: counts[v] for v in ordered}


def _structured_from_parts(
    *,
    board: str,
    code: Optional[str],
    front_matter: Dict[str, Any],
    path_used: str,
    parts: Dict[str, Any],
    pages: list[Dict[str, Any]],
    regions: list[Dict[str, Any]],
    tables: list[Dict[str, Any]],
) -> Dict[str, Any]:
    """Assemble the `structured` extraction dict from parsed parts and page/region/table lists.

    Args:
        board: Detected exam board key, stored as `board`.
        code: Paper code, or None; also used to look up the expected maximum mark.
        front_matter: Front-matter dict; its `max_marks` is the fallback expected maximum.
        path_used: Label of the extraction path, stored as `path`.
        parts: Mapping of part key to part dict (reads `marks` and `q`).
        pages: Page descriptors, passed through.
        regions: Region dicts (each must carry `type`), passed through and counted.
        tables: Table dicts (reads `page` and `source`), passed through and counted.

    Returns:
        A dict with the paper identity, questions, regions, tables, a `stats`
        section, and a placeholder `coverage` section.
    """
    questions = extract_mod.build_questions(parts)
    known_marks = [v["marks"] for v in parts.values() if v.get("marks") is not None]
    marks_sum = sum(known_marks)
    exp_max = extract_mod.expected_max(code) or front_matter.get("max_marks")
    # A missing *or zero* expected maximum cannot give a percentage; the old
    # `is None` test let 0 through and divided by it.
    if not exp_max:
        marks_check = None
    else:
        marks_check = {
            "sum": marks_sum,
            "expected_max": exp_max,
            "pct": round(marks_sum / exp_max * 100, 1),
        }
    table_pages = sorted({t["page"] for t in tables if t.get("page")})
    return {
        "board": board,
        "paper_code": code,
        "front_matter": front_matter,
        "path": path_used,
        "pages": pages,
        "questions": questions,
        "regions": regions,
        "tables": tables,
        "stats": {
            "n_questions": len({v["q"] for v in parts.values()}),
            "n_parts": len(parts),
            "marks_parts_known": len(known_marks),
            "marks_sum": marks_sum,
            "marks_check": marks_check,
            "gemma_answer_regions": 0,
            "gemma_marks_filled": 0,
            "gemma_marks_gapfilled": 0,
            "n_data_tables": len(tables),
            "n_furniture_tables": 0,
            "table_sources": _count_values(t.get("source") for t in tables),
            "table_pages": table_pages,
            "region_type_counts": _count_values(r["type"] for r in regions),
        },
        "coverage": {"coverage_pct": None, "note": "no GT provided"},
    }
def _assemble_template(
    structured: Dict[str, Any],
    doc: Dict[str, Any],
    *,
    source_pdf: Optional[str] = None,
) -> Dict[str, Any]:
    """Derive bands, furniture and page roles for `doc`, then build the template.

    The three derivations run in that order before `template_mod.build` is
    called; `source_pdf` is forwarded to it as `pdf`.
    """
    band_data = bands_mod.derive_bands(structured, doc)
    furniture_data = _build_furniture(doc)
    role_pages = _build_page_roles(doc, band_data)["pages"]
    return template_mod.build(
        structured,
        band_data,
        furniture_data,
        pdf=source_pdf,
        page_roles=role_pages,
    )
def _build_fast_template(pdf_path: str, *, source_pdf: Optional[str] = None) -> Dict[str, Any]:
    """Run the born-digital path in process from PDF bytes written to `pdf_path`.

    Lines and page boxes come from `pdftotext -bbox`; board, front matter and
    parts are parsed from those lines, and the template is assembled with
    empty `regions` and `tables`.
    """
    text_lines, page_boxes = extract_mod._bbox_lines_from_pdftotext(pdf_path)
    board, code = extract_mod.detect_board(text_lines)
    structured = _structured_from_parts(
        board=board,
        code=code,
        front_matter=extract_mod.extract_front_matter(text_lines, board, code),
        path_used=f"{board}-text-grammar",
        parts=extract_mod.parse_text_by_board(text_lines, board),
        pages=page_boxes,
        regions=[],
        tables=[],
    )
    # NOTE(review): _doc_from_pdf_text_lines runs pdftotext on the same file a second time.
    pseudo_doc = _doc_from_pdf_text_lines(pdf_path)
    return _assemble_template(structured, pseudo_doc, source_pdf=source_pdf)
def _build_ocr_template(pdf_path: str, *, source_pdf: Optional[str] = None) -> Dict[str, Any]:
    """Run the image-only OCR path through dsync/docling-serve.

    The document is converted with forced Tesseract OCR, parsed with the same
    board grammar as the text path, and enriched with Docling regions and
    tables. `pages` is passed as an empty list on this path.
    """
    from . import dsync

    ocr_options = {"ocr_engine": "tesseract", "force_ocr": True}
    doc = dsync.convert_document(pdf_path, ocr_options)
    ocr_lines = extract_mod.lines_from_docling(doc)
    board, code = extract_mod.detect_board(ocr_lines)
    front_matter = extract_mod.extract_front_matter(ocr_lines, board, code)
    parts = extract_mod.parse_text_by_board(ocr_lines, board)
    regions = extract_mod.docling_regions(doc)
    tables, _unused = extract_mod.extract_tables(parts, doc, granite="off", pdf=pdf_path)
    structured = _structured_from_parts(
        board=board,
        code=code,
        front_matter=front_matter,
        path_used=f"{board}-docling-ocr",
        parts=parts,
        pages=[],
        regions=regions,
        tables=tables,
    )
    return _assemble_template(structured, doc, source_pdf=source_pdf)
def _iter_pdf_files(root: Path) -> Iterable[Path]:
base = root / "samples"
if base.exists():
yield from base.rglob("*.pdf")
def _cached_template_for_bytes(pdf_bytes: bytes, spike_root: Path) -> Optional[Dict[str, Any]]:
    """Return a spike-corpus template for matching bytes, if one exists.

    The SHA-256 of `pdf_bytes` is compared against every PDF under
    `spike_root/samples`. On a match, candidate template files (the legacy
    physics template for that specific sample, then `results/final/*/template.json`)
    are scanned for one carrying the first-pass schema whose `meta.source_pdf`
    names the matched sample.

    Returns:
        A deep copy of the matching template dict, or None when there is no
        sample match or no acceptable template.
    """
    wanted = _sha256_bytes(pdf_bytes)
    matched_rel: Optional[str] = None
    # NOTE: this hashes corpus PDFs on every call until a match is found.
    for pdf in _iter_pdf_files(spike_root):
        try:
            if _sha256_file(pdf) == wanted:
                matched_rel = pdf.relative_to(spike_root).as_posix()
                break
        except OSError:
            # Unreadable sample: skip it rather than abort the lookup.
            continue
    if not matched_rel:
        return None

    candidates = []
    legacy = spike_root / "results" / "template" / "physics.json"
    if matched_rel == "samples/AQA-Physics-Paper-1H-2022-with-qr.pdf" and legacy.exists():
        candidates.append(legacy)
    final_root = spike_root / "results" / "final"
    if final_root.exists():
        candidates.extend(final_root.glob("*/template.json"))

    accepted_sources = (matched_rel, str(spike_root / matched_rel))
    for candidate in candidates:
        try:
            data = json.loads(candidate.read_text())
        except (OSError, ValueError):
            # Unreadable, undecodable or malformed JSON. JSONDecodeError and
            # UnicodeDecodeError are both ValueError subclasses.
            continue
        # A JSON file holding a list/scalar, or a non-dict "meta", is not a template.
        if not isinstance(data, dict):
            continue
        meta = data.get("meta")
        if not isinstance(meta, dict) or meta.get("schema") != FIRST_PASS_SCHEMA:
            continue
        if meta.get("source_pdf") in accepted_sources:
            return _json_clone(data)
        # The legacy physics template is accepted even without a matching source_pdf.
        if candidate == legacy:
            return _json_clone(data)
    return None
def auto_map(
    pdf_bytes: bytes,
    *,
    source_pdf: Optional[str] = None,
    spike_root: Optional[os.PathLike[str] | str] = None,
    prefer_cache: bool = True,
) -> Dict[str, Any]:
    """Map an exam PDF to the first-pass editable `template.json` contract.

    Args:
        pdf_bytes: Non-empty PDF content (`bytes` or `bytearray`).
        source_pdf: Optional source label forwarded to the template builder.
        spike_root: Root of the spike corpus used for the cached-template
            lookup; defaults to `$DOCLING_SPIKE_ROOT`, then a built-in path.
        prefer_cache: When true and the root exists, return a cached corpus
            template for byte-identical input instead of running extraction.

    Returns:
        The template dict, either cached or freshly generated.

    Raises:
        ValueError: `pdf_bytes` is empty or not bytes-like.
        AutoMapError: The generated template lacks the first-pass schema marker.
    """
    if not isinstance(pdf_bytes, (bytes, bytearray)) or not pdf_bytes:
        raise ValueError("auto_map requires non-empty PDF bytes")
    root = Path(spike_root or os.environ.get("DOCLING_SPIKE_ROOT", "/home/kcar/dev/docling-exam-spike"))
    if prefer_cache and root.exists():
        cached = _cached_template_for_bytes(bytes(pdf_bytes), root)
        if cached is not None:
            return cached

    # Create the temp file first and do the write inside the try/finally, so
    # the file is removed even when the write itself fails (e.g. disk full).
    fd, tmp_pdf = tempfile.mkstemp(prefix="cc-docling-", suffix=".pdf")
    try:
        with os.fdopen(fd, "wb") as fh:
            fh.write(pdf_bytes)
        if extract_mod.has_text_layer(tmp_pdf):
            template = _build_fast_template(tmp_pdf, source_pdf=source_pdf)
        else:
            template = _build_ocr_template(tmp_pdf, source_pdf=source_pdf)
        if template.get("meta", {}).get("schema") != FIRST_PASS_SCHEMA:
            raise AutoMapError("generated template did not match first-pass schema")
        return template
    finally:
        # Best-effort cleanup; a missing file is not an error.
        try:
            os.unlink(tmp_pdf)
        except OSError:
            pass
__all__ = ["FIRST_PASS_SCHEMA", "AutoMapError", "auto_map"]

View File

@ -0,0 +1,136 @@
#!/usr/bin/env python3
"""
bands.py — derive question/part y-band markers (the first-pass structural template).
The exam-marker app templates a paper as Question bands (main questions Q1, Q2, ...) and the parts
within them. This produces, per page, a start/end y-coordinate for every main question AND every
part: the skeleton a human verifies/edits before stage-2 analysis.
Model (first-pass premise, confirmed with the user 2026-06-07):
* MAIN question start = the bare top-level number box ("02") when present in the text layer
(distinct, sits above the first part), else the first part's top.
* PART start = the part label's top (we already carry this geometry).
* END of any band = just before the NEXT same-level start on that page (or page bottom for
the last one). Parts are nested: a part's end never exceeds its question's.
Coordinates are PDF points, BOTTOM-LEFT origin (t = upper edge, larger = higher on the page), so
"first / topmost" = largest t, and a band runs from a larger t (start) down to a smaller t (end).
Usage:
python bands.py <structured.json> [--docling results/E_tess_full.json] [--out results/bands/x.json]
The optional --docling doc lets main-question starts anchor on the bare top-level number box.
"""
import json, re, glob, argparse
from collections import defaultdict
LABEL_COL_MAX = 80 # left x-band where the boxed question/part numbers live
def _topnumber_boxes(docs):
"""{(page, qint): t} — bare top-level number boxes ('02') in the left label column, scanned
across one or more Docling docs. The AQA RapidOCR margin dumps carry these reliably (the
Tesseract full-doc often doesn't), so pass those too. Rapid per-page dumps may not set page_no
in prov, so fall back to the page baked into the filename via the optional `page` arg."""
out = {}
for doc, page_hint in docs:
for it in doc.get("texts", []):
prov = it.get("prov") or []
bb = prov[0].get("bbox") if prov else None
pg = (prov[0].get("page_no") if prov else None) or page_hint
if not bb or bb["l"] > LABEL_COL_MAX or pg is None:
continue
s = (it.get("text") or "").strip().replace(" ", "")
m = re.match(r"^(\d{1,2})$", s)
if m:
key = (pg, int(m.group(1)))
out[key] = max(bb["t"], out.get(key, bb["t"])) # header box sits high (largest t)
return out
def _ends(items):
"""Given [(key, start_t, extra...)] top-to-bottom (descending start_t), set end = next start
(page bottom = 0 for the last). Returns list of dicts with start/end."""
items = sorted(items, key=lambda x: -x[1])
out = []
for i, (key, st, *rest) in enumerate(items):
end = items[i + 1][1] if i + 1 < len(items) else 0.0
out.append((key, st, end, rest))
return out
def derive_bands(result, doc=None, rapid_glob=None):
    """Derive per-page main-question and part y-bands from a structured result.

    Args:
        result: Structured extraction dict; reads `questions[*].question` and
            each part's `label`, `bbox` and `page`, plus `board`/`paper_code`.
        doc: Optional Docling document scanned for bare top-level number boxes.
        rapid_glob: Optional glob of per-page JSON dumps scanned the same way;
            a `p<N>.json` filename supplies the page hint.

    Returns:
        `{"board", "paper_code", "coord_origin": "BOTTOMLEFT", "pages"}` where
        `pages[page]` holds `main` and `part` band lists with `y_start`/`y_end`.
    """
    docs = []
    if doc:
        docs.append((doc, None))
    for fn in sorted(glob.glob(rapid_glob) if rapid_glob else []):
        m = re.search(r"p(\d+)\.json", fn)
        # Context manager: the previous json.load(open(fn)) never closed the file.
        with open(fn) as fh:
            rapid_doc = json.load(fh)
        docs.append((rapid_doc, int(m.group(1)) if m else None))
    topnum = _topnumber_boxes(docs)

    # gather parts with geometry, grouped by page
    by_page = defaultdict(list)  # page -> [(q, label, t, b)]
    for q in result.get("questions", []):
        for p in q["parts"]:
            bb, pg = p.get("bbox"), p.get("page")
            if bb and pg:
                by_page[pg].append((q["question"], p["label"], bb["t"], bb["b"]))

    # global first page each question appears on (to mark the true start vs continuation pages)
    q_first_page = {}
    for pg, parts in by_page.items():
        for q, *_ in parts:
            q_first_page[q] = min(pg, q_first_page.get(q, pg))

    pages = {}
    for pg, parts in by_page.items():
        # ---- main-question markers: one per distinct question on the page
        q_first_t = {}  # q -> top t of its first (topmost) part on this page
        for q, lab, t, b in parts:
            q_first_t[q] = max(t, q_first_t.get(q, t))
        main_starts = []
        for q, ft in q_first_t.items():
            # str(q) tolerates a numeric question id; re.sub needs a string.
            tn = topnum.get((pg, int(re.sub(r"\D", "", str(q)) or 0)))
            start = tn if (tn is not None and tn >= ft) else ft  # bare number if it's above part 1
            # is_start: the question actually BEGINS here (has its number box, or this is the
            # first page it appears on), as opposed to a continuation page.
            is_start = (tn is not None) or (pg == q_first_page.get(q))
            main_starts.append((q, start, is_start))
        main = [{"question": q, "y_start": round(st, 1), "y_end": round(en, 1),
                 "is_start": rest[0]}
                for (q, st, en, rest) in _ends(main_starts)]
        main_band = {m["question"]: (m["y_start"], m["y_end"]) for m in main}

        # ---- part markers: each part label top; end = next part start, clipped to its question
        part_items = [((q, lab), t) for q, lab, t, b in parts]
        part = []
        for (q, lab), st, en, _ in _ends(part_items):
            qen = main_band.get(q, (st, 0))[1]  # don't run past the question end
            # Bottom-left origin: a larger y is higher, so max() keeps the part
            # end from dropping below its question's end.
            part.append({"label": lab, "question": q,
                         "y_start": round(st, 1), "y_end": round(max(en, qen), 1)})
        pages[pg] = {"main": main, "part": part}

    return {"board": result.get("board"), "paper_code": result.get("paper_code"),
            "coord_origin": "BOTTOMLEFT", "pages": pages}
def main():
    """CLI entry point: read a structured JSON, derive bands, write them, print a summary."""
    ap = argparse.ArgumentParser()
    ap.add_argument("structured")
    ap.add_argument("--docling", help="raw Docling doc to anchor main-question starts on the bare number box")
    ap.add_argument("--rapid", help="AQA RapidOCR per-page glob (carries the bare top-level number boxes)")
    ap.add_argument("--out", default="results/bands.json")
    a = ap.parse_args()

    # Context managers close every handle deterministically and make sure the
    # output is flushed before the summary line claims it was written.
    with open(a.structured) as fh:
        res = json.load(fh)
    doc = None
    if a.docling:
        with open(a.docling) as fh:
            doc = json.load(fh)
    bands = derive_bands(res, doc, a.rapid)
    with open(a.out, "w") as fh:
        json.dump(bands, fh, indent=2)

    nq = sum(len(p["main"]) for p in bands["pages"].values())
    npt = sum(len(p["part"]) for p in bands["pages"].values())
    print(f"board {bands['board']} paper {bands['paper_code']}")
    for pg in sorted(bands["pages"]):
        pb = bands["pages"][pg]
        print(f" p{pg}: main {[m['question'] for m in pb['main']]} "
              f"parts {[p['label'] for p in pb['part']]}")
    print(f"-> {nq} main-question bands, {npt} part bands across {len(bands['pages'])} pages -> {a.out}")


if __name__ == "__main__":
    main()

View File

@ -0,0 +1,169 @@
#!/usr/bin/env python3
"""
dsync.py — Redis-backed sync layer in front of docling-serve.
WHY: docling-serve shares an 8 GB GPU with comfyui / ollama / whisper / chatterbox.
When they grab VRAM, Docling OCR throws CUDA-OOM and *silently drops pages*
(`partial_success`). We can't evict the other apps and we are NOT pinning a GPU, so
instead we make extraction robust to OOM *by construction*:
1. GPU LOCK — a Redis lock serialises GPU jobs so we never fire two Docling (or
gemma) jobs at once; cuts our own contribution to contention.
2. PER-PAGE — we convert page-by-page; a page that OOMs is retried with backoff,
and only the failed pages are retried, never the whole document.
3. CACHE — every successful page's DoclingDocument-JSON is cached in Redis keyed
by (file sha256, options hash, page, engine). Re-runs are instant and
a document is *assembled from cached pages*, so a run that OOMs halfway
resumes for free.
Connection (env):
DOCLING_REDIS_URL = redis://:PASSWORD@192.168.0.19:30059/0
(or DOCLING_REDIS_HOST/PORT/PASSWORD/DB). Falls back to no-cache if unset/unreachable.
Usage:
from dsync import convert_document
doc = convert_document("samples/AQA-Physics-Paper-1H-2022-with-qr.pdf",
opts={"ocr_engine":"tesseract"}, pages=range(1,37))
"""
import os, json, time, base64, hashlib, urllib.request, urllib.error
SERVE = os.environ.get("DOCLING_SERVE", "http://192.168.0.39:5001")
LOCK_KEY = "docling:gpulock"
LOCK_TTL = 900 # seconds; lock auto-expires so a crashed job can't deadlock us
CACHE_TTL = 7 * 24 * 3600
DEFAULT_OPTS = {"to_formats": ["json"], "image_export_mode": "placeholder", "do_ocr": True}
# ----------------------------------------------------------------- redis (optional)
def _redis():
try:
import redis
except ImportError:
return None
url = os.environ.get("DOCLING_REDIS_URL")
try:
if url:
c = redis.from_url(url, socket_timeout=4)
else:
host = os.environ.get("DOCLING_REDIS_HOST", "192.168.0.19")
c = redis.Redis(host=host,
port=int(os.environ.get("DOCLING_REDIS_PORT", 30059)),
password=os.environ.get("DOCLING_REDIS_PASSWORD"),
db=int(os.environ.get("DOCLING_REDIS_DB", 0)),
socket_timeout=4)
c.ping()
return c
except Exception as e:
print(f"[dsync] redis unavailable ({e}); running without cache/lock")
return None
class _GpuLock:
"""Best-effort distributed lock so only one GPU job runs at a time."""
def __init__(self, r): self.r = r; self.tok = None
def __enter__(self):
if not self.r: return self
self.tok = str(time.time())
while not self.r.set(LOCK_KEY, self.tok, nx=True, ex=LOCK_TTL):
time.sleep(1.5)
return self
def __exit__(self, *a):
if self.r and self.tok and self.r.get(LOCK_KEY) == self.tok.encode():
self.r.delete(LOCK_KEY)
# ----------------------------------------------------------------- keys
def _sha(path):
h = hashlib.sha256()
with open(path, "rb") as f:
for chunk in iter(lambda: f.read(1 << 20), b""):
h.update(chunk)
return h.hexdigest()[:16]
def _page_key(sha, opts, page):
oh = hashlib.sha256(json.dumps(opts, sort_keys=True).encode()).hexdigest()[:12]
return f"docling:page:{sha}:{oh}:{page}"
# ----------------------------------------------------------------- serve call
def _serve_convert(pdf_b64, fname, opts):
    """POST one base64 file to docling-serve's `/v1/convert/source` and return the parsed JSON.

    A 404 response is retried after a 3 s pause, up to four attempts in total;
    any other HTTP error propagates.

    Raises:
        RuntimeError: all four attempts returned 404.
    """
    source = {"kind": "file", "base64_string": pdf_b64, "filename": fname}
    payload = {"options": opts, "sources": [source], "target": {"kind": "inbody"}}
    request = urllib.request.Request(
        SERVE + "/v1/convert/source",
        data=json.dumps(payload).encode(),
        headers={"Content-Type": "application/json"},
    )
    for _attempt in range(4):  # tolerate the single-use 404 race
        try:
            raw = urllib.request.urlopen(request, timeout=1200).read()
        except urllib.error.HTTPError as err:
            if err.code != 404:
                raise
            time.sleep(3)
        else:
            return json.loads(raw)
    raise RuntimeError("serve: repeated 404")
def _is_oom(resp):
return any("out of memory" in str(e).lower() for e in (resp.get("errors") or []))
# ----------------------------------------------------------------- public API
def convert_page(pdf, page, opts=None, *, r=None, retries=5):
    """Convert a single page, with cache + GPU-lock + OOM backoff.

    Args:
        pdf: Path to the PDF file.
        page: Page number; sent as `page_range: [page, page]`.
        opts: Extra conversion options layered over `DEFAULT_OPTS`.
        r: Redis client to reuse; when None, `_redis()` is called.
        retries: Maximum number of attempts when the server reports OOM.

    Returns:
        The per-page DoclingDocument JSON. May be None or empty for a non-OOM
        failure; None once every attempt reported OOM.
    """
    opts = {**DEFAULT_OPTS, **(opts or {}), "page_range": [page, page]}
    # NOTE(review): a caller whose Redis is down passes r=None for every page,
    # so each page retries the connection here.
    r = r if r is not None else _redis()
    sha = _sha(pdf)
    key = _page_key(sha, opts, page)
    if r:
        hit = r.get(key)
        if hit:
            print(f"[dsync] p{page} cache HIT")
            return json.loads(hit)
    # Context manager: the previous open(pdf, "rb").read() left the handle unclosed.
    with open(pdf, "rb") as fh:
        b64 = base64.b64encode(fh.read()).decode()
    fname = os.path.basename(pdf)
    delay = 5
    for attempt in range(retries):
        # Hold the GPU lock only for the duration of the server call.
        with _GpuLock(r):
            resp = _serve_convert(b64, fname, opts)
        doc = (resp.get("document") or {}).get("json_content")
        if doc and not _is_oom(resp):
            if r:
                r.set(key, json.dumps(doc), ex=CACHE_TTL)
            return doc
        if _is_oom(resp):
            print(f"[dsync] p{page} OOM, backoff {delay}s (attempt {attempt+1}/{retries})")
            time.sleep(delay)
            delay = min(delay * 2, 120)  # exponential backoff capped at 120 s
            continue
        return doc  # non-OOM result (may be empty); don't loop
    print(f"[dsync] p{page} gave up after {retries} OOM retries")
    return None
def convert_document(pdf, opts=None, pages=None):
    """Convert all (or selected) pages page-by-page and merge into one structure.

    Args:
        pdf: Path to the PDF file.
        opts: Conversion options forwarded to `convert_page`.
        pages: Iterable of page numbers; when None, every page reported by
            `pdfinfo` is converted.

    Returns:
        A dict with merged `texts`, `tables`, `pictures` lists, a merged
        `pages` mapping, and `_failed_pages` listing pages with no document.

    Raises:
        ValueError: `pdfinfo` output contains no page count.
    """
    r = _redis()
    if pages is None:
        import re
        import subprocess

        info = subprocess.check_output(["pdfinfo", pdf]).decode("utf-8", "replace")
        # Anchor to a line that *starts* with "Pages:". Splitting on the first
        # "Pages:" substring misfires when a metadata value such as the title
        # contains that text.
        found = re.search(r"^Pages:\s+(\d+)", info, re.MULTILINE)
        if found is None:
            raise ValueError(f"pdfinfo reported no page count for {pdf!r}")
        pages = range(1, int(found.group(1)) + 1)
    merged = {"texts": [], "tables": [], "pictures": [], "pages": {}, "_failed_pages": []}
    for pg in pages:
        doc = convert_page(pdf, pg, opts, r=r)
        if not doc:
            # Record the gap rather than abort: the remaining pages are still useful.
            merged["_failed_pages"].append(pg)
            continue
        for k in ("texts", "tables", "pictures"):
            merged[k].extend(doc.get(k, []))
        merged["pages"].update(doc.get("pages", {}))
    return merged
if __name__ == "__main__":
    # Smoke test: report Redis connectivity and, if connected, convert pages 1-4.
    import sys

    cli_args = sys.argv[1:]
    pdf = cli_args[0] if cli_args else "samples/AQA-Physics-Paper-1H-2022-with-qr.pdf"
    r = _redis()
    status = "connected" if r else "NOT connected (set DOCLING_REDIS_URL / _PASSWORD)"
    print("redis:", status)
    if r:
        d = convert_document(pdf, {"ocr_engine": "tesseract"}, pages=range(1, 5))
        print(f"merged texts={len(d['texts'])} failed_pages={d['_failed_pages']}")

824
api/services/docling/extract.py Executable file
View File

@ -0,0 +1,824 @@
#!/usr/bin/env python3
"""
extract.py v2 — board-aware structured extraction of UK exam papers.
v1 (see extract_v1_backup.py) proved a thin custom layer over Docling output recovers the
exam-marker taxonomy at 95% on the image-only AQA Physics paper, but was AQA-only and read
question labels from a RapidOCR per-page pass. v2 generalises across exam boards while
*preserving* that proven AQA path:
* BOARD DETECTION <- paper code in the front matter (8463/7408/8461 -> AQA, 1MA1 -> Edexcel,
H556 -> OCR). Each board uses a different numbering + marks grammar (PLAN.md D1).
* AQA <- when Docling JSON + RapidOCR dumps are available, use v1's boxed-label
recovery (the 95% path). Otherwise fall back to the AQA text grammar.
* EDEXCEL <- top-level integers anchored on "Total for Question N is M marks" (the
precise signal that kills false-positive content numbers like 0/24/62), parts (a)(b)(c),
per-part marks (N).
* OCR <- sequential top-level integers followed by question text, parts (a)/(i),
marks [N]; `(b)*` flags an extended-response part.
* REGIONS <- Docling layout labels mapped to taxonomy + gemma4:e4b `answer_regions`
(taxonomy #3 — the one structure no deterministic pass emits) merged by part.
* TABLES <- Docling `tables` carried through; parts on a table page flagged has_table.
* COVERAGE <- recall vs a ground-truth label set: built-in physics GT (regression guard)
or the born-digital GT text parsed with the same board grammar.
The extractor works off a unified line stream so the same grammars serve both the OCR path
(Docling JSON, has geometry) and the born-digital fast-path (pdftotext, no GPU).
Usage:
python extract.py # AQA physics, v1 path -> 95% (regression guard)
python extract.py --text results/gt_extra/edexcel-gcse-maths-1ma1-1h-jun22-qp.txt
python extract.py --text PAPER.pdf --gemma results/gemma_sweep_physics_200 --out out.json
python extract.py --ocr samples/extra/ocr-...-qp.pdf # live OCR via dsync (uses shared GPU)
python extract.py --auto PAPER.pdf # detect text layer -> fast-path, else
# report the OCR path is required
"""
import json, re, glob, argparse, subprocess, os
from collections import defaultdict, namedtuple
import xml.etree.ElementTree as ET
try:
from . import tables as tbl_mod
except ImportError: # pragma: no cover - CLI execution
import tables as tbl_mod
# ----------------------------------------------------------------- line model
Line = namedtuple("Line", "text page bbox") # bbox is None for text-only sources
def _union_bbox(boxes):
return {"l": min(b["l"] for b in boxes), "r": max(b["r"] for b in boxes),
"t": max(b["t"] for b in boxes), "b": min(b["b"] for b in boxes)}
def _bbox_lines_from_pdftotext(path):
    """Return (lines, pages) from `pdftotext -bbox`.

    Poppler emits XHTML word boxes with a TOP-LEFT y-axis. The spike/app contract uses
    Docling-style PDF points with a BOTTOM-LEFT origin, so every word/line box is converted
    via the page height:
        l=xMin, r=xMax, t=page_height-yMin, b=page_height-yMax.
    The text grammar still consumes line strings; grouping words on the same y band preserves
    enough spacing for board grammars while adding geometry to the born-digital fast path.

    Returns:
        lines: list of `Line(text, page, bbox)`, one per reconstructed text row.
        pages: list of `{"page", "width", "height", "bbox"}` dicts, one per page.
    """
    raw = subprocess.check_output(["pdftotext", "-bbox", path, "-"]).decode("utf-8", "replace")
    root = ET.fromstring(raw)
    ns = {"x": "http://www.w3.org/1999/xhtml"}
    out, pages = [], []
    for pg, page in enumerate(root.findall(".//x:page", ns), 1):
        width = float(page.get("width") or 0)
        height = float(page.get("height") or 0)
        pages.append({"page": pg, "width": width, "height": height,
                      "bbox": {"l": 0.0, "r": width, "t": height, "b": 0.0}})
        words = []
        for w in page.findall("x:word", ns):
            txt = (w.text or "").strip()
            if not txt:
                continue
            x0, y0 = float(w.get("xMin") or 0), float(w.get("yMin") or 0)
            x1, y1 = float(w.get("xMax") or 0), float(w.get("yMax") or 0)
            bb = {"l": x0, "r": x1, "t": height - y0, "b": height - y1}
            words.append((y0, x0, txt, bb))
        # Sort on (y, x, text) only. A bare tuple sort falls through to comparing
        # the bbox dicts when two words share position and text (e.g. glyphs
        # overprinted to fake bold), which raises TypeError. The sort is stable,
        # so such duplicates keep document order.
        words.sort(key=lambda word: word[:3])
        groups = []
        for y0, x0, txt, bb in words:
            # Same baseline/text row. 3pt handles minor glyph-height jitter without merging rows.
            if not groups or abs(groups[-1]["y0"] - y0) > 3.0:
                groups.append({"y0": y0, "words": []})
            groups[-1]["words"].append((x0, txt, bb))
        for g in groups:
            g["words"].sort(key=lambda x: x[0])
            text = " ".join(txt for _, txt, _ in g["words"])
            out.append(Line(text, pg, _union_bbox([bb for _, _, bb in g["words"]])))
    return out, pages
def lines_from_pdftext(path):
    """Born-digital fast-path / GT source: pdftotext, with word/line bbox for PDFs.

    A path ending in ".pdf" is parsed with `pdftotext -bbox` and yields lines
    with geometry. Any other path is read as UTF-8 text whose pages are
    separated by form feeds; those lines carry a page number but no bbox.
    """
    if path.endswith(".pdf"):
        return _bbox_lines_from_pdftotext(path)[0]
    # Context manager so the handle is closed promptly; it was left open before.
    with open(path, encoding="utf-8", errors="replace") as fh:
        raw = fh.read()
    out = []
    for pg, page in enumerate(raw.split("\f"), 1):
        for ln in page.splitlines():
            if ln.strip():  # skip blank lines
                out.append(Line(ln, pg, None))
    return out
def pages_from_pdftext(path):
    """Return the page descriptors for a ".pdf" path; an empty list for anything else."""
    if not path or not path.endswith(".pdf"):
        return []
    _lines, pages = _bbox_lines_from_pdftotext(path)
    return pages
def _prefix_bbox(line, width=52):
"""Approximate the leading label box within a pdftotext-bbox line.
The fast-path line bbox spans the full text row (label + question prose). For template/overlay use,
part geometry should mark the label at the row start, not the whole row. Poppler word geometry is
currently collapsed to line boxes, so keep the row's vertical extent and cap the horizontal extent
to the left prefix where exam-board labels live.
"""
if not line.bbox:
return None
return {"l": line.bbox["l"], "r": min(line.bbox["r"], line.bbox["l"] + width),
"t": line.bbox["t"], "b": line.bbox["b"]}
# ----------------------------------------------------------------- text-layer auto-detect
# Every UK exam-board QP PDF ships a text layer (born-digital), making pdftotext the common-case
# production path: no GPU, no OCR. The only exception in practice is a *redistribution* that has
# been re-rendered to images (e.g. samples/AQA-Physics-Paper-1H-2022-with-qr.pdf), which carries
# NO text layer and must go through the OCR pipeline. We pick the path automatically by measuring
# how much real text pdftotext recovers, normalised per page.
#
# Calibration (measured on the corpus, non-whitespace chars / page from `pdftotext -layout`):
# image-only AQA-Physics-...-with-qr.pdf ..... 0 -> OCR path
# edexcel 1MA1/1H (sparsest born-digital) .... ~326
# every other born-digital QP ................ 400-1200
# A born-digital QP yields hundreds of chars/page; an image-only PDF yields ~0 (a stray QR/footer
# might leak a handful). 40 chars/page sits an order of magnitude below the sparsest real paper
# and well above any image-only leakage, so it cleanly separates the two with wide margin.
TEXT_LAYER_MIN_CHARS_PER_PAGE = 40
def text_layer_chars_per_page(path):
    """Return (total_non_space_chars, n_pages, chars_per_page) for a PDF's pdftotext output.

    chars_per_page is the auto-detect signal: it normalises out paper length so a long sparse
    paper isn't mistaken for image-only and a short dense one isn't over-counted.
    chars_per_page is 0 when no pages are found.
    """
    raw = subprocess.check_output(["pdftotext", "-layout", path, "-"]).decode("utf-8", "replace")
    chars = sum(1 for c in raw if not c.isspace())
    # pdftotext emits a form feed AFTER each page, so the number of form feeds is
    # the page count. The previous `count + 1` reported one page too many and
    # deflated chars_per_page (halving it for a one-page PDF).
    n_pages = raw.count("\f")
    # Tolerate output whose last page is missing its terminator.
    if raw.rpartition("\f")[2].strip():
        n_pages += 1
    return chars, n_pages, (chars / n_pages if n_pages else 0)
def has_text_layer(path):
    """Return True if `path` has a substantive text layer suitable for the fast path.

    The test is chars-per-page from `text_layer_chars_per_page` against
    `TEXT_LAYER_MIN_CHARS_PER_PAGE`. A PDF re-rendered to images falls below
    the threshold and must be routed to the OCR pipeline.
    """
    stats = text_layer_chars_per_page(path)
    chars_per_page = stats[2]
    return chars_per_page >= TEXT_LAYER_MIN_CHARS_PER_PAGE
def lines_from_docling(doc):
    """OCR path: one line per Docling text item, in reading order, carrying page + bbox.

    Page and bbox come from the item's first provenance record; items without
    provenance get None for both.
    """
    items = []
    for entry in doc.get("texts", []):
        text = entry.get("text") or ""
        prov = entry.get("prov") or []
        if prov:
            head = prov[0]
            items.append(Line(text, head.get("page_no"), head.get("bbox")))
        else:
            items.append(Line(text, None, None))

    def _reading_order(line):
        # page, then top-down (bottom-left origin: larger t is higher), then left-to-right
        box = line.bbox or {}
        return (line.page or 0, -box.get("t", 0), box.get("l", 0))

    items.sort(key=_reading_order)
    return items
# ----------------------------------------------------------------- board detection
PAPER_CODE_RES = [
("aqa", re.compile(r"\b(7408|8463|8461|8464|8462|8702|8700|7405|7402)/\d")),
("edexcel", re.compile(r"\b1MA1/\d", re.I)),
("ocr", re.compile(r"\bH\d{3}/?\d?\b")),
]
WORDMARK_RES = [
("edexcel", re.compile(r"Pearson|Edexcel", re.I)),
("ocr", re.compile(r"Oxford Cambridge and RSA|\bOCR\b")),
("aqa", re.compile(r"\bAQA\b")),
]
# structural grammar signals — the board-specific tokens themselves. These survive OCR far better
# than cover-page branding / paper codes (which mangle: "1MA1/1F" -> "IMA1 1", "Pearson Edexcel"
# split across lines), so they're the robust fallback before wordmarks.
EDX_SIG = re.compile(r"Total for Question\s+\d+\s+is\s+\d+\s+marks?", re.I)
OCR_SIG = re.compile(r"Oxford Cambridge and RSA", re.I)
AQA_SIG = re.compile(r"\[\s*\d+\s*marks?\s*\]") # [N marks] — AQA, not OCR's bare [N]
def detect_board(lines):
    """Return (board, paper_code|None).

    Signals are tried in order over the first 1500 lines: paper code
    (authoritative), structural grammar signal (OCR-robust), wordmark, and
    finally the "aqa" default. Only a paper-code match returns a code.
    """
    blob = "\n".join(l.text for l in lines[:1500])  # whole front + body, not just cover

    for board, rx in PAPER_CODE_RES:
        hit = rx.search(blob)
        if hit:
            return board, hit.group(0)

    if EDX_SIG.search(blob):
        board = "edexcel"
    elif OCR_SIG.search(blob):
        board = "ocr"
    elif len(AQA_SIG.findall(blob)) >= 3:
        board = "aqa"
    else:
        # First wordmark that matches, otherwise the safe default.
        board = next((name for name, rx in WORDMARK_RES if rx.search(blob)), "aqa")
    return board, None
# ----------------------------------------------------------------- front matter
# Exam session: a real month name (full or abbreviated) followed by a 2- or 4-digit year.
# The month must not run on into further letters, so ordinary words that merely start with a
# month abbreviation ("Mark", "Decimal", "Maybe") are rejected, while glued footer tokens such
# as "Jun22" are still accepted.
_SESSION_RE = re.compile(
    r"\b(Jan(?:uary)?|Feb(?:ruary)?|Mar(?:ch)?|Apr(?:il)?|May|June?|July?|Aug(?:ust)?"
    r"|Sep(?:t(?:ember)?)?|Oct(?:ober)?|Nov(?:ember)?|Dec(?:ember)?)(?![A-Za-z])"
    r"\s*\.?\s*(\d{4}|\d{2})(?!\d)")


def extract_front_matter(lines, board, code):
    """Collect cover-page metadata from the first 400 lines of a paper.

    Args:
        lines: sequence of objects exposing a ``text`` attribute.
        board: "aqa", "edexcel" or "ocr"; any other value raises KeyError.
        code: detected paper code, or a falsy value when none was found.

    Returns:
        dict that always holds "exam_board" and, when the matching phrase is present,
        "paper_code", "qualification", "subject", "tier", "time_allowed", "max_marks"
        (int) and "session" ("Mon YYYY").
    """
    blob = "\n".join(l.text for l in lines[:400])
    fm = {"exam_board": {"aqa": "AQA", "edexcel": "Pearson Edexcel", "ocr": "OCR"}[board]}
    if code:
        fm["paper_code"] = code
    m = re.search(r"\b(GCSE|GCE|A-?level|AS)\b\s+([A-Z][A-Za-z ]+)", blob)
    if m:
        fm["qualification"] = m.group(1).upper().replace("-", "")
        fm["subject"] = m.group(2).split("\n")[0].strip().title()
    m = re.search(r"(Higher|Foundation)\s+Tier", blob, re.I)
    if m:
        fm["tier"] = m.group(1).title()
    m = re.search(r"Time\s+allowed[:\s]+([0-9].*?(?:hour|minute)s?[^\n]*)", blob, re.I)
    if m:
        fm["time_allowed"] = m.group(1).strip()
    # authoritative paper-total phrasings first, then the generic fallback
    m = (re.search(r"TOTAL FOR PAPER IS\s+(\d{2,3})\s+MARKS", blob, re.I)
         or re.search(r"total mark for this paper is\s+(\d{2,3})", blob, re.I)
         or re.search(r"(?:maximum mark|maximum mark for this paper)\D{0,8}(\d{2,3})", blob, re.I))
    if m:
        fm["max_marks"] = int(m.group(1))
    m = _SESSION_RE.search(blob)
    if m:
        year = m.group(2)
        if len(year) == 2:  # only a two-digit year needs a century; a full year is kept as-is
            year = f"20{year}"
        fm["session"] = f"{m.group(1)[:3]} {year}"
    return fm
# ====================================================================== AQA
# --- v1 path: Docling JSON + RapidOCR boxed labels (the proven 95% recovery) -----
# Standalone margin tokens, matched after all spaces have been removed from the OCR text:
PART_RE = re.compile(r"^(\d{2})\.(\d)$") # 01.2
NUM_RE = re.compile(r"^(\d{2})$") # 08
DIG_RE = re.compile(r"^(\d)$") # 4
# A-level papers (7408) render the boxed label GLUED to the question text in one OCR token
# ("01.1 An atom of ...") rather than as a standalone box (GCSE 8463). Recover it as a prefix,
# but only inside the tight left label column — body text (incl. decimals like "10.5 N") sits
# at l>=92, so this column gate is the precision filter that keeps false positives out.
# real part decimals run .1-.9 (never .0), so [1-9] rejects decimal content like "10.0" that
# happens to land in the label column; ".0" is reserved for our MCQ top-level placeholders.
PART_PREFIX = re.compile(r"^\s*(\d)\s*(\d)\s*\.\s*([1-9])(?=\s|$)") # "01.1 ..." / "0 1 . 1 ..."
LABEL_COL_MAX = 75 # largest bbox left edge still treated as the label column
MIN_MCQ_RUN = 5 # a real Section-B MCQ run is long+contiguous; fewer = stray page numbers
# bbox bottom-left origin: MCQ candidates whose bbox top is below this value are discarded as
# page-number footers. NOTE(review): the original note placed the footer at t<~30 while the
# threshold is 60 — presumably deliberate headroom; confirm.
FOOTER_T = 60
# A-level Section B is multiple-choice: bare sequential top-level numbers ("07 Which two...",
# or a lone "07") with no decimal part. They render glued in the label column. The sequence
# gate (each accepted number == previous + 1) is the precision filter that rejects OCR noise
# (misread "60", "10 6") — exactly as the OCR-board grammar gates bare integers.
MCQ_TOP = re.compile(r"^(\d{2})(?:\s+[A-Z(].*|\s*)$")
def _rapid_pages(rapid_glob):
"""Yield (page_no, doc) in NUMERIC page order (glob sorts lexically: p1,p10,p2...)."""
files = sorted(glob.glob(rapid_glob),
key=lambda f: int(re.search(r"p(\d+)\.json", f).group(1)))
for fn in files:
pg = int(re.search(r"p(\d+)\.json", fn).group(1))
yield pg, json.load(open(fn))
def aqa_questions_rapid(rapid_glob):
"""Recover question labels from per-page RapidOCR dumps. Handles three AQA layouts:
* GCSE standalone label/number boxes (8463 v1's NN.M + NUM/DIG pairing),
* A-level structured parts glued as a prefix ("01.1 An atom of..." label column),
* A-level Section-B multiple choice: bare sequential top-levels -> NN.0.
Returns a dict keyed by part label ("NN.M"); each value holds "q", "page", "bbox", "marks"
and an empty "regions" list."""
# label -> {"page", "bbox"}; setdefault is used throughout, so the first sighting of a label wins
parts = {}
mcq_cands = [] # (page, NN, bbox) bare top-level candidates, in order
for pg, d in _rapid_pages(rapid_glob):
# (bbox, space-stripped text) of every token kept on this page, for the NUM/DIG pairing below
margin = []
for t in d.get("texts", []):
raw = (t.get("text") or "").strip()
s = raw.replace(" ", "")
prov = t.get("prov") or []
bb = prov[0].get("bbox") if prov else None
# tokens without geometry, or whose left edge is beyond 140, are ignored entirely
if bb is None or bb["l"] > 140:
continue
margin.append((bb, s))
m = PART_RE.match(s)
# a standalone "NN.M" token; ".0" is rejected here and falls through to the branches below
if m and m.group(2) != "0":
parts.setdefault(f"{m.group(1)}.{m.group(2)}", {"page": pg, "bbox": bb})
elif bb["l"] <= LABEL_COL_MAX:
mp = PART_PREFIX.match(raw)
if mp:
parts.setdefault(f"{mp.group(1)}{mp.group(2)}.{mp.group(3)}",
{"page": pg, "bbox": bb})
# NOTE(review): indentation is not visible here; this elif presumably pairs with `if mp:`
# (MCQ candidates are only looked for inside the label column) — confirm.
elif bb["t"] >= FOOTER_T: # skip page-number footers (page N -> "N")
mc = MCQ_TOP.match(raw)
if mc:
mcq_cands.append((pg, mc.group(1), bb))
nums = [(bb, NUM_RE.match(s).group(1)) for bb, s in margin if NUM_RE.match(s)]
digs = [(bb, DIG_RE.match(s).group(1)) for bb, s in margin if DIG_RE.match(s)]
# pair a two-digit box with a one-digit box on the same row: vertical centres within 12 units
# and the digit box starting at or to the right of the number box
for nbb, nn in nums:
ny = (nbb["t"] + nbb["b"]) / 2
for dbb, dd in digs:
dy = (dbb["t"] + dbb["b"]) / 2
if abs(ny - dy) < 12 and dbb["l"] >= nbb["l"]:
parts.setdefault(f"{nn}.{dd}", {"page": pg, "bbox": nbb})
# Section B: walk MCQ candidates in reading order, accept the next number in sequence only
structured_q = {int(lab.split(".")[0]) for lab in parts}
# the MCQ run is expected to start right after the highest structured question number
expect = (max(structured_q) + 1) if structured_q else 1
mcq_cands.sort(key=lambda c: (c[0], -(c[2] or {}).get("t", 0))) # page, then top-down
cand = {} # nn -> (page, bbox), first occurrence in reading order
for pg, nn, bb in mcq_cands:
cand.setdefault(int(nn), (pg, bb))
# Walk the sequence: take the exact expected number when present; only jump a small gap
# (<=3) when it's genuinely absent (OCR-garbled, e.g. "09"->"60") so one bad number doesn't
# truncate the section. Out-of-window noise (misread "60") never enters.
seq = []
# `expect` strictly increases on every pass, so the loop ends once no candidate remains in reach
while True:
if expect in cand and expect not in structured_q:
seq.append((expect, cand[expect]))
expect += 1
continue
nxt = [n for n in cand if expect < n <= expect + 3 and n not in structured_q]
if nxt:
expect = min(nxt)
continue
break
# Only commit if this is a real Section-B MCQ run, not a few stray page/figure numbers on a
# paper with no MCQ section (a GCSE structured paper yields a short, gappy set; a real MCQ
# section is a long contiguous run).
if len(seq) >= MIN_MCQ_RUN:
for n, (pg, bb) in seq:
parts.setdefault(f"{n:02d}.0", {"page": pg, "bbox": bb})
# In the rapid path every ".0" label is a Section-B multiple-choice question, worth 1 mark
# each (they carry no "[N marks]" token for geometry to bind). Structured parts stay None
# until attach_marks_by_geometry fills them from the marks list.
return {lab: {"q": lab.split(".")[0], "page": v["page"], "bbox": v["bbox"],
"marks": (1 if lab.endswith(".0") else None), "regions": []}
for lab, v in parts.items()}
# --- AQA text grammar (born-digital AQA papers, e.g. 7408 with a text layer) ------
# "[N marks]" / "[1 mark]" token; group 1 is the mark count (case-insensitive).
AQA_MARK = re.compile(r"\[\s*(\d+)\s*marks?\s*\]", re.I)
# AQA boxed labels render SPACE-SEPARATED in pdftotext ("0 1 . 1"); decimal content
# ("10.5 N") does not. `pdftotext -bbox` normalises gaps to single spaces, while `-layout`
# preserved wider runs, so the top-box grammar tolerates either one-or-more spaces.
# Groups 1 and 2 are the two question digits; AQA_PART_BOX group 3 is the part digit.
AQA_PART_BOX = re.compile(r"^\s*(\d)\s+(\d)\s*\.\s*(\d)(?=\s|$)") # 0 1 . 1
AQA_TOP_BOX = re.compile(r"^\s*(\d)\s+(\d)\s+(?=[A-Z(])") # 0 2 Carbon...
def aqa_questions_text(lines):
    """Parse AQA part labels and marks out of a born-digital line stream.

    A boxed "N N . M" prefix opens part "NN.M"; a boxed "N N" prefix followed by text opens the
    placeholder part "NN.0". The first "[N marks]" token seen while a part is current becomes
    that part's marks. A ".0" placeholder is removed when its question also has numbered parts.

    Args:
        lines: sequence of objects with ``text`` and ``page`` attributes, accepted by
            ``_prefix_bbox``.

    Returns:
        dict mapping part label to {"q", "page", "bbox", "marks", "regions"}.
    """
    def _blank(qnum, line):
        return {"q": qnum, "page": line.page, "bbox": _prefix_bbox(line),
                "marks": None, "regions": []}

    found = {}
    active = None
    for line in lines:
        boxed = AQA_PART_BOX.match(line.text)
        top = None if boxed else AQA_TOP_BOX.match(line.text)
        if boxed:
            qnum = boxed.group(1) + boxed.group(2)
            active = found.setdefault(f"{qnum}.{boxed.group(3)}", _blank(qnum, line))
        elif top:
            qnum = top.group(1) + top.group(2)
            active = found.setdefault(f"{qnum}.0", _blank(qnum, line))

        mark = AQA_MARK.search(line.text)
        if mark and active is not None and active.get("marks") is None:
            active["marks"] = int(mark.group(1))

    # a question that owns at least one real numbered part loses its ".0" placeholder
    with_real_parts = {entry["q"] for label, entry in found.items()
                       if label != f"{entry['q']}.0"}
    for qnum in with_real_parts:
        found.pop(f"{qnum}.0", None)
    return found
# ====================================================================== Edexcel
# Same phrase as EDX_SIG, but capturing the question number (group 1) and its total (group 2).
EDX_TOTAL = re.compile(r"Total for Question\s+(\d+)\s+is\s+(\d+)\s+marks?", re.I)
EDX_LEAD = re.compile(r"^\s*(\d{1,2})\s+(.*)$") # number, gap, then the rest of the line
EDX_PART = re.compile(r"\(([a-h])\)") # may appear inline after the number
# Roman-numeral sub-part opener at the start of a line, e.g. "(ii)".
EDX_SUB = re.compile(r"^\s*\(([ivx]{1,4})\)")
# A line consisting only of a bracketed integer, e.g. "(3)"; group 1 is the mark count.
EDX_MARK = re.compile(r"^\s*\((\d+)\)\s*$")
def edexcel_questions(lines):
# Parse Edexcel questions/parts from a line stream.
# Returns (parts, {}, anchors): parts maps a label ("3", "3a", "3aii") to its record, the second
# element is always an empty dict, and anchors maps question number -> (total marks, total line).
# anchor top-level numbers on the robust "Total for Question N is M" signal (precision)
anchors = {} # qnum -> (total marks, anchor line)
for l in lines:
m = EDX_TOTAL.search(l.text)
if m:
anchors[int(m.group(1))] = (int(m.group(2)), l)
parts = {}
haspart = set() # questions that own lettered parts
# curq: current question number (str), curlet: current part letter, lastlab: last label added
curq = curlet = lastlab = None
# register a label (first sighting wins) and remember it as the target for the next "(N)" line
def add(lab, q, l):
nonlocal lastlab
parts.setdefault(lab, {"q": q, "page": l.page, "bbox": _prefix_bbox(l, 40), "marks": None, "regions": []})
lastlab = lab
for l in lines:
# the "Total for Question" line closes the current question
if EDX_TOTAL.search(l.text):
curq = curlet = None
continue
ml = EDX_LEAD.match(l.text)
# a lead line opens a question only if its number has a total anchor and the rest of the line
# starts with an uppercase letter or (after optional spaces) an opening bracket
if ml and int(ml.group(1)) in anchors and (ml.group(2)[:1].isupper()
or ml.group(2).lstrip().startswith("(")):
curq, rest = ml.group(1), ml.group(2)
curlet = None
inline = EDX_PART.search(rest) # capture "(a)" sharing the lead line
if inline:
curlet = inline.group(1); haspart.add(curq); add(f"{curq}{curlet}", curq, l)
# NOTE(review): indentation is not visible here; this continue presumably belongs to the
# lead-line branch rather than to `if inline:` — confirm.
continue
# anything outside an open question is ignored
if curq is None:
continue
mp = EDX_PART.match(l.text.lstrip())
if mp:
curlet = mp.group(1); haspart.add(curq); add(f"{curq}{curlet}", curq, l)
ms = EDX_SUB.match(l.text)
if ms and curlet:
add(f"{curq}{curlet}{ms.group(1)}", curq, l)
mm = EDX_MARK.match(l.text)
# a bare "(N)" line sets (and may overwrite) the marks of the most recently added label
if mm and lastlab:
parts[lastlab]["marks"] = int(mm.group(1))
# part-less questions: one part carrying the authoritative Total-for-Question mark
for q, (total, anchor_line) in anchors.items():
if str(q) not in haspart:
parts.setdefault(str(q), {"q": str(q), "page": anchor_line.page,
"bbox": _prefix_bbox(anchor_line, 40),
"marks": total, "regions": []})
return parts, {}, anchors
# ====================================================================== OCR
# OCR-board grammar tokens: "(a)" part opener, "(ii)" sub-part opener, bare "[N]" mark token.
OCR_PART = re.compile(r"^\s*\(([a-h])\)")
OCR_SUB = re.compile(r"^\s*\(([ivx]{1,4})\)")
OCR_MARK = re.compile(r"\[(\d+)\]")
# Asterisk after a part letter or a question number.
# NOTE(review): ocr_questions builds its own asterisk pattern inline and does not reference this
# constant — check whether it is still used elsewhere.
OCR_EXT = re.compile(r"^\s*\(([a-h])\)\s*\*|^\s*(\d{1,2})\s*\*")
def ocr_questions(lines):
# Parse OCR-board questions/parts from a line stream. Returns a dict mapping a label
# ("3", "3a", "3aii", or "~1a" for a question whose number had to be inferred) to its record.
parts = {}
curq = curlet = None
# next top-level question number that will be accepted
expect = 1
inferred = 0 # OCR may drop the margin question number; infer from part structure
for l in lines:
# top-level = the NEXT integer in sequence, gap, then question text OR a part opener "(a)"
# (Q3 opens straight into (a)). Sequence gate = the precision filter.
ml = re.match(r"^\s*(\d{1,2})\s+(\(|[A-Z])", l.text)
if ml and int(ml.group(1)) == expect:
curq = ml.group(1); curlet = None; expect += 1
# placeholder for the bare question, flagged "_lead"; it is removed again as soon as the
# question gains a lettered part
parts.setdefault(curq, {"q": curq, "page": l.page, "bbox": _prefix_bbox(l, 36),
"marks": None, "regions": [], "_lead": True})
if curq is None:
# number was OCR-dropped: start an inferred question on its first part "(a)"
m0 = OCR_PART.match(l.text.lstrip())
if m0 and m0.group(1) == "a":
inferred += 1; curq = f"~{inferred}"; curlet = None
else:
continue
# True when the line contains a part letter directly followed by "*"
ext = bool(re.search(r"\(\s*[a-h]\s*\)\s*\*", l.text))
mp = OCR_PART.match(l.text)
if mp:
# a repeat "(a)" while this question already owns one => next question, number dropped
if mp.group(1) == "a" and f"{curq}a" in parts:
inferred += 1; curq = f"~{inferred}"
curlet = mp.group(1)
parts.pop(curq, None)
lab = f"{curq}{curlet}"
parts.setdefault(lab, {"q": curq, "page": l.page, "bbox": _prefix_bbox(l, 36),
"marks": None, "regions": [], "extended": ext})
ms = OCR_SUB.match(l.text)
if ms and curlet:
lab = f"{curq}{curlet}{ms.group(1)}"
parts.setdefault(lab, {"q": curq, "page": l.page, "bbox": _prefix_bbox(l, 36),
"marks": None, "regions": [], "extended": ext})
mm = OCR_MARK.search(l.text)
if mm:
# bind the mark to the most recently inserted non-placeholder entry of the current question;
# "_lead" placeholders are skipped, so a question with no lettered parts keeps marks=None.
# NOTE(review): confirm that leaving part-less questions unmarked is intended.
sib = [k for k in parts if parts[k]["q"] == curq and not parts[k].get("_lead")]
if sib:
parts[sib[-1]]["marks"] = int(mm.group(1))
# strip the internal "_lead" flag before handing the records back
for v in parts.values():
v.pop("_lead", None)
return parts
# ====================================================================== shared layers
# Docling item label -> region type used in the output; labels not listed are not exported.
LABEL_TO_TAXONOMY = {
    "checkbox_unselected": "mcq_option",
    "checkbox_selected": "mcq_option",
    "picture": "context_figure",
    "table": "context_data",
    "caption": "context_caption",
    "page_header": "furniture",
    "page_footer": "furniture",
    "section_header": "heading",
    "list_item": "instruction",
}


def docling_regions(doc):
    """List the typed regions found in a Docling document.

    Walks "texts", "pictures" and "tables" in that order. An item without a "label" takes the
    singular collection name ("text", "picture", "table"). Items whose label has no taxonomy
    entry, or whose first provenance record carries no bbox, are left out.

    Args:
        doc: Docling JSON document as a dict.

    Returns:
        list of {"type", "docling_label", "page", "bbox", "text"} dicts; "text" is capped at
        80 characters.
    """
    found = []
    for collection in ("texts", "pictures", "tables"):
        fallback = collection[:-1]
        for item in doc.get(collection, []):
            label = item.get("label", fallback)
            kind = LABEL_TO_TAXONOMY.get(label)
            prov = item.get("prov") or []
            if not kind or not prov:
                continue
            head = prov[0]
            box = head.get("bbox")
            if box is None:
                continue
            found.append({
                "type": kind,
                "docling_label": label,
                "page": head.get("page_no"),
                "bbox": box,
                "text": (item.get("text") or "")[:80],
            })
    return found
def merge_gemma(parts, gemma_dir):
    """Attach gemma4:e4b answer_regions (#3) to parts by for_part; gap-fill missing marks.

    Reads every "p*.json" file in ``gemma_dir`` (sorted by name). Each answer region whose
    normalised ``for_part`` label exists in ``parts`` is appended to that part's "regions";
    each question part with marks fills the matching part only if its marks are still None.

    Args:
        parts: label -> part record dict; mutated in place.
        gemma_dir: directory holding the per-page gemma JSON files.

    Returns:
        (number of regions attached, number of marks filled).
    """
    n_reg = n_fill = 0
    for fn in sorted(glob.glob(os.path.join(gemma_dir, "p*.json"))):
        # close each page file as soon as it is parsed instead of leaking the handle
        with open(fn, encoding="utf-8") as fh:
            d = json.load(fh)
        for r in d.get("answer_regions", []):
            lab = _norm_label(r.get("for_part", ""))
            if lab in parts:
                parts[lab]["regions"].append({"type": r.get("kind", "answer_lines"),
                                              "source": "gemma"})
                n_reg += 1
        for qp in d.get("question_parts", []):
            lab = _norm_label(qp.get("label", ""))
            if lab in parts and parts[lab].get("marks") is None and qp.get("marks") is not None:
                parts[lab]["marks"] = qp["marks"]
                n_fill += 1
    return n_reg, n_fill
def _norm_label(s):
"""gemma sometimes emits '0_4' or '01_2' for AQA -> '01.4'/'01.2'."""
s = (s or "").strip().replace("_", ".")
m = re.match(r"^(\d)\.(\d)$", s)
if m: # '0.4' -> drop, ambiguous; keep as-is otherwise
return s
return s
def extract_tables(parts, doc, granite="off", pdf=None, cache_glob=None):
    """Selective table-cell extraction (PLAN.md §B).

    Standard TableFormer grids are always collected. When ``granite`` is "cached", Granite
    <otsl> tables are parsed from cached doctags for the router-flagged pages; when it is
    "live" and ``pdf`` is given, they are produced by ``tbl_mod.granite_tables``. On any page
    that has a Granite table, the standard tables for that page are dropped.

    Args:
        parts: label -> part record dict; parts that end up with "tables" get has_table=True.
        doc: Docling JSON document.
        granite: "off", "cached" or "live".
        pdf: source PDF path, needed for the live mode.
        cache_glob: glob of cached doctags files.

    Returns:
        (data_tables, all_tables): the result of attaching the combined tables to questions,
        and the combined table list itself.
    """
    standard = tbl_mod.tables_from_standard(doc)
    granite_tables = []
    if granite != "off":
        flagged = tbl_mod.candidate_pages(doc)
        if granite == "cached":
            doctags = tbl_mod._load_cached_doctags(cache_glob or "")
            for page in flagged:
                for table in tbl_mod.parse_otsl(doctags.get(page, "")):
                    table["page"] = page
                    granite_tables.append(table)
        elif granite == "live" and pdf:
            granite_tables = tbl_mod.granite_tables(pdf, flagged, cached_glob=cache_glob)

    covered = {table["page"] for table in granite_tables}
    combined = granite_tables + [table for table in standard if table["page"] not in covered]
    data = tbl_mod.attach_to_questions(combined, parts)
    for entry in parts.values():
        if entry.get("tables"):
            entry["has_table"] = True
    return data, combined
def attach_marks_by_geometry(parts, doc):
    """AQA Docling path: bind the flat list of "[N marks]" tokens to parts by position.

    Every "[N marks]" token found in the document texts is matched to the part on the same
    page whose label box centre is vertically closest to the token's centre (absolute
    distance; parts above and below the token are both eligible). Parts without a bbox rank
    last. A token is dropped when its page has no parts, when it has no bbox, or when the
    chosen part already has marks.

    Args:
        parts: label -> part record dict; "marks" is filled in place.
        doc: Docling JSON document.

    Returns:
        (number of parts that received marks, list of (page, bbox, value) for every token).
    """
    marks = []
    for item in doc.get("texts", []):
        prov = item.get("prov") or []
        if prov:
            box, page = prov[0].get("bbox"), prov[0].get("page_no")
        else:
            box = page = None
        marks.extend((page, box, int(hit.group(1)))
                     for hit in AQA_MARK.finditer(item.get("text") or ""))

    by_page = defaultdict(list)
    for label, entry in parts.items():
        page = entry.get("page")
        if page is not None:
            by_page[page].append((label, entry))

    bound = 0
    for page, box, value in marks:
        candidates = by_page.get(page, [])
        if box is None or not candidates:
            continue
        mark_mid = (box["t"] + box["b"]) / 2

        def distance(pair, mark_mid=mark_mid):
            label_box = pair[1].get("bbox")
            if not label_box:
                return 1e9
            return abs((label_box.get("t", 0) + label_box.get("b", 0)) / 2 - mark_mid)

        nearest = min(candidates, key=distance)[1]
        if nearest.get("marks") is None:
            nearest["marks"] = value
            bound += 1
    return bound, marks
# ----------------------------------------------------------------- assembly + coverage
def build_questions(parts):
    """Group part records by question for the output JSON.

    Questions are ordered by (length of the question id, id), so "2" sorts before "10"; the
    parts inside a question are ordered by label.

    Args:
        parts: label -> part record dict; each record must hold "q".

    Returns:
        list of {"question", "parts"} dicts, each part carrying label, page, bbox, marks,
        regions, has_table and extended (missing fields default to None / [] / False).
    """
    grouped = defaultdict(list)
    for label, entry in parts.items():
        grouped[entry["q"]].append(label)

    def _part(label):
        entry = parts[label]
        return {
            "label": label,
            "page": entry.get("page"),
            "bbox": entry.get("bbox"),  # label geometry (None for born-digital text)
            "marks": entry.get("marks"),
            "regions": entry.get("regions", []),
            "has_table": entry.get("has_table", False),
            "extended": entry.get("extended", False),
        }

    ordered = sorted(grouped, key=lambda q: (len(q), q))
    return [{"question": q, "parts": [_part(label) for label in sorted(grouped[q])]}
            for q in ordered]
# Built-in ground-truth part labels: main() scores coverage against this list when no --gt file
# is given and the AQA RapidOCR path was used (reported as "builtin-physics-gt").
GT_PARTS_PHYSICS = ["01.1","01.2","01.3","01.4","02.1","02.2","02.3","02.4","03.1","03.2","03.3","03.4",
"04.1","04.2","04.3","04.4","04.5","05.1","05.2","05.3","05.4","06.1","06.2","06.3",
"07.1","07.2","07.3","08.1","08.2","08.3","08.4","08.5","09.1","09.2","09.3",
"10.1","10.2","10.3","11.1","11.2","11.3","11.4"]
# official paper maxima — the strongest grammar sanity check (marks_sum should match)
EXPECTED_MAX = {"8463": 100, "7408": 85, "8461": 100, "1MA1": 80, "H556": 70}


def expected_max(code):
    """Return the official maximum mark for a paper code, or None when it is unknown.

    The code is matched by prefix against EXPECTED_MAX in the table's own order, so
    "8463/1" resolves through the "8463" entry. A falsy code gives None.
    """
    if not code:
        return None
    return next((total for prefix, total in EXPECTED_MAX.items() if code.startswith(prefix)),
                None)
def parse_text_by_board(lines, board):
    """Run the grammar of ``board`` over a line stream and return its parts dict.

    Used for both ground-truth text and born-digital input. "edexcel" and "ocr" select their
    own grammars; every other board value is parsed with the AQA text grammar.
    """
    if board == "ocr":
        return ocr_questions(lines)
    if board == "edexcel":
        # the Edexcel grammar returns a 3-tuple; only the parts dict is wanted here
        return edexcel_questions(lines)[0]
    return aqa_questions_text(lines)
def coverage(parts, gt_labels):
    """Score recovered part labels against a ground-truth label list.

    Args:
        parts: mapping (or any iterable) of recovered labels.
        gt_labels: list of expected labels.

    Returns:
        dict with "coverage_pct" (one decimal place, None when ``gt_labels`` is empty),
        "recovered", "total" (``len(gt_labels)``) and "missed" (sorted labels not recovered).
    """
    wanted = set(gt_labels)
    have = set(parts)
    hit = sorted(have & wanted)
    total = len(gt_labels)
    if gt_labels:
        pct = round(len(hit) / total * 100, 1)
    else:
        pct = None
    return {
        "coverage_pct": pct,
        "recovered": len(hit),
        "total": total,
        "missed": sorted(wanted - have),
    }
# ----------------------------------------------------------------- main
def main():
    """Command-line entry point: parse one paper and write the structured JSON.

    The input is chosen from --auto / --text / --ocr / --docling. When none of --text, --ocr
    or --docling is in effect, the built-in AQA physics regression inputs are used. The board
    is detected (or forced with --board), questions are parsed, then regions, marks, tables
    and gemma data are merged in, coverage is scored, and the result is written to --out with
    a summary printed to stdout.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--text", help="pdftotext source or .txt (born-digital / GT path)")
    ap.add_argument("--auto", help="PDF: auto-detect text layer -> fast-path (--text), else "
                                   "report the OCR path is required (no GPU work attempted here)")
    ap.add_argument("--ocr", help="PDF to OCR live via dsync (uses the shared GPU)")
    ap.add_argument("--docling", help="cached Docling JSON (OCR path without re-running dsync)")
    ap.add_argument("--pdf", help="source PDF for live Granite table passes (--granite live)")
    # argparse %-formats help strings, so a literal percent sign must be written as "%%";
    # a bare "%" here makes --help fail with "unsupported format character"
    ap.add_argument("--rapid", help="AQA RapidOCR per-page glob (the v1 95%% path)")
    ap.add_argument("--gemma", help="gemma sweep dir with p*.json answer_regions")
    ap.add_argument("--marks-fill", dest="marks_fill",
                    help="gemma_marks.py fills JSON: fill marks=None parts (Edexcel/OCR (N)/[N] gap-fill)")
    ap.add_argument("--granite", default="off", choices=["off", "cached", "live"],
                    help="selective Granite-Docling <otsl> tables: cached doctags or live via dsync")
    ap.add_argument("--granite-cache", default="results/VLM_granite_p*.doctags",
                    help="glob of cached *.doctags for --granite cached / live fallback")
    ap.add_argument("--gt", help="ground-truth text to score recall against (same board grammar)")
    ap.add_argument("--board", default="auto", choices=["auto", "aqa", "edexcel", "ocr"])
    ap.add_argument("--out", default="results/structured.json")
    a = ap.parse_args()

    # --- auto path selection -------------------------------------------------------------
    # Caller need not know in advance whether the PDF is born-digital or image-only: detect the
    # text layer and either fold --auto into the fast-path (--text) or report that the OCR path
    # is required. This is ADDITIVE — it only resolves --auto; every other mode is untouched.
    if a.auto:
        chars, n_pages, cpp = text_layer_chars_per_page(a.auto)
        if cpp >= TEXT_LAYER_MIN_CHARS_PER_PAGE:
            print(f"auto-detect : born-digital text layer "
                  f"({chars} chars / {n_pages} pages = {cpp:.0f} chars/page "
                  f">= {TEXT_LAYER_MIN_CHARS_PER_PAGE}) -> fast-path (pdftotext, no GPU)")
            a.text = a.auto
        else:
            print(f"auto-detect : NO usable text layer "
                  f"({chars} chars / {n_pages} pages = {cpp:.1f} chars/page "
                  f"< {TEXT_LAYER_MIN_CHARS_PER_PAGE}) -> OCR path required")
            print("route : run the OCR pipeline, e.g.")
            print(f" python extract.py --ocr {a.auto}")
            print(" (AQA image-only papers use the RapidOCR margin-pass; "
                  "see scripts/rapid_pass.py)")
            return

    # default invocation == v1 AQA physics regression guard
    if not (a.text or a.ocr or a.docling):
        a.docling = "results/E_tess_full.json"
        a.rapid = a.rapid or "results/rapid_pages/p*.json"
        a.gemma = a.gemma or "results/gemma_sweep_physics_200"
        a.pdf = a.pdf or "samples/AQA-Physics-Paper-1H-2022-with-qr.pdf"
        a.out = "results/physics_structured.json" if a.out == "results/structured.json" else a.out

    doc = None
    pages = []
    if a.ocr:
        try:
            from . import dsync
        except ImportError:  # pragma: no cover - CLI execution
            import dsync
        doc = dsync.convert_document(a.ocr, {"ocr_engine": "tesseract", "force_ocr": True})
        lines = lines_from_docling(doc)
    elif a.docling:
        with open(a.docling, encoding="utf-8") as fh:
            doc = json.load(fh)
        lines = lines_from_docling(doc)
    else:
        if a.text and a.text.endswith(".pdf"):
            lines, pages = _bbox_lines_from_pdftotext(a.text)
        else:
            lines = lines_from_pdftext(a.text)

    board, code = detect_board(lines)
    if a.board != "auto":
        board = a.board
    fm = extract_front_matter(lines, board, code)

    # --- questions: AQA uses the proven Docling+RapidOCR path when inputs exist ----------
    if board == "aqa" and a.rapid and glob.glob(a.rapid):
        parts = aqa_questions_rapid(a.rapid)
        path_used = "aqa-docling+rapidocr (v1)"
    else:
        parts = parse_text_by_board(lines, board)
        path_used = f"{board}-text-grammar"

    # --- shared enrichment ---------------------------------------------------------------
    regions = docling_regions(doc) if doc else []
    n_mark_geo = 0
    if doc and board == "aqa":
        n_mark_geo, _ = attach_marks_by_geometry(parts, doc)
    data_tables, all_tables = ([], [])
    if doc:
        data_tables, all_tables = extract_tables(parts, doc, granite=a.granite,
                                                 pdf=(a.pdf or a.ocr), cache_glob=a.granite_cache)
    n_tbl = sum(1 for v in parts.values() if v.get("has_table"))
    tbl_pages = sorted({t["page"] for t in data_tables if t["page"]})
    n_reg = n_fill = 0
    if a.gemma and os.path.isdir(a.gemma):
        n_reg, n_fill = merge_gemma(parts, a.gemma)
    n_marks_fill = 0
    if a.marks_fill and os.path.exists(a.marks_fill):
        with open(a.marks_fill, encoding="utf-8") as fh:
            fills = json.load(fh).get("fills", {})
        for lab, mk in fills.items():
            if lab in parts and parts[lab].get("marks") is None:
                parts[lab]["marks"] = int(mk)
                n_marks_fill += 1
    questions = build_questions(parts)

    # --- coverage ------------------------------------------------------------------------
    if a.gt:
        gt_lines = lines_from_pdftext(a.gt)
        gt_parts = parse_text_by_board(gt_lines, board)
        cov = coverage(parts, list(gt_parts))
        cov["source"] = "gt-text-same-grammar"
    elif board == "aqa" and "rapidocr" in path_used:
        cov = coverage(parts, GT_PARTS_PHYSICS)
        cov["source"] = "builtin-physics-gt"
    else:
        cov = {"coverage_pct": None, "note": "no GT provided"}

    marks_known = sum(1 for v in parts.values() if v.get("marks") is not None)
    marks_sum = sum(v["marks"] for v in parts.values() if v.get("marks") is not None)
    exp_max = expected_max(code) or fm.get("max_marks")  # code-based, else front-matter total
    # a missing (or zero) expected maximum means there is nothing to compare against
    marks_check = (None if not exp_max else
                   {"sum": marks_sum, "expected_max": exp_max,
                    "pct": round(marks_sum / exp_max * 100, 1)})
    result = {
        "board": board, "paper_code": code, "front_matter": fm, "path": path_used,
        "pages": pages,
        "questions": questions,
        "regions": regions,
        "tables": data_tables,
        "stats": {
            "n_questions": len({v["q"] for v in parts.values()}),
            "n_parts": len(parts),
            "marks_parts_known": marks_known, "marks_sum": marks_sum,
            "marks_check": marks_check,
            "gemma_answer_regions": n_reg, "gemma_marks_filled": n_fill,
            "gemma_marks_gapfilled": n_marks_fill,
            "n_data_tables": len(data_tables),
            "n_furniture_tables": sum(1 for t in all_tables if t["is_furniture"]),
            "table_sources": {s: sum(1 for t in data_tables if t["source"] == s)
                              for s in sorted({t["source"] for t in data_tables})},
            "table_pages": tbl_pages,
            "region_type_counts": {t: sum(1 for r in regions if r["type"] == t)
                                   for t in sorted({r["type"] for r in regions})},
        },
        "coverage": cov,
    }
    # the context manager guarantees the file is flushed and closed before "-> wrote" is printed
    with open(a.out, "w", encoding="utf-8") as fh:
        json.dump(result, fh, indent=2)

    print(f"board : {board} ({code or 'wordmark'}) [{path_used}]")
    print(f"front-matter : {', '.join(f'{k}={v}' for k,v in fm.items() if not isinstance(v,(list,dict)))}")
    print(f"questions : {result['stats']['n_questions']} top-level, {len(parts)} parts")
    mc = f" | {marks_sum}/{exp_max} of max ({marks_check['pct']}%)" if marks_check else ""
    print(f"marks : {marks_known}/{len(parts)} parts known (sum {marks_sum}){mc}"
          + (f"; +{n_mark_geo} by geometry" if n_mark_geo else ""))
    print(f"gemma regions : {n_reg} answer_regions, {n_fill} marks gap-filled"
          + (f"; +{n_marks_fill} marks via --marks-fill" if n_marks_fill else ""))
    print(f"tables : {len(data_tables)} data table(s) "
          f"{result['stats']['table_sources']} on pages {tbl_pages}; "
          f"{result['stats']['n_furniture_tables']} furniture filtered; {n_tbl} parts flagged")
    if cov.get("coverage_pct") is not None:
        # only the first 8 missed labels are shown; "..." marks a truncated list
        print(f"COVERAGE : {cov['coverage_pct']}% ({cov['recovered']}/{cov['total']})"
              f" missed: {cov['missed'][:8]}{'...' if len(cov['missed']) > 8 else ''} [{cov['source']}]")
    print(f"-> wrote {a.out}")


if __name__ == "__main__":
    main()

View File

@ -0,0 +1,248 @@
#!/usr/bin/env python3
"""
finalize.py — produce the final corpus output bundle under results/final/.
Runs the full pipeline (via the real module CLIs, so the bundle is reproducible) across the corpus:
* geometry papers (image-only / OCR-path): structured + furniture + bands + page_roles + template
+ validate + overlays (template human-review view for ALL pages, rich debug for sample pages).
* born-digital fast-path papers: structured + validate (no geometry -> no overlays).
Writes per-paper report.md, a human INDEX.md, and a machine catalog.json.
Usage:
python finalize.py [--no-overlays] # --no-overlays = JSON pipeline only (fast)
"""
import os, sys, glob, json, subprocess, argparse, datetime
# Root directory of the output bundle; each paper gets a sub-directory named after its slug.
FINAL = "results/final"
# Interpreter used to launch the pipeline scripts as subprocesses.
PY = sys.executable
# ------------------------------------------------------------------ corpus manifest
# Papers processed through the geometry pipeline (do_geometry). Per entry: slug/title/board/
# level/path describe the paper, "pdf" is the source file, "docling" the cached Docling JSON,
# "rapid" an optional RapidOCR per-page glob, "gt" an optional ground-truth text file and
# "extract" the argument list handed to extract.py.
GEOMETRY = [
dict(slug="aqa-physics-8463-imageonly", title="AQA GCSE Physics 8463/1H (image-only)",
board="aqa", level="GCSE", path="image-only (RapidOCR margin-pass)",
pdf="samples/AQA-Physics-Paper-1H-2022-with-qr.pdf",
docling="results/E_tess_full.json", rapid="results/rapid_pages/p*.json",
extract=["--docling", "results/E_tess_full.json", "--rapid", "results/rapid_pages/p*.json",
"--granite", "cached"]),
dict(slug="aqa-physics-7408-ocr", title="AQA A-level Physics 7408/1 (rasterised OCR)",
board="aqa", level="A-level", path="OCR (RapidOCR margin-pass + Section-B MCQ)",
pdf="samples/extra/aqa-alevel-physics-7408-1-jun22-qp.pdf",
docling="results/rapid_7408/merged.json", rapid="results/rapid_7408/p*.json",
gt="results/gt_extra/aqa-alevel-physics-7408-1-jun22-qp.txt",
extract=["--docling", "results/rapid_7408/merged.json", "--rapid", "results/rapid_7408/p*.json",
"--board", "aqa"]),
dict(slug="aqa-biology-8461-ocr", title="AQA GCSE Biology 8461/1H (rasterised OCR)",
board="aqa", level="GCSE", path="OCR (RapidOCR margin-pass)",
pdf="samples/extra/aqa-gcse-biology-8461-1h-jun22-qp.pdf",
docling="results/rapid_8461/merged.json", rapid="results/rapid_8461/p*.json",
gt="results/gt_extra/aqa-gcse-biology-8461-1h-jun22-qp.txt",
extract=["--docling", "results/rapid_8461/merged.json", "--rapid", "results/rapid_8461/p*.json",
"--board", "aqa"]),
dict(slug="edexcel-maths-1ma1-1h-ocr", title="Edexcel GCSE Maths 1MA1/1H (rasterised OCR)",
board="edexcel", level="GCSE-H", path="OCR + gemma marks gap-fill",
pdf="samples/extra/edexcel-gcse-maths-1ma1-1h-jun22-qp.pdf",
docling="results/genreport/edexcel1h/ocr.json", rapid=None,
gt="results/gt_extra/edexcel-gcse-maths-1ma1-1h-jun22-qp.txt",
extract=["--docling", "results/genreport/edexcel1h/ocr.json", "--board", "edexcel",
"--marks-fill", "results/genreport/edexcel1h/marks_fill.json"]),
dict(slug="edexcel-maths-1ma1-1f-ocr", title="Edexcel GCSE Maths 1MA1/1F (rasterised OCR)",
board="edexcel", level="GCSE-F", path="OCR + gemma marks gap-fill",
pdf="samples/extra/edexcel-gcse-maths-1ma1-1f-jun22-qp.pdf",
docling="results/genreport/edexcel1f/ocr.json", rapid=None,
extract=["--docling", "results/genreport/edexcel1f/ocr.json", "--board", "edexcel",
"--marks-fill", "results/genreport/edexcel1f/marks_fill.json"]),
dict(slug="ocr-physics-h556-ocr", title="OCR A-level Physics H556/3 (rasterised OCR)",
board="ocr", level="A-level", path="OCR + gemma marks gap-fill",
pdf="samples/extra/ocr-alevel-physics-h556-3-jun22-qp.pdf",
docling="results/genreport/ocrh556/ocr.json", rapid=None,
gt="results/gt_extra/ocr-alevel-physics-h556-3-jun22-qp.txt",
extract=["--docling", "results/genreport/ocrh556/ocr.json", "--board", "ocr",
"--marks-fill", "results/genreport/ocrh556/marks_fill.json"]),
]
# Born-digital papers processed through the fast path (do_fast): extract.py is run with
# --text on "pdf" and scored against "gt" when one is listed.
FAST = [
dict(slug="aqa-physics-7408-fast", title="AQA A-level Physics 7408/1 (born-digital)", board="aqa",
level="A-level", pdf="samples/extra/aqa-alevel-physics-7408-1-jun22-qp.pdf",
gt="results/gt_extra/aqa-alevel-physics-7408-1-jun22-qp.txt"),
dict(slug="aqa-biology-8461-fast", title="AQA GCSE Biology 8461/1H (born-digital)", board="aqa",
level="GCSE", pdf="samples/extra/aqa-gcse-biology-8461-1h-jun22-qp.pdf",
gt="results/gt_extra/aqa-gcse-biology-8461-1h-jun22-qp.txt"),
dict(slug="edexcel-maths-1ma1-1h-fast", title="Edexcel GCSE Maths 1MA1/1H (born-digital)",
board="edexcel", level="GCSE-H", pdf="samples/extra/edexcel-gcse-maths-1ma1-1h-jun22-qp.pdf",
gt="results/gt_extra/edexcel-gcse-maths-1ma1-1h-jun22-qp.txt"),
dict(slug="edexcel-maths-1ma1-1f-fast", title="Edexcel GCSE Maths 1MA1/1F (born-digital)",
board="edexcel", level="GCSE-F", pdf="samples/extra/edexcel-gcse-maths-1ma1-1f-jun22-qp.pdf"),
dict(slug="ocr-physics-h556-fast", title="OCR A-level Physics H556/3 (born-digital)", board="ocr",
level="A-level", pdf="samples/extra/ocr-alevel-physics-h556-3-jun22-qp.pdf",
gt="results/gt_extra/ocr-alevel-physics-h556-3-jun22-qp.txt"),
dict(slug="aqa-chemistry-8462-fast", title="AQA GCSE Chemistry 8462/1H (born-digital)", board="aqa",
level="GCSE", pdf="samples/chemistry-p1h-2023-qp.pdf"),
dict(slug="aqa-physics-8463-twin-fast", title="AQA GCSE Physics 8463/1H born-digital twin",
board="aqa", level="GCSE", pdf="samples/physics-p1h-2022-qp.pdf"),
]
def run(cmd):
    """Run a pipeline script with the current interpreter and report whether it succeeded.

    Output is captured. On a non-zero exit status the command and the last 400 characters of
    its stderr are printed; no exception is raised for a failing script.

    Args:
        cmd: script path followed by its arguments.

    Returns:
        True when the process exited with status 0, otherwise False.
    """
    proc = subprocess.run([PY, *cmd], capture_output=True, text=True)
    ok = proc.returncode == 0
    if not ok:
        print(f" ! FAILED: {' '.join(cmd)}\n{proc.stderr[-400:]}")
    return ok
def jload(p):
    """Best-effort JSON loader: return the parsed content of ``p``, or {} when unavailable.

    A missing or unreadable file (OSError) and undecodable or invalid JSON (ValueError, which
    covers json.JSONDecodeError and UnicodeDecodeError) both give {}, so a failed pipeline
    stage shows up as empty stats instead of aborting the whole bundle. Other exceptions are
    no longer swallowed.
    """
    try:
        with open(p, encoding="utf-8") as fh:
            return json.load(fh)
    except (OSError, ValueError):
        return {}
def stats_from(struct, val):
    """Flatten a structured.json and a validate.json into one catalog row.

    Every field is read with a lookup that tolerates absence, so empty inputs give a row of
    None values and empty lists.

    Args:
        struct: parsed structured.json (may be {}).
        val: parsed validate.json (may be {}).

    Returns:
        dict with board/paper code, question and part counts, the marks check, coverage, the
        validation verdict and flags, and "second_pass_slots": the labels of
        question_sequence entries that were not recovered.
    """
    stats = struct.get("stats", {}) or {}
    check = stats.get("marks_check") or {}
    cov = struct.get("coverage", {}) or {}
    summary = val.get("summary") or {}
    unrecovered = [slot["label"] for slot in val.get("question_sequence", [])
                   if not slot["recovered"]]

    row = {}
    row["board"] = struct.get("board")
    row["paper_code"] = struct.get("paper_code")
    row["n_questions"] = stats.get("n_questions")
    row["n_parts"] = stats.get("n_parts")
    row["marks_sum"] = check.get("sum")
    row["official_max"] = check.get("expected_max")
    row["marks_pct"] = check.get("pct")
    row["coverage_pct"] = cov.get("coverage_pct")
    row["coverage_missed"] = cov.get("missed", [])
    row["validate_verdict"] = summary.get("worst_severity")
    row["validate_flags"] = val.get("flags", [])
    row["questions_expected"] = summary.get("questions_expected")
    row["questions_recovered"] = summary.get("questions_recovered")
    row["second_pass_slots"] = unrecovered
    return row
def do_geometry(p, overlays):
    """Run the full geometry pipeline for one manifest entry.

    Stages run in order through their CLIs: extract, furniture, bands, page_roles, template,
    validate, and (when ``overlays`` is true) the template and debug overlay renders. A
    failing stage is reported by ``run`` but does not stop the later stages.

    Args:
        p: manifest entry with "slug", "extract", "docling", "pdf" and optional "gt"/"rapid".
        overlays: whether to render the overlay images.

    Returns:
        (stats row built from structured.json and validate.json, output directory).
    """
    out_dir = os.path.join(FINAL, p["slug"])
    os.makedirs(out_dir, exist_ok=True)
    artifact_names = ("structured.json", "furniture.json", "bands.json", "page_roles.json",
                      "template.json", "validate.json")
    structured, furniture, bands, roles, template, validate = [
        os.path.join(out_dir, name) for name in artifact_names]

    extract_cmd = ["extract.py", *p["extract"], "--out", structured]
    if p.get("gt"):
        extract_cmd.extend(["--gt", p["gt"]])
    run(extract_cmd)

    run(["furniture.py", p["docling"], "--out", furniture])

    bands_cmd = ["bands.py", structured, "--docling", p["docling"], "--out", bands]
    if p.get("rapid"):
        bands_cmd.extend(["--rapid", p["rapid"]])
    run(bands_cmd)

    run(["page_roles.py", p["docling"], "--bands", bands, "--out", roles])
    run(["template.py", "--structured", structured, "--bands", bands, "--furniture", furniture,
         "--page-roles", roles, "--pdf", p["pdf"], "--out", template])
    run(["validate.py", structured, "--out", validate])

    if overlays:
        template_view = os.path.join(out_dir, "overlays", "template")
        run(["scripts/overlay.py", structured, p["pdf"], "--template", template,
             "--dpi", "120", "--out", template_view])
        # rich debug view on the first few pages (cover + early questions)
        debug_view = os.path.join(out_dir, "overlays", "debug")
        run(["scripts/overlay.py", structured, p["pdf"], "--docling", p["docling"],
             "--bands", bands, "--furniture", furniture, "--pages", "1,2,3,4,5",
             "--dpi", "120", "--out", debug_view])

    return stats_from(jload(structured), jload(validate)), out_dir
def do_fast(p):
    """Run the born-digital fast path (text extract + validate only) for one paper.

    Args:
        p: paper config dict; reads ``slug``, ``pdf`` and the optional ``gt``.

    Returns:
        ``(stats, out_dir)`` with ``stats`` built by ``stats_from`` from the
        structured.json and validate.json written under ``FINAL/<slug>/``.
    """
    out_dir = os.path.join(FINAL, p["slug"])
    os.makedirs(out_dir, exist_ok=True)
    structured_path = os.path.join(out_dir, "structured.json")
    validate_path = os.path.join(out_dir, "validate.json")

    extract_cmd = ["extract.py", "--text", p["pdf"], "--out", structured_path]
    if p.get("gt"):
        extract_cmd.extend(["--gt", p["gt"]])
    run(extract_cmd)
    run(["validate.py", structured_path, "--out", validate_path])

    stats = stats_from(jload(structured_path), jload(validate_path))
    return stats, out_dir
def per_paper_report(p, s, d, kind):
    """Write the human-readable ``report.md`` for one paper into directory ``d``.

    Args:
        p: paper config dict; reads ``title``, ``slug``, ``board`` and ``level``.
        s: stats row as produced by ``stats_from``.
        d: the paper's output directory (must already exist).
        kind: pipeline-path label; the literal ``"born-digital fast-path"``
            selects the short artifact list without overlays.

    Returns:
        The number of overlay PNGs found under ``<d>/overlays/`` (recursive).
    """
    n_imgs = len(glob.glob(os.path.join(d, "overlays", "**", "*.png"), recursive=True))

    marks_line = f"- **marks:** {s['marks_sum']}/{s['official_max']}"
    if s['marks_pct'] is not None:
        marks_line += f" ({s['marks_pct']}% of official max)"

    if s['coverage_pct'] is not None:
        coverage_line = f"- **coverage vs GT:** {s['coverage_pct']}%"
        if s.get('coverage_missed'):
            # cap the list so one badly-missed paper does not swamp the report
            coverage_line += f" (missed {s['coverage_missed'][:8]})"
    else:
        coverage_line = "- **coverage vs GT:** n/a"

    lines = [f"# {p['title']}", "",
             f"- **slug:** `{p['slug']}` · **board:** {p['board']} · **level:** {p['level']} "
             f"· **path:** {kind}",
             f"- **questions/parts:** {s['n_questions']} / {s['n_parts']}",
             marks_line,
             coverage_line,
             f"- **G6 verdict:** {s['validate_verdict']}",
             ]
    if s["validate_flags"]:
        lines += ["", "**Flags (human-review hints):**"] + [f"- {f}" for f in s["validate_flags"]]

    artifacts = "**Artifacts:** `structured.json`, `validate.json`"
    if kind != "born-digital fast-path":
        artifacts += (", `furniture.json`, `bands.json`, `page_roles.json`, `template.json`, "
                      f"`overlays/` ({n_imgs} images)")
    else:
        artifacts += " (born-digital: no page geometry → no overlays)"
    lines += ["", artifacts]

    # The report contains non-ASCII punctuation, so pin UTF-8 instead of relying on
    # the locale default, and close the handle deterministically.
    with open(os.path.join(d, "report.md"), "w", encoding="utf-8") as fh:
        fh.write("\n".join(lines) + "\n")
    return n_imgs
def main():
    """Build the final corpus output: run every configured paper, then write the catalog.

    Iterates the module-level ``GEOMETRY`` and ``FAST`` paper lists, runs the matching
    pipeline for each, writes a per-paper ``report.md``, and finally emits
    ``FINAL/catalog.json`` plus the ``INDEX.md`` produced by ``write_index``.
    ``--no-overlays`` skips overlay rendering on the geometry path.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--no-overlays", action="store_true")
    a = ap.parse_args()
    os.makedirs(FINAL, exist_ok=True)
    catalog = {"generated_at": datetime.datetime.now().isoformat(timespec="seconds"),
               "papers": []}
    identity_keys = ("slug", "title", "board", "level")
    total_imgs = 0
    for p in GEOMETRY:
        print(f"[geometry] {p['slug']}")
        s, d = do_geometry(p, not a.no_overlays)
        n = per_paper_report(p, s, d, p["path"])
        total_imgs += n
        catalog["papers"].append({**{k: p[k] for k in identity_keys},
                                  "kind": "geometry", "path": p["path"], "dir": d,
                                  "overlay_images": n, **s})
    for p in FAST:
        print(f"[fast] {p['slug']}")
        s, d = do_fast(p)
        per_paper_report(p, s, d, "born-digital fast-path")
        catalog["papers"].append({**{k: p[k] for k in identity_keys},
                                  "kind": "fast", "path": "born-digital fast-path", "dir": d, **s})
    # Context manager so the catalog is flushed and closed before write_index runs.
    with open(os.path.join(FINAL, "catalog.json"), "w") as fh:
        json.dump(catalog, fh, indent=2)
    write_index(catalog, total_imgs)
    print(f"\n-> {len(catalog['papers'])} papers, {total_imgs} overlay images -> {FINAL}/")
def write_index(catalog, total_imgs):
    """Write ``FINAL/INDEX.md``: two summary tables plus the per-paper layout key.

    Args:
        catalog: the catalog dict built by ``main`` (reads ``generated_at`` and
            ``papers``; each paper row carries the ``stats_from`` fields).
        total_imgs: total number of overlay images across all geometry papers.

    Missing percentages render as ``n/a`` (coverage) or are omitted (marks)
    instead of producing ``n/a%`` / ``(None%)`` cells.
    """
    def pct(value):
        # a bare "n/a" when the figure is unknown; never "n/a%"
        return f"{value}%" if value is not None else "n/a"

    def marks(p):
        cell = f"{p['marks_sum']}/{p['official_max']}"
        if p['marks_pct'] is not None:
            cell += f" ({p['marks_pct']}%)"
        return cell

    def row(p):
        # columns shared by both tables
        return (f"| [{p['title']}]({p['slug']}/report.md) | {p['board']} {p['level']} | "
                f"{p['n_questions']}/{p['n_parts']} | {marks(p)} | {pct(p['coverage_pct'])} | "
                f"{p['validate_verdict']} |")

    geometry = [p for p in catalog["papers"] if p["kind"] == "geometry"]
    fast = [p for p in catalog["papers"] if p["kind"] == "fast"]
    L = ["# Final corpus output — exam-extraction spike", "",
         f"Generated {catalog['generated_at']}. {len(catalog['papers'])} paper-runs across "
         f"3 boards × 2 levels, both pipeline paths; {total_imgs} overlay debug images.", "",
         "Each `<slug>/` holds the machine artifacts (JSON) + `report.md`; geometry papers also have "
         "`overlays/template/` (human-review view, all pages) and `overlays/debug/` (raw-detection view).",
         "Machine catalog: `catalog.json`.", "",
         "## Image-only / OCR-path (with geometry + overlays)", "",
         "| Paper | Board / level | Q/parts | Marks/max | Coverage | G6 | Images |",
         "|---|---|---|---|---|---|---|"]
    L += [f"{row(p)} {p['overlay_images']} |" for p in geometry]
    L += ["", "## Born-digital fast-path (CPU, no geometry)", "",
          "| Paper | Board / level | Q/parts | Marks/max | Coverage | G6 |",
          "|---|---|---|---|---|---|"]
    L += [row(p) for p in fast]
    L += ["", "## Per-paper directory layout", "```",
          "<slug>/",
          "  structured.json     extract.py output (questions->parts->marks/bbox/regions)",
          "  validate.json       G6 consistency judge (confidence + flags)",
          "  furniture.json      recurring-furniture mask + content margins     [geometry only]",
          "  bands.json          main + part y-bands                            [geometry only]",
          "  page_roles.json     per-page role + margin override                [geometry only]",
          "  template.json       editable first-pass template (source/confirmed) [geometry only]",
          "  overlays/template/  human-review view, all pages                   [geometry only]",
          "  overlays/debug/     raw-detection view, sample pages               [geometry only]",
          "  report.md           per-paper human summary", "```"]
    # The index contains non-ASCII punctuation: pin UTF-8 and close the handle.
    with open(os.path.join(FINAL, "INDEX.md"), "w", encoding="utf-8") as fh:
        fh.write("\n".join(L) + "\n")
if __name__ == "__main__":
main()

View File

@ -0,0 +1,119 @@
#!/usr/bin/env python3
"""
furniture.py — detect recurring page chrome by cross-page repetition; derive content margins;
reclassify pictures (real figure vs barcode/QR/header furniture). The first-pass mask.
Principle: an item at ~the same (x,y) on many pages is **chrome, not question content**. This
needs no classifier — pure positional recurrence — and it solves the genuine gap the overlay
surfaced (the app-generated QR top-right and the foot barcode being mislabelled context_figure),
including the QR that bleeds past the margin. It also yields the content margins so stage-2 analysis
can be fed only the question/response region.
Outputs a mask + margins JSON, and an A/B summary (figure false-positives before vs after masking).
Usage:
python furniture.py <docling_doc.json> [--freq 0.4] [--out results/furniture.json]
"""
import json, argparse
from collections import defaultdict
GRID = 24 # pt — position quantisation; items sharing a cell across pages are "recurring"
def gather(doc):
    """Collect every positioned text, picture and table item from a Docling document.

    Only the first provenance entry of each item is considered, and items lacking
    either a bbox or a (truthy) page number are skipped.

    Returns:
        A list of dicts with ``page``, ``kind`` (``text``/``picture``/``table``),
        ``label`` (falls back to the kind), ``bbox`` and the first 40 characters
        of ``text``.
    """
    found = []
    for collection in ("texts", "pictures", "tables"):
        kind = collection[:-1]  # singular form of the collection name
        for entry in doc.get(collection, []):
            provenance = entry.get("prov") or []
            if not provenance:
                continue
            first = provenance[0]
            bbox = first.get("bbox")
            page = first.get("page_no")
            if not (bbox and page):
                continue
            snippet = (entry.get("text") or "")[:40]
            found.append({"page": page, "kind": kind, "label": entry.get("label", kind),
                          "bbox": bbox, "text": snippet})
    return found
def cell(bb):
    """Quantise a bbox centre to its ``(column, row)`` cell on the ``GRID``-point grid."""
    centre_x = (bb["l"] + bb["r"]) / 2
    centre_y = (bb["t"] + bb["b"]) / 2
    return (round(centre_x / GRID), round(centre_y / GRID))
def detect(items, n_pages, freq):
    """Mark recurring items as furniture, in place.

    An item is furniture when its grid cell is occupied on at least
    ``freq * n_pages`` distinct pages. Each item gains a boolean ``furniture`` key.

    Returns:
        ``{cell: number_of_pages}`` for every cell that met the threshold.
    """
    pages_at = defaultdict(set)
    for item in items:
        pages_at[cell(item["bbox"])].add(item["page"])

    threshold = freq * n_pages
    fcells = {}
    for position, pages in pages_at.items():
        if len(pages) >= threshold:
            fcells[position] = len(pages)

    for item in items:
        item["furniture"] = cell(item["bbox"]) in fcells
    return fcells
def content_margins(items):
    """Derive content margins from the non-furniture items.

    Expects every item to already carry the ``furniture`` flag.

    Returns:
        ``None`` when no body item exists; otherwise a dict with
        ``content_x_band`` (5th-percentile left edge and 95th-percentile right
        edge, robust to stray items) and ``per_page`` (the bounding extent of the
        body items on each page). All values are rounded to one decimal.
    """
    body = [item for item in items if not item["furniture"]]
    if not body:
        return None

    lefts = sorted(item["bbox"]["l"] for item in body)
    rights = sorted(item["bbox"]["r"] for item in body)
    low_idx = max(0, len(lefts) // 20)  # 5th pct — robust to strays
    high_idx = min(len(rights) - 1, len(rights) * 19 // 20)
    band = {"x_left": round(lefts[low_idx], 1), "x_right": round(rights[high_idx], 1)}

    boxes_by_page = defaultdict(list)
    for item in body:
        boxes_by_page[item["page"]].append(item["bbox"])

    per_page = {}
    for page, boxes in boxes_by_page.items():
        # bottom-left origin: the topmost edge is the largest t, the lowest the smallest b
        per_page[page] = {"top": round(max(box["t"] for box in boxes), 1),
                          "bottom": round(min(box["b"] for box in boxes), 1),
                          "left": round(min(box["l"] for box in boxes), 1),
                          "right": round(max(box["r"] for box in boxes), 1)}
    return {"content_x_band": band, "per_page": per_page}
def main():
    """CLI entry point: detect furniture in a Docling JSON and write the mask + margins.

    Reads the Docling document named on the command line, flags recurring items,
    derives the content margins, writes the result JSON to ``--out`` and prints an
    A/B summary of picture classification before and after masking.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("doc")
    ap.add_argument("--freq", type=float, default=0.40, help="recurrence fraction => furniture")
    ap.add_argument("--out", default="results/furniture.json")
    a = ap.parse_args()
    with open(a.doc) as fh:
        doc = json.load(fh)
    items = gather(doc)
    # page count = pages that carry at least one positioned item
    n_pages = len({it["page"] for it in items})
    fcells = detect(items, n_pages, a.freq)
    margins = content_margins(items)
    pics = [it for it in items if it["kind"] == "picture"]
    pics_furn = [it for it in pics if it["furniture"]]
    txt_furn = [it for it in items if it["kind"] == "text" and it["furniture"]]
    # break furniture pictures down by cell (which recurring object)
    by_cell = defaultdict(list)
    for it in pics_furn:
        by_cell[cell(it["bbox"])].append(it)
    result = {
        "n_pages": n_pages, "freq_threshold": a.freq,
        "furniture_cells": {f"{c[0]},{c[1]}": n for c, n in sorted(fcells.items())},
        "content_margins": margins,
        "ab_test_figures": {
            "context_figure_before_mask": len(pics),
            "context_figure_after_mask": len(pics) - len(pics_furn),
            "removed_as_furniture": len(pics_furn),
            "removed_breakdown": {f"cell {c[0]},{c[1]}": len(v) for c, v in sorted(by_cell.items())},
        },
        "text_furniture_removed": len(txt_furn),
        "items": items,  # each carries furniture flag — consumed by overlay.py --furniture
    }
    # Close the output deterministically so downstream steps read a complete file.
    with open(a.out, "w") as fh:
        json.dump(result, fh)
    ab = result["ab_test_figures"]
    print(f"pages {n_pages}  freq>={a.freq}  furniture cells: {result['furniture_cells']}")
    print(f"content x-band: {margins['content_x_band'] if margins else None}")
    print("\nA/B — figure (picture) classification:")
    print(f"  context_figure BEFORE mask : {ab['context_figure_before_mask']}")
    print(f"  context_figure AFTER  mask : {ab['context_figure_after_mask']}")
    print(f"  removed as furniture       : {ab['removed_as_furniture']}  {ab['removed_breakdown']}")
    print(f"  text furniture removed     : {result['text_furniture_removed']} (page numbers / 'Turn over' / headers)")
    print(f"-> wrote {a.out}")
if __name__ == "__main__":
main()

View File

@ -0,0 +1,88 @@
#!/usr/bin/env python3
"""
page_roles.py — tag every page with a structural role (the first-pass page-layout pass).
Roles: cover / question / continuation / blank / appendix. Drives two things in the template:
* the human sees the paper's shape (which pages are non-question), and
* MARGINS are disabled on pages that have no content column (cover, blank) — the override the
user asked for ("the front page doesn't have margins").
Signals (deterministic, no GPU): per-page non-space char count, cover/boilerplate keywords, and
whether the page carries a question band. Output feeds template.py via --page-roles.
Usage:
python page_roles.py <docling_doc.json> --bands <bands.json> [--out results/page_roles/x.json]
"""
import json, argparse
from collections import defaultdict
BLANK_MAX = 130 # non-space chars at/below which a page is boilerplate-only (blank)
COVER_KW = ("time allowed", "instructions", "materials", "information for")
BLANK_KW = ("blank page", "no questions printed", "no questions are printed")
APPENDIX_KW = ("data sheet", "formula", "periodic table", "insert", "resource booklet")
# pages where there is no content column -> margins do not apply (the user's override case)
NO_MARGIN_ROLES = {"cover", "blank"}
def page_text(doc):
    """Summarise the Docling text layer per page.

    Items are assigned to the page of their first provenance entry; items
    without a (truthy) page number are ignored.

    Returns:
        ``(chars, blob)`` where ``chars`` is a ``defaultdict(int)`` mapping page ->
        count of non-whitespace characters, and ``blob`` is a plain dict mapping
        page -> the lower-cased item texts joined with single spaces.
    """
    chars = defaultdict(int)
    fragments = defaultdict(list)
    for item in doc.get("texts", []):
        provenance = item.get("prov") or []
        if not provenance:
            continue
        page = provenance[0].get("page_no")
        if not page:
            continue
        text = item.get("text") or ""
        chars[page] += sum(not ch.isspace() for ch in text)
        fragments[page].append(text.lower())
    blob = {page: " ".join(parts) for page, parts in fragments.items()}
    return chars, blob
def tag(doc, qpages):
    """Assign a structural role to every page from 1 up to the last known page.

    Args:
        doc: Docling document dict (its text layer is summarised by ``page_text``).
        qpages: collection of page numbers that carry a question band.

    Returns:
        ``{page: {"role", "chars", "margins_enabled", "source", "confirmed"}}``.
        Roles are decided in priority order: question, cover, blank, appendix
        (by keyword), continuation, then appendix as the catch-all.
    """
    chars, blob = page_text(doc)
    last_page = max([*chars, *qpages, 1])
    if qpages:
        first_q, last_q = min(qpages), max(qpages)
    else:
        first_q, last_q = last_page + 1, 0

    def mentions(text, keywords):
        return any(k in text for k in keywords)

    roles = {}
    for pg in range(1, last_page + 1):
        text = blob.get(pg, "")
        n_chars = chars[pg]
        if pg in qpages:
            role = "question"
        elif pg < first_q and mentions(text, COVER_KW):
            role = "cover"         # before blank: the cover's instructions mention "blank"
        elif n_chars <= BLANK_MAX or (mentions(text, BLANK_KW) and n_chars < 300):
            role = "blank"
        elif mentions(text, APPENDIX_KW):
            role = "appendix"
        elif first_q <= pg <= last_q:
            role = "continuation"  # no question label but inside the question range
        else:
            role = "appendix"      # content outside the question range (end-matter/insert)
        roles[pg] = {"role": role, "chars": n_chars,
                     "margins_enabled": role not in NO_MARGIN_ROLES,
                     "source": "auto", "confirmed": False}
    return roles
def main():
    """CLI entry point: tag page roles from a Docling JSON + bands JSON and write them out.

    Writes ``{"pages": roles}`` to ``--out`` and prints a role histogram followed
    by one line per non-question page.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("doc")
    ap.add_argument("--bands", required=True)
    ap.add_argument("--out", default="results/page_roles.json")
    a = ap.parse_args()
    with open(a.bands) as fh:
        bands = json.load(fh)
    with open(a.doc) as fh:
        doc = json.load(fh)
    # bands JSON keys are strings; roles are keyed by int page number
    qpages = {int(p) for p in bands["pages"]}
    roles = tag(doc, qpages)
    # Close the output deterministically so template.py reads a complete file.
    with open(a.out, "w") as fh:
        json.dump({"pages": roles}, fh, indent=2)
    from collections import Counter
    c = Counter(v["role"] for v in roles.values())
    print(f"roles: {dict(c)}")
    for pg in sorted(roles):
        r = roles[pg]
        if r["role"] == "question":
            continue
        flag = "" if r["margins_enabled"] else "  (no margins)"
        print(f"  p{pg:2d}: {r['role']:12s} chars={r['chars']}{flag}")
    print(f"-> wrote {a.out}")
if __name__ == "__main__":
main()

View File

View File

@ -0,0 +1,310 @@
#!/usr/bin/env python3
"""
overlay.py — human-viewable debug visualisation: draw the extractor's geometry over the rendered
exam page. Shows WHERE each question/part label was located and where Docling regions
(figures/tables/MCQ checkboxes) sit, so a reviewer can eyeball whether the structure landed in the
right place. This is the same geometry the exam-marker app uses to place regions on its canvas.
Coordinates: Docling/RapidOCR bboxes are PDF points with a BOTTOM-LEFT origin. We render the page
at DPI D (scale = D/72) and flip y against the rendered image height, so we never need the page's
point-height explicitly: y_top_px = H_px - t*scale.
With --docling, also draws every raw Docling text block (the body/question content the thin
extractor model discards) so a reviewer can see the FULL detection, not just what we persist.
Granite tables carry cells but no coordinates; we derive their box by locating the cell-texts in
the Docling text layer (content+geometry fusion).
Usage:
python scripts/overlay.py <structured.json> <source_pdf> [--pages 3,4,5] [--dpi 150] [--out DIR]
python scripts/overlay.py <structured.json> <pdf> --docling results/E_tess_full.json --pages 5
"""
import os, sys, json, re, argparse, subprocess, tempfile
from PIL import Image, ImageDraw, ImageFont
PART_COLOR = (211, 47, 47) # red — question/part labels
BODY_COLOR = (150, 150, 150) # grey — raw Docling body-text blocks (--docling)
GRANITE_COLOR = (0, 150, 136) # teal — Granite table (geometry derived from cells)
REGION_COLORS = { # docling region taxonomy -> colour
"context_figure": (25, 118, 210), # blue
"context_data": (56, 142, 60), # green (tables)
"context_caption": (123, 31, 162), # purple
"mcq_option": (245, 124, 0), # orange (checkboxes)
}
def _norm(s):
return re.sub(r"[^a-z0-9]", "", (s or "").lower())
def docling_texts_by_page(doc):
    """Group the raw Docling text items by page.

    Returns:
        ``{page: [(bbox, text, label), ...]}`` built from each item's first
        provenance entry. Items without a bbox or page are skipped; a missing
        text becomes ``""`` and a missing label becomes ``"text"``.
    """
    grouped = {}
    for item in doc.get("texts", []):
        provenance = item.get("prov") or []
        if not provenance:
            continue
        bbox = provenance[0].get("bbox")
        page = provenance[0].get("page_no")
        if not (bbox and page):
            continue
        record = (bbox, item.get("text") or "", item.get("label") or "text")
        grouped.setdefault(page, []).append(record)
    return grouped
def derive_table_bbox(grid, page_texts):
    """Derive a Granite table's on-page box from the Docling text layer.

    Granite tables have cells but no coordinates, so the cell texts are located
    among the page's text blocks and their bboxes are unioned.

    Two traps (seen on physics p5): (1) border/maths glyphs ('|', '+') normalise to
    '' and an empty string is a substring of everything; (2) cell WORDS recur in
    nearby content — the rock names reappear in the MCQ options below the table
    ('Basalt or chalk'), far left and lower. So only blocks whose normalised text
    is CONTAINED IN a cell are matched (keeps fragments like '2.90'/'Type', rejects
    the longer 'basaltorchalk'), a length >= 2 is required, and only the dominant
    vertical cluster is kept to drop any stray cell-word elsewhere on the page.

    Args:
        grid: 2D list of cell texts.
        page_texts: ``[(bbox, text, label), ...]`` for the table's page.

    Returns:
        A bbox dict (``l``/``r``/``t``/``b``, bottom-left origin), or ``None`` when
        fewer than three blocks match or no block lies within the median band.
    """
    import statistics
    cells = {c for c in (_norm(x) for row in grid for x in row) if len(c) > 1}
    hit = []
    for bb, txt, _ in page_texts:
        key = _norm(txt)  # normalise once per block, not once per cell
        if len(key) > 1 and any(key in c for c in cells):
            hit.append(bb)
    if len(hit) < 3:
        return None
    med = statistics.median((b["t"] + b["b"]) / 2 for b in hit)
    hit = [b for b in hit if abs((b["t"] + b["b"]) / 2 - med) <= 120]   # table band only
    if not hit:
        # With an even number of hits the median is the mean of the two middle
        # centres; two distant clusters can leave nothing inside the band.
        return None
    return {"l": min(b["l"] for b in hit), "r": max(b["r"] for b in hit),
            "t": max(b["t"] for b in hit), "b": min(b["b"] for b in hit)}
def _font(sz):
    """Return a DejaVu Sans font at size ``sz`` (bold preferred), else PIL's default font."""
    candidates = ("/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf",
                  "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf")
    found = next((path for path in candidates if os.path.exists(path)), None)
    if found is None:
        return ImageFont.load_default()
    return ImageFont.truetype(found, sz)
MAIN_LINE = (25, 118, 210) # blue — main-question y-markers
PART_LINE = (211, 47, 47) # red — part y-markers
def _hline(draw, y_pdf, scale, H, W, color, label, width, font, dashed=False, inset=0):
"""Full-width horizontal marker line at a PDF-point y (BOTTOM-LEFT origin)."""
y = H - y_pdf * scale
if dashed:
x = inset
while x < W:
draw.line([x, y, min(x + 9, W), y], fill=color, width=width); x += 16
else:
draw.line([inset, y, W, y], fill=color, width=width)
if label:
tw = draw.textlength(label, font=font)
draw.rectangle([inset, y - 16, inset + tw + 6, y], fill=color)
draw.text((inset + 3, y - 15), label, fill=(255, 255, 255), font=font)
def _rect(draw, bb, scale, H, color, label, width=3, font=None):
"""Draw one bbox (BOTTOM-LEFT origin -> image space) + its label."""
x0, x1 = bb["l"] * scale, bb["r"] * scale
y0, y1 = H - bb["t"] * scale, H - bb["b"] * scale # t is the higher edge -> smaller y_px
draw.rectangle([x0, y0, x1, y1], outline=color, width=width)
if label:
tw = draw.textlength(label, font=font)
draw.rectangle([x0, y0 - 17, x0 + tw + 6, y0], fill=color)
draw.text((x0 + 3, y0 - 16), label, fill=(255, 255, 255), font=font)
def draw_template(draw, tpl, pg, scale, H, W, font):
    """Render the editable template for one page: margins/bands as LINES, footprints as BOXES.

    A confirmed element is drawn solid; an unconfirmed (auto) suggestion is drawn
    dashed. Margins are suppressed entirely on pages whose ``margins_enabled`` is
    false (cover/blank).

    Args:
        draw: PIL ``ImageDraw`` for the rendered page.
        tpl: parsed template.json dict (reads ``pages`` and ``margins``).
        pg: 1-based page number; looked up under both its str and int key.
        scale: pixels per PDF point.
        H, W: rendered image height and width in pixels.
        font: font used for every label.
    """
    MARGIN, MAIN, PART = (0, 150, 136), (25, 118, 210), (211, 47, 47)
    page = tpl["pages"].get(str(pg)) or tpl["pages"].get(pg) or {}

    # role banner (top-left), sized from the text actually drawn ("role:<role>")
    role = page.get("role", "question")
    banner = f"role:{role}"
    banner_w = draw.textlength(banner, font=font)
    draw.rectangle([0, 0, banner_w + 8, 16], fill=(70, 70, 70))
    draw.text((4, 1), banner, fill=(255, 255, 255), font=font)

    # margins: axis-locked lines (document scope on every page + this page's page-scope lines)
    if page.get("margins_enabled", True):
        for m in tpl.get("margins", []):
            if m["scope"] == "page" and m.get("page") != pg:
                continue
            solid = m.get("confirmed")
            if m["axis"] == "x":
                x = m["value"] * scale
                if solid:
                    draw.line([x, 0, x, H], fill=MARGIN, width=2)
                else:
                    _dash_v(draw, x, 0, H, MARGIN, 2)
            else:
                y = H - m["value"] * scale
                if solid:
                    draw.line([0, y, W, y], fill=MARGIN, width=2)
                else:
                    _dash_h(draw, 0, W, y, MARGIN, 2)

    for m in page.get("main_bands", []):
        if not m.get("is_start", True):   # continuation page: no spurious second "start" line
            continue
        _hline(draw, m["y_start"], scale, H, W, MAIN, f"Q{m['question']}", 3, font,
               dashed=not m.get("confirmed"))
    for p in page.get("part_bands", []):
        _hline(draw, p["y_start"], scale, H, W, PART, p["label"], 2, font, inset=90,
               dashed=not p.get("confirmed"))
    for f in page.get("furniture", []):
        if f.get("box"):
            _rect(draw, f["box"], scale, H, (130, 130, 130), f"furniture:{f.get('kind','')}", 2, font)
    for g in page.get("figures", []):
        if g.get("box"):
            _rect(draw, g["box"], scale, H, (56, 142, 60), "figure", 3, font)
    for t in page.get("tables", []):
        if t.get("box"):
            _rect(draw, t["box"], scale, H, (0, 150, 136),
                  f"table {t.get('n_rows')}x{t.get('n_cols')}", 3, font)
def render_page(pdf, pg, dpi, td):
    """Render page ``pg`` and return an RGB image in DOCLING's coordinate space.

    Docling reports bboxes relative to the CropBox, but pdftoppm renders the
    MediaBox when CropBox != MediaBox (e.g. the Edexcel 1MA1 papers: media 652x899,
    crop inset 28.35pt). That mismatch magnifies and shifts every overlaid shape
    toward a corner, so the rendered image is cropped to the CropBox to match
    Docling. No-op when CropBox == MediaBox (h556) or when poppler already
    rendered the CropBox.

    Args:
        pdf: path to the source PDF.
        pg: 1-based page number.
        dpi: render resolution.
        td: scratch directory that receives the pdftoppm output.

    Raises:
        subprocess.CalledProcessError: pdftoppm exited non-zero.
        FileNotFoundError: pdftoppm succeeded but no PNG was found for the page.
    """
    base = os.path.join(td, f"p{pg}")
    subprocess.run(["pdftoppm", "-png", "-r", str(dpi), "-f", str(pg), "-l", str(pg), pdf, base],
                   check=True)
    # pdftoppm zero-pads the page suffix depending on the document's page count
    candidates = (f"{base}-{pg:02d}.png", f"{base}-{pg}.png", f"{base}-{pg:03d}.png",
                  f"{base}-{pg:04d}.png")
    png = next((p for p in candidates if os.path.exists(p)), None)
    if png is None:
        raise FileNotFoundError(f"pdftoppm produced no PNG for page {pg} of {pdf}")
    with Image.open(png) as rendered:   # convert() returns a loaded copy; release the file
        img = rendered.convert("RGB")
    try:
        import pypdf
        page = pypdf.PdfReader(pdf).pages[pg - 1]
        mb, cb = page.mediabox, page.cropbox
        scale = dpi / 72.0
        mbl, mbt = float(mb.left), float(mb.top)
        dcrop = any(abs(a - b) > 0.5 for a, b in
                    ((cb.left, mb.left), (cb.bottom, mb.bottom), (cb.right, mb.right), (cb.top, mb.top)))
        rendered_mediabox = abs(img.width - (float(mb.right) - mbl) * scale) < 3
        if dcrop and rendered_mediabox:
            img = img.crop((round((float(cb.left) - mbl) * scale), round((mbt - float(cb.top)) * scale),
                            round((float(cb.right) - mbl) * scale), round((mbt - float(cb.bottom)) * scale)))
    except Exception as exc:
        # Best-effort alignment: keep the uncropped render, but say so instead of
        # silently producing a possibly misaligned overlay.
        print(f"[overlay] p{pg}: CropBox alignment skipped ({exc!r})", file=sys.stderr)
    return img
def _dash_v(draw, x, y0, y1, color, w):
y = y0
while y < y1:
draw.line([x, y, x, min(y + 9, y1)], fill=color, width=w); y += 16
def _dash_h(draw, x0, x1, y, color, w):
x = x0
while x < x1:
draw.line([x, y, min(x + 9, x1), y], fill=color, width=w); x += 16
def main():
    """CLI entry point: render overlay PNGs for the requested pages.

    With ``--template`` only the editable template is drawn (the human-review
    view). Otherwise the debug layers are stacked: raw Docling text blocks,
    taxonomy regions, tables, part labels, band marker lines and the furniture
    mask with content margins. One ``pNN.png`` per page is written to ``--out``.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("structured"); ap.add_argument("pdf")
    ap.add_argument("--docling", help="raw Docling doc JSON: also draw every body-text block "
                    "(the content the thin model discards) + derive Granite-table boxes")
    ap.add_argument("--bands", help="bands.py JSON: draw main-question + part start/end y-marker lines")
    ap.add_argument("--furniture", help="furniture.py JSON: mark recurring furniture vs real figures "
                    "+ draw the content x-margins")
    ap.add_argument("--template", help="template.py JSON: render the editable first-pass template "
                    "(margins+bands as lines, furniture/figures as boxes). "
                    "When set, draws ONLY the template (the human-review view).")
    ap.add_argument("--pages", help="comma list, e.g. 3,4,5 (default: all pages with geometry)")
    ap.add_argument("--dpi", type=int, default=150)
    ap.add_argument("--out", default="results/overlay")
    a = ap.parse_args()
    os.makedirs(a.out, exist_ok=True)
    scale = a.dpi / 72.0
    font = _font(14)

    def load(path):
        # read one JSON input and close its handle straight away
        with open(path) as fh:
            return json.load(fh)

    res = load(a.structured)
    doc_texts = docling_texts_by_page(load(a.docling)) if a.docling else {}
    bands = load(a.bands)["pages"] if a.bands else {}
    furn = load(a.furniture) if a.furniture else None
    tpl = load(a.template) if a.template else None

    # gather geometry by page
    parts_by_pg, regions_by_pg = {}, {}
    for q in res.get("questions", []):
        for p in q["parts"]:
            if p.get("bbox") and p.get("page"):
                parts_by_pg.setdefault(p["page"], []).append((p["label"], p["bbox"]))
    for r in res.get("regions", []):
        if r.get("bbox") and r.get("page"):
            regions_by_pg.setdefault(r["page"], []).append((r["type"], r["bbox"]))
    # tables: standard ones carry a bbox; Granite ones don't -> derive from the text layer
    tables_by_pg = {}
    for t in res.get("tables", []):
        pg = t.get("page")
        if not pg:
            continue
        bb = t.get("bbox") or (derive_table_bbox(t.get("grid", []), doc_texts.get(pg, []))
                               if a.docling else None)
        if bb:
            tables_by_pg.setdefault(pg, []).append(
                (f"table {t.get('source','')} {t.get('n_rows')}x{t.get('n_cols')}", bb))

    want = ([int(x) for x in a.pages.split(",")] if a.pages
            else (sorted(int(p) for p in tpl["pages"]) if tpl
                  else sorted(set(parts_by_pg) | set(regions_by_pg) | set(doc_texts))))
    if not want:
        sys.exit("no bbox geometry in this result (born-digital text path carries no geometry; "
                 "use an OCR/rapid-path structured.json)")

    written = []
    with tempfile.TemporaryDirectory() as td:
        for pg in want:
            img = render_page(a.pdf, pg, a.dpi, td)
            H = img.height
            W = img.width
            draw = ImageDraw.Draw(img)
            out = os.path.join(a.out, f"p{pg:02d}.png")

            if tpl:   # template-only render = the human-review view
                draw_template(draw, tpl, pg, scale, H, W, font)
                img.save(out); written.append(out)
                # same str/int key fallback as draw_template, so the summary
                # counts match what was actually drawn
                pgd = tpl["pages"].get(str(pg)) or tpl["pages"].get(pg) or {}
                print(f"p{pg}: template — {len(pgd.get('main_bands',[]))} main, "
                      f"{len(pgd.get('part_bands',[]))} part, {len(pgd.get('furniture',[]))} furn, "
                      f"{len(pgd.get('figures',[]))} fig -> {out}")
                continue

            # layer 0: raw Docling body-text blocks (faint, no label) — the discarded content
            for bb, _txt, _lab in doc_texts.get(pg, []):
                _rect(draw, bb, scale, H, BODY_COLOR, None, 1, font)
            # layer 1: taxonomy regions
            for typ, bb in regions_by_pg.get(pg, []):
                _rect(draw, bb, scale, H, REGION_COLORS.get(typ, (120, 120, 120)), typ, 2, font)
            # layer 2: tables (Granite-derived boxes in teal)
            for lab, bb in tables_by_pg.get(pg, []):
                _rect(draw, bb, scale, H, GRANITE_COLOR, lab, 3, font)
            # layer 3: part labels on top
            for lab, bb in parts_by_pg.get(pg, []):
                _rect(draw, bb, scale, H, PART_COLOR, lab, 3, font)
            # layer 4: band y-marker lines (main-question = blue, part = red; end = dashed)
            pb = bands.get(str(pg)) or bands.get(pg)
            nb = 0
            if pb:
                for m in pb["main"]:
                    if not m.get("is_start", True):     # skip continuation-page duplicate
                        continue
                    _hline(draw, m["y_start"], scale, H, W, MAIN_LINE,
                           f"Q{m['question']} ▸ start", 3, font); nb += 1
                    _hline(draw, m["y_end"], scale, H, W, MAIN_LINE, None, 1, font, dashed=True)
                for p in pb["part"]:
                    _hline(draw, p["y_start"], scale, H, W, PART_LINE,
                           f"{p['label']} start", 2, font, inset=90); nb += 1
            # layer 5: furniture mask — green=real figure, grey=masked furniture; + content margins
            if furn:
                for it in furn["items"]:
                    if it["page"] != pg or it["kind"] != "picture":
                        continue
                    if it["furniture"]:
                        _rect(draw, it["bbox"], scale, H, (130, 130, 130), "furniture", 2, font)
                    else:
                        _rect(draw, it["bbox"], scale, H, (56, 142, 60), "figure ✓", 3, font)
                band = (furn.get("content_margins") or {}).get("content_x_band")
                if band:
                    for xk in ("x_left", "x_right"):
                        x = band[xk] * scale
                        draw.line([x, 0, x, H], fill=(0, 150, 136), width=2)

            img.save(out); written.append(out)
            print(f"p{pg}: {len(parts_by_pg.get(pg,[]))} part-labels, "
                  f"{len(regions_by_pg.get(pg,[]))} regions, {len(tables_by_pg.get(pg,[]))} tables, "
                  f"{len(doc_texts.get(pg,[]))} body-text blocks, {nb} band-lines -> {out}")
    print(f"-> {len(written)} page(s) in {a.out}/")
if __name__ == "__main__":
main()

View File

@ -0,0 +1,210 @@
#!/usr/bin/env python3
"""
tables.py — selective table-cell extraction for the exam extractor (PLAN.md §B).
Two sources, unified into one cell-grid schema:
* STANDARD — the Tesseract+TableFormer backbone already emits `tables[].data.table_cells`
(text + row/col offsets + spans + bbox). Free, cached, every run. Good on ruled tables,
but it MISSES some data tables and OCRs them as loose tokens (REPORT.md p5).
* GRANITE — the Granite-Docling-258M VLM emits `<otsl>` grids in DocTags (clean rows/cols even
where the backbone scrambles them). It costs GPU time, so it is used SELECTIVELY: only on pages the router
flags (a standard table present, or dense picture/checkbox), routed through dsync's GPU lock
+ Redis cache. Recipe (REPORT.md): {"to_formats":["doctags","json"], "pipeline":"vlm",
"vlm_pipeline_model":"granite_docling"}.
Unified table = {page, n_rows, n_cols, grid (2D text), cells, caption, source, is_furniture}.
"""
import re, json, os, glob, base64, urllib.request
# ----------------------------------------------------------------- OTSL (Granite DocTags)
OTSL_BLOCK = re.compile(r"<otsl>(.*?)</otsl>", re.S)
CAPTION = re.compile(r"<caption>(?:<loc_\d+>)*(.*?)</caption>", re.S)
CELL_TOK = re.compile(r"<(fcel|ecel|ched|rhed|lcel|ucel|xcel|nl)>([^<]*)")
HEADER_TAGS = {"ched", "rhed"}
def parse_otsl(doctags):
    """Parse every ``<otsl>`` block of a DocTags string into unified table dicts.

    Each block's caption (if any) is extracted and whitespace-collapsed, location
    tokens are stripped, and the cell tokens are split into rows at ``<nl>``.
    Rows are right-padded with ``""`` to the widest row. Blocks without any
    non-empty row are skipped.

    Returns:
        A list of table dicts with ``page`` set to ``None`` (the caller tags the
        page), ``n_rows``, ``n_cols``, ``grid``, ``caption``,
        ``source="granite-otsl"`` and ``is_furniture``.
    """
    tables = []
    for block in OTSL_BLOCK.findall(doctags):
        caption_match = CAPTION.search(block)
        caption = None
        if caption_match:
            caption = re.sub(r"\s+", " ", caption_match.group(1)).strip()
        body = re.sub(r"<loc_\d+>", "", CAPTION.sub("", block))

        rows, current = [], []
        for token, text in CELL_TOK.findall(body):
            if token != "nl":
                current.append({"text": text.strip(), "header": token in HEADER_TAGS,
                                "empty": token == "ecel"})
                continue
            rows.append(current)
            current = []
        if current:
            rows.append(current)   # trailing row without a closing <nl>
        rows = [row for row in rows if row]
        if not rows:
            continue

        width = max(len(row) for row in rows)
        grid = [[c["text"] for c in row] + [""] * (width - len(row)) for row in rows]
        tables.append({"page": None, "n_rows": len(rows), "n_cols": width, "grid": grid,
                       "caption": caption, "source": "granite-otsl",
                       "is_furniture": is_furniture(grid, caption)})
    return tables
# ----------------------------------------------------------------- standard TableFormer
def tables_from_standard(doc):
    """Convert the standard Docling (TableFormer) tables into unified table dicts.

    Each cell's text is placed at its start row/column offset; cells without
    text or with offsets outside ``num_rows`` x ``num_cols`` are ignored. The page
    comes from the table's first provenance entry.

    Returns:
        A list of table dicts with ``page``, ``n_rows``, ``n_cols``, ``grid``,
        ``caption`` (``None`` when no caption text is available),
        ``source="docling-standard"`` and ``is_furniture``.
    """
    out = []
    for t in doc.get("tables", []):
        data = t.get("data", {}) or {}
        cells = data.get("table_cells", []) or []
        nr, nc = data.get("num_rows") or 0, data.get("num_cols") or 0
        grid = [["" for _ in range(nc)] for _ in range(nr)]
        for c in cells:
            r0, c0 = c.get("start_row_offset_idx"), c.get("start_col_offset_idx")
            if r0 is not None and c0 is not None and r0 < nr and c0 < nc and c.get("text"):
                grid[r0][c0] = c["text"]
        prov = t.get("prov") or []
        page = prov[0].get("page_no") if prov else None
        # Keep only caption entries that carry text: joining several empty texts
        # would give a whitespace-only (truthy) caption instead of None.
        cap_texts = [x.get("text") or "" for x in (t.get("captions") or []) if isinstance(x, dict)]
        cap = " ".join(txt for txt in cap_texts if txt) or None
        out.append({"page": page, "n_rows": nr, "n_cols": nc, "grid": grid,
                    "caption": cap, "source": "docling-standard",
                    "is_furniture": is_furniture(grid, cap)})
    return out
# ----------------------------------------------------------------- furniture filter
FURNITURE_RE = re.compile(r"examiner|do not write|leave\s+blank|question\s*mark|"
                          r"for marker|total marks?$", re.I)


def is_furniture(grid, caption=None):
    """Return True for a table that is exam scaffolding rather than question data.

    A table counts as furniture when its joined cell text plus caption matches
    ``FURNITURE_RE`` (mark grids, "For Examiner's Use", ...), or when every
    non-blank cell is a one- or two-digit number (a strip of question numbers).

    Args:
        grid: 2D list of cell texts.
        caption: optional caption text.
    """
    # Strip the blob: without a caption it would end in the joining space, and the
    # end-anchored "total marks?$" alternative could then never match cell text.
    blob = (" ".join(cell for row in grid for cell in row) + " " + (caption or "")).strip()
    if FURNITURE_RE.search(blob):
        return True
    # a single-column strip of question numbers / blanks = a mark grid
    flat = [c for row in grid for c in row if c.strip()]
    return bool(flat) and all(re.fullmatch(r"\d{1,2}", c.strip()) for c in flat)
# ----------------------------------------------------------------- Granite via dsync
VLM_OPTS = {"to_formats": ["doctags", "json"], "pipeline": "vlm",
"vlm_pipeline_model": "granite_docling", "image_export_mode": "placeholder"}
def _serve_vlm(pdf_b64, fname, page):
    """POST one page of a PDF to docling-serve's VLM pipeline and return the parsed JSON reply.

    Args:
        pdf_b64: the whole PDF, base64-encoded.
        fname: filename reported to the server.
        page: 1-based page number; sent as a single-page ``page_range``.

    Raises:
        RuntimeError: the server answered 404 on all four attempts.
        urllib.error.HTTPError: any HTTP error other than 404.
    """
    import time
    import urllib.error   # made explicit; the file only imports urllib.request
    import dsync
    opts = {**VLM_OPTS, "page_range": [page, page]}
    body = {"options": opts,
            "sources": [{"kind": "file", "base64_string": pdf_b64, "filename": fname}],
            "target": {"kind": "inbody"}}
    req = urllib.request.Request(dsync.SERVE + "/v1/convert/source",
                                 data=json.dumps(body).encode(),
                                 headers={"Content-Type": "application/json"})
    for _ in range(4):   # tolerate the single-use 404 race
        try:
            # context manager so the HTTP response/socket is always released
            with urllib.request.urlopen(req, timeout=1200) as resp:
                return json.loads(resp.read())
        except urllib.error.HTTPError as e:
            if e.code != 404:
                raise
            time.sleep(3)
    raise RuntimeError("serve vlm: repeated 404")
def _doctags_of(resp):
doc = resp.get("document") or {}
return doc.get("doctags_content") or doc.get("doc_tags") or doc.get("doctags") or ""
def granite_tables(pdf, pages, *, cached_glob=None, retries=4):
    """Run Granite-Docling on the given pages and return their parsed tables.

    For each page the DocTags are taken from the Redis cache when present,
    otherwise requested from docling-serve under dsync's GPU lock, retrying with
    exponential backoff (5s doubling, capped at 120s) while the reply is an OOM.
    When the server yields no DocTags and ``cached_glob`` supplied a file for that
    page, the file's DocTags are used instead. Exceptions raised by the serve call
    propagate to the caller.

    Args:
        pdf: path to the source PDF.
        pages: iterable of 1-based page numbers.
        cached_glob: optional glob of ``*p<N>.doctags`` fallback files.
        retries: maximum serve attempts per page.

    Returns:
        A list of unified table dicts, each tagged with its ``page``.
    """
    import dsync, time
    cache = _load_cached_doctags(cached_glob) if cached_glob else {}
    r = dsync._redis()
    with open(pdf, "rb") as fh:   # close the PDF handle instead of leaking it
        b64 = base64.b64encode(fh.read()).decode()
    fname = os.path.basename(pdf)
    sha = dsync._sha(pdf)
    out = []
    for pg in pages:
        key = f"docling:vlm:{sha}:p{pg}"
        doctags = None
        if r and (hit := r.get(key)):
            doctags = hit if isinstance(hit, str) else hit.decode()
        if doctags is None:
            delay = 5
            for attempt in range(retries):
                with dsync._GpuLock(r):
                    resp = _serve_vlm(b64, fname, pg)
                # NOTE(review): the backoff is placed after the lock is released so the
                # GPU is not held while sleeping — confirm this matches the intended scope.
                if dsync._is_oom(resp):
                    print(f"[granite] p{pg} OOM, backoff {delay}s ({attempt+1}/{retries})")
                    time.sleep(delay); delay = min(delay * 2, 120); continue
                doctags = _doctags_of(resp)
                if r and doctags:
                    r.set(key, doctags, ex=dsync.CACHE_TTL)
                break
        if not doctags and pg in cache:
            print(f"[granite] p{pg} serve empty -> cached doctags")
            doctags = cache[pg]
        for tbl in parse_otsl(doctags or ""):
            tbl["page"] = pg
            out.append(tbl)
    return out
def _load_cached_doctags(glob_pat):
    """Map page_no -> doctags text from files named *p<N>.doctags.

    Args:
        glob_pat: glob pattern selecting candidate files; names that do not end in
            ``p<digits>.doctags`` are ignored.

    Returns:
        Dict of ``int`` page number to file text (decoded as UTF-8, undecodable bytes replaced).
    """
    cache = {}
    for fn in glob.glob(glob_pat):
        m = re.search(r"p(\d+)\.doctags$", fn)
        if not m:
            continue
        # Context manager so each file handle is closed instead of left to the GC.
        with open(fn, encoding="utf-8", errors="replace") as fh:
            cache[int(m.group(1))] = fh.read()
    return cache
# ----------------------------------------------------------------- routing + attach
def candidate_pages(doc):
    """Return the sorted page numbers the router sends to Granite.

    A page qualifies when a table's first provenance entry names it, or when at least two text
    items whose label starts with "checkbox" have it as their first provenance page.
    """
    def first_page(item):
        # Page of the item's first provenance entry; None when there is no provenance.
        prov = item.get("prov") or []
        return prov[0].get("page_no") if prov else None

    selected = set()
    for table in doc.get("tables", []):
        page_no = first_page(table)
        if page_no:
            selected.add(page_no)

    checkbox_counts = {}
    for text in doc.get("texts", []):
        if not text.get("label", "").startswith("checkbox"):
            continue
        page_no = first_page(text)
        if page_no:
            checkbox_counts[page_no] = checkbox_counts.get(page_no, 0) + 1

    for page_no, count in checkbox_counts.items():
        if count >= 2:
            selected.add(page_no)
    return sorted(selected)
def attach_to_questions(tables, parts):
    """Attach each non-furniture table to one part on the same page and return those tables.

    Tables are numbered in order (``id``). For a table's page, the chosen part is the one whose
    bbox has the largest ``t`` among parts that carry a bbox; when no part on the page has a
    bbox, the part with the smallest label is used. Table y-position is not consulted. The
    table gets ``for_part`` (None when the page has no parts) and the part gets a summary
    appended to its ``tables`` list. Both ``tables`` entries and ``parts`` are mutated in place.
    """
    kept = [tbl for tbl in tables if not tbl["is_furniture"]]

    parts_on_page = {}
    for label, part in parts.items():
        parts_on_page.setdefault(part.get("page"), []).append((label, part))

    for idx, tbl in enumerate(kept):
        tbl["id"] = idx
        candidates = parts_on_page.get(tbl["page"], [])
        if not candidates:
            tbl["for_part"] = None
            continue
        # Best effort: prefer the part sitting highest on the page (largest bbox top), which is
        # taken to be the question stem; without any geometry fall back to the earliest label.
        located = [pair for pair in candidates if pair[1].get("bbox")]
        if located:
            chosen = max(located, key=lambda pair: (pair[1]["bbox"] or {}).get("t", 0))[0]
        else:
            chosen = sorted(candidates, key=lambda pair: pair[0])[0][0]
        tbl["for_part"] = chosen
        summary = {"id": idx, "n_rows": tbl["n_rows"], "n_cols": tbl["n_cols"],
                   "caption": tbl["caption"], "source": tbl["source"]}
        parts[chosen].setdefault("tables", []).append(summary)
    return kept

View File

@ -0,0 +1,182 @@
#!/usr/bin/env python3
"""
template.py — assemble the editable first-pass structural template from the spike's three signal
sources (extract structured.json + bands.json + furniture.json) into ONE round-trippable JSON that
the human reviewer verifies AND edits before stage-2 generates the final template.
UI principle (user, 2026-06-07): directional LIMITS are draggable LINES (1-DOF, easier to drag);
object FOOTPRINTS are BOXES. So:
* margins -> four axis-locked LINES: left/right (x), top/bottom (y)
* question/part bands -> horizontal LINES: start/end y
* furniture / figures / tables -> BOXES (an object's footprint)
Every editable element carries {source: "auto"|"human", confirmed: bool} — the AI-suggestion seam.
Stage-2 must consume only confirmed elements (or a template marked confirmed at the top level).
Coordinates are PDF points, BOTTOM-LEFT origin (units in meta); the app maps to its own canvas.
Usage:
python template.py --structured S.json --bands B.json --furniture F.json --pdf P.pdf --out T.json
"""
import json, argparse, datetime
def _line(edge, axis, value, scope, page=None):
    """Build one axis-locked margin line record, flagged as an unconfirmed auto suggestion.

    ``value`` is rounded to one decimal place; ``page`` is only included when given.
    """
    record = dict(edge=edge, axis=axis, value=round(value, 1), scope=scope,
                  source="auto", confirmed=False)
    if page is None:
        return record
    record["page"] = page
    return record
def _furn_kind(it):
    """Guess a label for a furniture box from where its centre sits (human can rename).

    Purely position-based, BOTTOM-LEFT origin: pictures become "qr", "barcode" or
    "chrome_picture"; everything else becomes "footer", "header_or_page_number" or "chrome_text".
    """
    box = it["bbox"]
    centre_x = (box["l"] + box["r"]) / 2
    centre_y = (box["t"] + box["b"]) / 2

    if it["kind"] != "picture":
        if centre_y < 90:
            return "footer"
        return "header_or_page_number" if centre_y > 760 else "chrome_text"

    if centre_x > 430 and centre_y > 700:
        return "qr"
    return "barcode" if centre_y < 110 else "chrome_picture"
def build(structured, bands, furniture, pdf=None, page_roles=None):
    """Assemble the first-pass template dict from the structured, bands and furniture signals.

    Args:
        structured: extractor result; ``questions[].parts[]`` (label/bbox/page), ``tables``,
            ``board`` and ``paper_code`` are read.
        bands: dict with a ``"pages"`` mapping of page -> ``{"main": [...], "part": [...]}``.
            Page keys may be ints or strings; output page keys are always strings.
        furniture: dict with ``items``, optional ``content_margins`` and ``n_pages``.
        pdf: value recorded as ``meta.source_pdf``.
        page_roles: optional mapping of page -> role record (``role``, ``source``,
            ``confirmed``, ``margins_enabled``).

    Returns:
        ``{"meta": ..., "margins": [...], "pages": {...}}`` where every editable element carries
        ``source: "auto"`` and ``confirmed: False``.
    """
    page_roles = page_roles or {}
    part_bbox = {p["label"]: p.get("bbox")
                 for q in structured.get("questions", []) for p in q["parts"]}
    cm = furniture.get("content_margins") or {}
    xband = cm.get("content_x_band") or {}
    per_pg_m = cm.get("per_page") or {}

    def margins_on(pg):
        # Role records may be keyed by str (JSON) or int page numbers.
        r = page_roles.get(str(pg)) or page_roles.get(pg)
        return r.get("margins_enabled", True) if r else True

    # margins as axis-locked LINES — document-level left/right, per-page top/bottom. Per-page
    # top/bottom are omitted for pages with no content column (cover/blank) — the user's override.
    margins = []
    if "x_left" in xband:
        margins.append(_line("left", "x", xband["x_left"], "document"))
        margins.append(_line("right", "x", xband["x_right"], "document"))
    for pg, m in sorted(per_pg_m.items(), key=lambda kv: int(kv[0])):
        if not margins_on(int(pg)):
            continue
        margins.append(_line("top", "y", m["top"], "page", int(pg)))
        margins.append(_line("bottom", "y", m["bottom"], "page", int(pg)))

    # furniture + figures as BOXES, grouped by page
    furn_pg, fig_pg = {}, {}
    for it in furniture.get("items", []):
        pg = it["page"]
        if it.get("furniture"):
            furn_pg.setdefault(pg, []).append(
                {"box": it["bbox"], "kind": _furn_kind(it), "docling_label": it["label"],
                 "source": "auto", "confirmed": False})
        elif it["kind"] == "picture":
            fig_pg.setdefault(pg, []).append(
                {"box": it["bbox"], "source": "auto", "confirmed": False})
    tbl_pg = {}
    for t in structured.get("tables", []):
        if t.get("page"):
            tbl_pg.setdefault(t["page"], []).append(
                {"box": t.get("bbox"), "n_rows": t.get("n_rows"), "n_cols": t.get("n_cols"),
                 "table_source": t.get("source"), "source": "auto", "confirmed": False})

    # --- reconcile against recovered part labels -------------------------------------------
    # A part-label position is never furniture or a figure (the label wins), and a "figure" that
    # covers most of the content area is a Docling page-collapse artifact (the GPU sometimes flags
    # the whole page as one picture), not a real figure -> drop both. Fixes the Q1.7/Q1.9 clashes
    # and the full-page "figure" that was masking part labels.
    part_boxes_pg = {}
    for q in structured.get("questions", []):
        for p in q["parts"]:
            if p.get("bbox") and p.get("page"):
                part_boxes_pg.setdefault(p["page"], []).append(p["bbox"])

    def _inter(a, b):
        # Axis-aligned overlap test in bottom-left coordinates (t is the upper edge).
        return not (a["r"] < b["l"] or b["r"] < a["l"] or a["t"] < b["b"] or b["t"] < a["b"])

    def _area(b):
        return max(0, b["r"] - b["l"]) * max(0, b["t"] - b["b"])

    for pg, items in list(furn_pg.items()):
        pls = part_boxes_pg.get(pg, [])
        furn_pg[pg] = [f for f in items if not (f.get("box") and any(_inter(f["box"], pl) for pl in pls))]
    for pg, items in list(fig_pg.items()):
        pls = part_boxes_pg.get(pg, [])
        m = per_pg_m.get(str(pg)) or per_pg_m.get(pg) or {}
        # Content area from the page margins; falls back to 595 x 842 when that product is 0.
        carea = ((m.get("right", 0) - m.get("left", 0)) * (m.get("top", 0) - m.get("bottom", 0))) or (595 * 842)
        fig_pg[pg] = [f for f in items if f.get("box")
                      and _area(f["box"]) <= 0.55 * carea                   # not a full-page collapse
                      and not any(_inter(f["box"], pl) for pl in pls)]      # not clashing a part label

    pages = {}
    # Normalise every page key to str so int-keyed bands do not yield a second, int-keyed entry
    # for a page that furniture/figures/roles already contribute under its string key.
    all_pg = ({str(p) for p in bands["pages"]} | {str(p) for p in furn_pg} | {str(p) for p in fig_pg}
              | {str(p) for p in page_roles})
    for pgs in sorted(all_pg, key=int):
        pg = int(pgs)
        pb = bands["pages"].get(pgs) or bands["pages"].get(pg) or {"main": [], "part": []}
        main = [{"question": m["question"], "y_start": m["y_start"], "y_end": m["y_end"],
                 "is_start": m.get("is_start", True),
                 "source": "auto", "confirmed": False} for m in pb["main"]]
        part = [{"label": p["label"], "question": p["question"],
                 "y_start": p["y_start"], "y_end": p["y_end"],
                 "label_box": part_bbox.get(p["label"]),   # app may render a box instead of lines
                 "source": "auto", "confirmed": False} for p in pb["part"]]
        pr = page_roles.get(pgs) or page_roles.get(pg) or {}
        pages[pgs] = {
            "role": pr.get("role", "question"),
            "role_source": pr.get("source", "default"), "role_confirmed": pr.get("confirmed", False),
            "margins_enabled": pr.get("margins_enabled", True),   # human-overridable
            "main_bands": main, "part_bands": part,
            "furniture": furn_pg.get(pg, []), "figures": fig_pg.get(pg, []),
            "tables": tbl_pg.get(pg, []),
        }
    return {
        "meta": {
            "schema": "exam-template/first-pass/v1",
            "board": structured.get("board"), "paper_code": structured.get("paper_code"),
            "source_pdf": pdf, "n_pages": furniture.get("n_pages"),
            "coord_origin": "BOTTOMLEFT", "units": "pdf_points",
            "generated_at": datetime.datetime.now().isoformat(timespec="seconds"),
            "ui_principle": "directional limits = draggable axis-locked lines; "
                            "object footprints = boxes",
            "confirmed": False, "confirmed_by": None, "confirmed_at": None,
        },
        "margins": margins,
        "pages": pages,
    }
def main():
    """CLI entry point: load the input JSON files, build the template, write it, print a summary."""
    ap = argparse.ArgumentParser()
    ap.add_argument("--structured", required=True)
    ap.add_argument("--bands", required=True)
    ap.add_argument("--furniture", required=True)
    ap.add_argument("--page-roles", dest="page_roles", help="page_roles.py JSON (roles + margin override)")
    ap.add_argument("--pdf")
    ap.add_argument("--out", default="results/template.json")
    a = ap.parse_args()

    def _load(path):
        # Close each input as soon as it has been parsed.
        with open(path) as fh:
            return json.load(fh)

    roles = _load(a.page_roles)["pages"] if a.page_roles else {}
    t = build(_load(a.structured), _load(a.bands), _load(a.furniture), a.pdf, roles)
    # Context manager guarantees the template is flushed and the handle closed.
    with open(a.out, "w") as fh:
        json.dump(t, fh, indent=2)

    page_records = list(t["pages"].values())
    n_pages = len(t["pages"])
    nm = sum(len(p["main_bands"]) for p in page_records)
    npt = sum(len(p["part_bands"]) for p in page_records)
    nf = sum(len(p["furniture"]) for p in page_records)
    ng = sum(len(p["figures"]) for p in page_records)
    print(f"template {t['meta']['paper_code']} ({t['meta']['board']}): {n_pages} pages, "
          f"{len(t['margins'])} margin-lines, {nm} main-bands, {npt} part-bands, "
          f"{nf} furniture-boxes, {ng} figure-boxes")
    print(f"-> wrote {a.out}")


if __name__ == "__main__":
    main()

View File

@ -0,0 +1,222 @@
#!/usr/bin/env python3
"""
validate.py — G6 validation/judge: a deterministic consistency pass over an extractor result.
NOT a gate. It never approves or rejects; it attaches confidence + flags so a HUMAN reviewer's
attention is routed to the parts most likely wrong. A clean paper -> all-green, skim; a flagged
paper -> the exact items to check, worst-first. Every value stays a *suggestion* a human confirms.
Checks (all deterministic, no GPU, ~free — run on every extraction):
  C1  marks-sum vs official max — over-read (sum>max) = error; under (sum<max) = warn
  C2  part marks plausibility — marks None / 0 / implausibly high
  C3  top-level question sequence — gaps in 1..N (skipped when numbering was OCR-inferred '~')
  C4  sub-part contiguity — within a question: a,b,c / .1,.2,.3 with no hole
  C5  coverage — missed parts vs ground truth (when the result carries it)
Usage:
python validate.py results/genreport/edexcel1f/ocr_struct_filled.json
python validate.py <structured.json> --out report.json
"""
import json, re, sys, argparse
from collections import defaultdict
# Threshold used by check C2: a part whose marks exceed this value is flagged for review.
IMPLAUSIBLE_PART_MARKS = 15  # a single sub-part above this is worth a human glance
def _qnum(q):
    """Return the integer behind a top-level question id, ignoring leading zeros.

    '01' -> 1 and '4' -> 4. Ids marked as inferred ('~3') and ids that are not purely digits
    yield None.
    """
    if not q.startswith("~"):
        hit = re.match(r"^0*(\d+)$", q)
        if hit:
            return int(hit.group(1))
    return None
def _subkey(label, q):
    """Return a part label's own suffix inside question ``q``.

    '01.2' -> '2', '4a' -> 'a', '1bi' -> 'bi'. The question prefix is removed when present,
    then leading dots and, after those, leading '~' markers are stripped.
    """
    suffix = label.removeprefix(q)
    without_dots = suffix.lstrip(".")
    return without_dots.lstrip("~")
def validate(result):
    """Run the deterministic consistency checks C1-C5 over an extractor result.

    Nothing is approved or rejected here: each check appends a record to ``checks`` and, when
    its status is not "ok", a human-readable line to ``flags``.

    Args:
        result: extractor result dict. ``questions[].parts[]`` (with ``label`` and ``marks``)
            drive most checks; ``stats.marks_check``, ``front_matter.max_marks`` and
            ``coverage`` are used when present.

    Returns:
        Dict with ``paper_code``, ``board``, a ``summary``, the ``flags`` and ``checks`` lists,
        ``part_confidence`` (label -> "high"/"medium"/"low") and ``question_sequence``.
    """
    board = result.get("board")
    code = result.get("paper_code")
    flags, checks = [], []
    # (label, owning question id, part dict) for every part of every question.
    parts = [(p["label"], q["question"], p) for q in result.get("questions", []) for p in q["parts"]]
    conf = {}  # label -> high/medium/low
    low = set()  # labels a check has implicated

    def add(cid, severity, status, detail):
        # Record the check outcome; anything other than "ok" also becomes a flag line.
        checks.append({"id": cid, "severity": severity, "status": status, "detail": detail})
        if status != "ok":
            flags.append(f"[{severity}] {cid}: {detail}")

    # ---- C1: marks sum vs official maximum -------------------------------------------------
    mc = result.get("stats", {}).get("marks_check")
    exp = (mc or {}).get("expected_max") or result.get("front_matter", {}).get("max_marks")
    msum = (mc or {}).get("sum")
    if msum is None:
        # No precomputed sum: total the marks of the parts that have one.
        msum = sum(p["marks"] for *_, p in parts if p.get("marks") is not None)
    if exp:
        if msum > exp:
            add("C1_marks_sum", "error", "over",
                f"marks sum {msum} EXCEEDS official max {exp} (+{msum-exp}) — an over-read; check the paper")
        elif msum < exp:
            add("C1_marks_sum", "warn", "under",
                f"marks sum {msum} below official max {exp} (-{exp-msum}) — missing parts or unread marks")
        else:
            add("C1_marks_sum", "info", "ok", f"marks sum {msum} == official max {exp}")
    else:
        add("C1_marks_sum", "info", "unknown", "no official max available to check the sum against")

    # ---- C2: per-part marks plausibility ---------------------------------------------------
    none_ct = zero_ct = 0
    for lab, q, p in parts:
        mk = p.get("marks")
        if mk is None:
            none_ct += 1; low.add(lab)
        elif mk == 0:
            zero_ct += 1; low.add(lab)
        elif mk > IMPLAUSIBLE_PART_MARKS:
            low.add(lab)
            add("C2_part_marks", "warn", "implausible",
                f"part {lab} has {mk} marks (> {IMPLAUSIBLE_PART_MARKS}) — verify it isn't a mis-read")
    if none_ct or zero_ct:
        add("C2_part_marks", "warn", "missing",
            f"{none_ct} part(s) with no mark, {zero_ct} with 0 marks — unread/garbled mark tokens")
    elif not any(c["id"] == "C2_part_marks" for c in checks):
        # Only report "ok" when the loop above raised no "implausible" entry.
        add("C2_part_marks", "info", "ok", "every part carries a plausible mark")

    # ---- C3: top-level question sequence + EXPECTED-question interpolation ------------------
    # If Q1, Q2 ... Q14 are recovered but 3-13 are not, the paper certainly HAS 3-13 — they were
    # just missed (e.g. a Docling page-collapse). We emit the full expected sequence with a per-Q
    # `recovered` flag so a live question-tree view can render the gaps as explicit "needs a second
    # pass" slots, and a targeted re-OCR knows exactly which questions to chase.
    qids = [q for q in dict.fromkeys(q for _, q, _ in parts)]  # unique ids, first-seen order
    nums = sorted({n for n in (_qnum(q) for q in qids) if n is not None})
    zero_pad = any(len(q) == 2 and q.startswith("0") for q in qids)  # AQA 'NN' vs Edexcel/OCR 'N'
    question_sequence = []
    if any(q.startswith("~") for q in qids):
        add("C3_question_seq", "info", "inferred",
            "question numbers were OCR-inferred ('~N') — sequence not checkable; treat labels as approximate")
    elif nums:
        # isolated high outliers (a content number mis-read as 'Q67' after Q1-10) are likely
        # spurious top-levels, not 50 missing questions — strip them off the top so the sequence
        # reflects the real paper, and flag them for review instead of flooding the tree with slots.
        core, suspect = nums[:], []
        while len(core) >= 2 and core[-1] - core[-2] > 4:
            suspect.insert(0, core.pop())
        hi = core[-1] if core else nums[-1]
        gaps = [n for n in range(nums[0], hi + 1) if n not in core]
        question_sequence = [{"n": n, "label": (f"{n:02d}" if zero_pad else str(n)),
                              "recovered": n in core} for n in range(nums[0], hi + 1)]
        if suspect:
            add("C3_question_seq", "warn", "spurious",
                f"isolated high question number(s) {suspect} after a {nums[0]}-{hi} run — likely a "
                f"content number mis-read as a top-level question; review/remove")
        if gaps:
            add("C3_question_seq", "warn", "gap",
                f"top-level questions {gaps} missing between {nums[0]}-{hi} — expected but "
                f"unrecovered; surface as second-pass slots in the question tree")
        elif not suspect:
            add("C3_question_seq", "info", "ok", f"questions {nums[0]}-{hi} contiguous")

    # ---- C4: sub-part contiguity within each question --------------------------------------
    def order(keys):
        """Map a question's child keys to an ordered scheme + report holes. Handles .N and a/b/c."""
        # Only the first character of each key is considered, for both digits and letters.
        dig = sorted(int(k[0]) for k in keys if k[:1].isdigit())
        let = sorted(k[0] for k in keys if k[:1].isalpha())
        holes = []
        if dig:
            holes += [str(n) for n in range(dig[0], dig[-1] + 1) if n not in dig]
        if let:
            lo, hi = ord(let[0]), ord(let[-1])
            holes += [chr(c) for c in range(lo, hi + 1) if chr(c) not in let]
        return holes

    byq = defaultdict(list)
    for lab, q, p in parts:
        sk = _subkey(lab, q)
        if sk:
            byq[q].append(sk)
    seq_holes = {}
    for q, keys in byq.items():
        firsts = {k[0] for k in keys}  # immediate children only (a / 1 / etc.)
        h = order(firsts)
        if h:
            seq_holes[q] = h
    if seq_holes:
        add("C4_subpart_seq", "warn", "gap",
            "sub-part gaps: " + ", ".join(f"Q{q} missing {hs}" for q, hs in sorted(seq_holes.items())))
    else:
        add("C4_subpart_seq", "info", "ok", "sub-parts contiguous within every question")

    # ---- C5: coverage vs ground truth (when present) ---------------------------------------
    cov = result.get("coverage", {})
    if cov.get("coverage_pct") is not None:
        missed = cov.get("missed", [])
        if missed:
            add("C5_coverage", "warn", "missed",
                f"{cov['coverage_pct']}% vs GT ({cov['recovered']}/{cov['total']}); missed {missed[:10]}")
            low.update(missed)
        else:
            add("C5_coverage", "info", "ok", f"100% coverage vs GT ({cov['recovered']}/{cov['total']})")

    # ---- per-part confidence + paper summary -----------------------------------------------
    sum_mismatch = any(c["id"] == "C1_marks_sum" and c["status"] in ("over", "under") for c in checks)
    for lab, q, p in parts:
        if lab in low:
            conf[lab] = "low"
        elif sum_mismatch:
            conf[lab] = "medium"  # paper-level doubt taints every part a little
        else:
            conf[lab] = "high"
    # Severities of every check whose status is something other than ok/info/unknown.
    severities = [c["severity"] for c in checks if c["status"] not in ("ok", "info", "unknown")]
    worst = "error" if "error" in severities else "warn" if "warn" in severities else "clean"
    return {
        "paper_code": code, "board": board,
        "summary": {
            "worst_severity": worst,
            "needs_priority_review": worst != "clean",
            "n_flags": len(flags),
            "marks_sum": msum, "official_max": exp,
            "parts_total": len(parts),
            "parts_low_conf": sum(1 for v in conf.values() if v == "low"),
            "questions_expected": len(question_sequence) or None,
            "questions_recovered": sum(1 for q in question_sequence if q["recovered"]) or None,
        },
        "flags": flags,
        "checks": checks,
        "part_confidence": conf,
        "question_sequence": question_sequence,  # full expected skeleton (recovered + missing slots)
    }
def main():
    """CLI entry point: validate one structured JSON file, print the report, optionally save it."""
    ap = argparse.ArgumentParser()
    ap.add_argument("structured")
    ap.add_argument("--out")
    a = ap.parse_args()
    # Context manager so the input handle is closed once parsed.
    with open(a.structured) as fh:
        rep = validate(json.load(fh))
    s = rep["summary"]
    print(f"paper : {rep['paper_code']} ({rep['board']})")
    print(f"verdict : {s['worst_severity'].upper()} "
          f"{'-> PRIORITY REVIEW' if s['needs_priority_review'] else '-> all checks clean (still human-reviewable)'}")
    print(f"marks : {s['marks_sum']}/{s['official_max']} | parts {s['parts_total']} "
          f"({s['parts_low_conf']} low-confidence)")
    if s.get("questions_expected"):
        miss = [q["label"] for q in rep["question_sequence"] if not q["recovered"]]
        print(f"questions : {s['questions_recovered']}/{s['questions_expected']} recovered"
              + (f" | second-pass slots: {miss}" if miss else " (complete sequence)"))
    if rep["flags"]:
        print("flags:")
        for f in rep["flags"]:
            print(f" - {f}")
    else:
        print("flags : none")
    if a.out:
        # Context manager guarantees the report is flushed and the handle closed.
        with open(a.out, "w") as fh:
            json.dump(rep, fh, indent=2)
        print(f"-> wrote {a.out}")


if __name__ == "__main__":
    main()

View File

@ -0,0 +1,51 @@
import json
import os
from pathlib import Path
import pytest
from api.services.docling import FIRST_PASS_SCHEMA, auto_map
# Root of the spike checkout holding the sample PDFs and reference templates; override with the
# DOCLING_SPIKE_ROOT environment variable. Tests that need these files skip when they are absent.
SPIKE_ROOT = Path(os.environ.get("DOCLING_SPIKE_ROOT", "/home/kcar/dev/docling-exam-spike"))
# Sample PDF and the reference template it is compared against.
PHYSICS_PDF = SPIKE_ROOT / "samples" / "AQA-Physics-Paper-1H-2022-with-qr.pdf"
PHYSICS_TEMPLATE = SPIKE_ROOT / "results" / "template" / "physics.json"
# Sample PDF used by the fast-path test (run with prefer_cache=False).
BORN_DIGITAL_PDF = SPIKE_ROOT / "samples" / "physics-p1h-2022-qp.pdf"
@pytest.mark.skipif(not (PHYSICS_PDF.exists() and PHYSICS_TEMPLATE.exists()), reason="spike corpus not present")
def test_auto_map_matches_spike_physics_template_shape():
    """auto_map output for the physics sample has the same shape as the stored reference template."""
    reference = json.loads(PHYSICS_TEMPLATE.read_text())
    produced = auto_map(PHYSICS_PDF.read_bytes(), spike_root=SPIKE_ROOT)

    produced_meta, reference_meta = produced["meta"], reference["meta"]
    assert produced_meta["schema"] == FIRST_PASS_SCHEMA
    assert produced_meta["schema"] == reference_meta["schema"]
    assert set(produced.keys()) == set(reference.keys())
    assert produced_meta["board"] == reference_meta["board"]
    assert produced_meta["paper_code"] == reference_meta["paper_code"]
    assert len(produced["margins"]) == len(reference["margins"])
    assert set(produced["pages"].keys()) == set(reference["pages"].keys())

    # Page 2 is compared in detail: its role and the key set of its first part band.
    produced_page, reference_page = produced["pages"]["2"], reference["pages"]["2"]
    assert produced_page["role"] == reference_page["role"]
    assert produced_page["part_bands"][0].keys() == reference_page["part_bands"][0].keys()
@pytest.mark.skipif(not BORN_DIGITAL_PDF.exists(), reason="born-digital spike PDF not present")
def test_auto_map_fast_path_without_cache_produces_first_pass_template():
    """With prefer_cache=False the born-digital sample still yields a populated first-pass template."""
    pdf_label = "samples/physics-p1h-2022-qp.pdf"
    options = {"source_pdf": pdf_label, "spike_root": SPIKE_ROOT, "prefer_cache": False}
    template = auto_map(BORN_DIGITAL_PDF.read_bytes(), **options)

    meta = template["meta"]
    assert meta["schema"] == FIRST_PASS_SCHEMA
    assert meta["board"] == "aqa"
    assert meta["paper_code"] == "8463/1"
    assert meta["source_pdf"] == "samples/physics-p1h-2022-qp.pdf"
    assert template["margins"]
    assert template["pages"]
def test_auto_map_rejects_empty_pdf_bytes():
    """An empty byte string is rejected with ValueError."""
    empty_pdf = b""
    with pytest.raises(ValueError):
        auto_map(empty_pdf)