api/api/services/docling/extract.py
kcar 76e11b0b06
Some checks failed
api-ci-deploy / test-build-deploy (push) Has been cancelled
feat(docling): B1-2 AQA label normalization + missing-.1 inference + MCQ gap (salvaged)
(cherry picked from commit a707a5afd92c5c9fb042486229d0ef11549a3f53)
2026-06-08 04:03:17 +00:00

1006 lines
49 KiB
Python
Executable File

#!/usr/bin/env python3
"""
extract.py v2 — board-aware structured extraction of UK exam papers.
v1 (see extract_v1_backup.py) proved a thin custom layer over Docling output recovers the
exam-marker taxonomy at 95% on the image-only AQA Physics paper, but was AQA-only and read
question labels from a RapidOCR per-page pass. v2 generalises across exam boards while
*preserving* that proven AQA path:
* BOARD DETECTION <- paper code in the front matter (8463/7408/8461 -> AQA, 1MA1 -> Edexcel,
H556 -> OCR). Each board uses a different numbering + marks grammar (PLAN.md D1).
* AQA <- when Docling JSON + RapidOCR dumps are available, use v1's boxed-label
recovery (the 95% path). Otherwise fall back to the AQA text grammar.
* EDEXCEL <- top-level integers anchored on "Total for Question N is M marks" (the
precise signal that kills false-positive content numbers like 0/24/62), parts (a)(b)(c),
per-part marks (N).
* OCR <- sequential top-level integers followed by question text, parts (a)/(i),
marks [N]; `(b)*` flags an extended-response part.
* REGIONS <- Docling layout labels mapped to taxonomy + gemma4:e4b `answer_regions`
(taxonomy #3 — the one structure no deterministic pass emits) merged by part.
* TABLES <- Docling `tables` carried through; parts on a table page flagged has_table.
* COVERAGE <- recall vs a ground-truth label set: built-in physics GT (regression guard)
or the born-digital GT text parsed with the same board grammar.
The extractor works off a unified line stream so the same grammars serve both the OCR path
(Docling JSON, has geometry) and the born-digital fast-path (pdftotext, no GPU).
Usage:
python extract.py # AQA physics, v1 path -> 95% (regression guard)
python extract.py --text results/gt_extra/edexcel-gcse-maths-1ma1-1h-jun22-qp.txt
python extract.py --text PAPER.pdf --gemma results/gemma_sweep_physics_200 --out out.json
python extract.py --ocr samples/extra/ocr-...-qp.pdf # live OCR via dsync (uses shared GPU)
python extract.py --auto PAPER.pdf # detect text layer -> fast-path, else
# report the OCR path is required
"""
import json, re, glob, argparse, subprocess, os
from collections import defaultdict, namedtuple
import xml.etree.ElementTree as ET
try:
from . import tables as tbl_mod
except ImportError: # pragma: no cover - CLI execution
import tables as tbl_mod
try:
from . import regions as region_mod
except ImportError: # pragma: no cover - CLI execution
import regions as region_mod
# ----------------------------------------------------------------- line model
# One entry of the unified line stream consumed by every board grammar: the line's text, the page
# it was read from, and its bounding box (a dict with "l"/"r"/"t"/"b" keys when geometry exists).
Line = namedtuple("Line", "text page bbox") # bbox is None for text-only sources
def _union_bbox(boxes):
return {"l": min(b["l"] for b in boxes), "r": max(b["r"] for b in boxes),
"t": max(b["t"] for b in boxes), "b": min(b["b"] for b in boxes)}
def _bbox_lines_from_pdftotext(path):
    """Return (lines, pages) from `pdftotext -bbox`.

    Poppler emits XHTML word boxes with a TOP-LEFT y-axis. The spike/app contract uses
    Docling-style PDF points with a BOTTOM-LEFT origin, so convert every word/line box via
    page height: l=xMin, r=xMax, t=page_height-yMin, b=page_height-yMax.

    The text grammar still consumes line strings; grouping words on the same y band preserves
    enough spacing for board grammars while adding geometry to the born-digital fast path.

    Returns:
        lines: list of `Line(text, page, bbox)`, one per reconstructed text row, pages 1-based.
        pages: list of {"page", "width", "height", "bbox"} dicts, one per PDF page.

    Raises:
        subprocess.CalledProcessError / OSError if `pdftotext` fails or is not installed;
        xml.etree.ElementTree.ParseError if its output is not well-formed XML.
    """
    raw = subprocess.check_output(["pdftotext", "-bbox", path, "-"]).decode("utf-8", "replace")
    root = ET.fromstring(raw)
    ns = {"x": "http://www.w3.org/1999/xhtml"}
    out, pages = [], []
    for pg, page in enumerate(root.findall(".//x:page", ns), 1):
        width = float(page.get("width") or 0)
        height = float(page.get("height") or 0)
        pages.append({"page": pg, "width": width, "height": height,
                      "bbox": {"l": 0.0, "r": width, "t": height, "b": 0.0}})
        words = []
        for w in page.findall("x:word", ns):
            txt = (w.text or "").strip()
            if not txt:
                continue
            x0, y0 = float(w.get("xMin") or 0), float(w.get("yMin") or 0)
            x1, y1 = float(w.get("xMax") or 0), float(w.get("yMax") or 0)
            bb = {"l": x0, "r": x1, "t": height - y0, "b": height - y1}
            words.append((y0, x0, txt, bb))
        # Sort on (y, x, text) only. A plain tuple sort falls through to comparing the bbox
        # dicts when two words tie on all three (e.g. text drawn twice at the same position),
        # and dicts are unorderable -> TypeError. The sort is stable, so ties keep file order.
        words.sort(key=lambda word: word[:3])
        groups = []
        for y0, x0, txt, bb in words:
            # Same baseline/text row. 3pt handles minor glyph-height jitter without merging rows.
            if not groups or abs(groups[-1]["y0"] - y0) > 3.0:
                groups.append({"y0": y0, "words": []})
            groups[-1]["words"].append((x0, txt, bb))
        for g in groups:
            # Left-to-right within the row, then join into the line string the grammars consume.
            g["words"].sort(key=lambda x: x[0])
            text = " ".join(txt for _, txt, _ in g["words"])
            out.append(Line(text, pg, _union_bbox([bb for _, _, bb in g["words"]])))
    return out, pages
def lines_from_pdftext(path):
    """Born-digital fast-path / GT source: pdftotext, with word/line bbox for PDFs.

    A path ending in ".pdf" is run through `pdftotext -bbox` and yields lines with geometry.
    Any other path is read as an already-extracted text dump in which pages are separated by
    form-feeds; blank lines are dropped and every `Line` carries `bbox=None`.

    Returns a list of `Line(text, page, bbox)` with 1-based page numbers.
    """
    if path.endswith(".pdf"):
        return _bbox_lines_from_pdftotext(path)[0]
    # Context manager so the handle is closed deterministically rather than left to the GC.
    with open(path, encoding="utf-8", errors="replace") as fh:
        raw = fh.read()
    return [Line(ln, pg, None)
            for pg, page in enumerate(raw.split("\f"), 1)
            for ln in page.splitlines()
            if ln.strip()]
def pages_from_pdftext(path):
    """Return the per-page geometry records for a PDF, or [] for any non-PDF / empty path."""
    is_pdf = bool(path) and path.endswith(".pdf")
    if not is_pdf:
        return []
    _, pages = _bbox_lines_from_pdftotext(path)
    return pages
def _prefix_bbox(line, width=52):
"""Approximate the leading label box within a pdftotext-bbox line.
The fast-path line bbox spans the full text row (label + question prose). For template/overlay use,
part geometry should mark the label at the row start, not the whole row. Poppler word geometry is
currently collapsed to line boxes, so keep the row's vertical extent and cap the horizontal extent
to the left prefix where exam-board labels live.
"""
if not line.bbox:
return None
return {"l": line.bbox["l"], "r": min(line.bbox["r"], line.bbox["l"] + width),
"t": line.bbox["t"], "b": line.bbox["b"]}
# ----------------------------------------------------------------- text-layer auto-detect
# Every UK exam-board QP PDF ships a text layer (born-digital), making pdftotext the common-case
# production path: no GPU, no OCR. The only exception in practice is a *redistribution* that has
# been re-rendered to images (e.g. samples/AQA-Physics-Paper-1H-2022-with-qr.pdf), which carries
# NO text layer and must go through the OCR pipeline. We pick the path automatically by measuring
# how much real text pdftotext recovers, normalised per page.
#
# Calibration (measured on the corpus, non-whitespace chars / page from `pdftotext -layout`):
# image-only AQA-Physics-...-with-qr.pdf ..... 0 -> OCR path
# edexcel 1MA1/1H (sparsest born-digital) .... ~326
# every other born-digital QP ................ 400-1200
# A born-digital QP yields hundreds of chars/page; an image-only PDF yields ~0 (a stray QR/footer
# might leak a handful). 40 chars/page sits an order of magnitude below the sparsest real paper
# and well above any image-only leakage, so it cleanly separates the two with wide margin.
# Minimum non-whitespace characters per page for a PDF to count as having a text layer;
# has_text_layer() treats a value at or above this threshold as born-digital.
TEXT_LAYER_MIN_CHARS_PER_PAGE = 40
def text_layer_chars_per_page(path):
    """Return (total_non_space_chars, n_pages, chars_per_page) for a PDF's pdftotext output.

    chars_per_page is the auto-detect signal: it normalises out paper length so a long sparse
    paper isn't mistaken for image-only and a short dense one isn't over-counted.

    Raises subprocess.CalledProcessError / OSError if `pdftotext` fails or is not installed.
    """
    raw = subprocess.check_output(["pdftotext", "-layout", path, "-"]).decode("utf-8", "replace")
    chars = sum(1 for c in raw if not c.isspace())
    # pdftotext emits a form-feed AFTER each page (including the last), so the number of
    # form-feeds is the page count. Adding one unconditionally over-counts by a page and
    # deflates chars/page (halving it for a single-page PDF). Only add the extra page when the
    # output does not end on a form-feed, i.e. there is trailing unterminated text.
    breaks = raw.count("\f")
    n_pages = breaks if raw.endswith("\f") else breaks + 1
    return chars, n_pages, chars / n_pages
def has_text_layer(path):
    """Report whether `path` has a substantive text layer and can use the fast-path.

    True for a born-digital PDF. A PDF re-rendered to images (the scanned / image-only
    redistribution case) gives False and must be routed to the OCR pipeline
    (--ocr / the AQA RapidOCR margin-pass).
    """
    per_page = text_layer_chars_per_page(path)[2]
    return per_page >= TEXT_LAYER_MIN_CHARS_PER_PAGE
def lines_from_docling(doc):
    """OCR path: turn each Docling text item into a `Line`, sorted into reading order.

    Page and bbox come from the item's first provenance record; items with no provenance get
    page=None and bbox=None.
    """
    def reading_order(entry):
        # Docling bbox origin is bottom-left, so a larger "t" is higher on the page:
        # order by page, then top-down, then left-to-right.
        box = entry.bbox or {}
        return (entry.page or 0, -box.get("t", 0), box.get("l", 0))

    stream = []
    for item in doc.get("texts", []):
        text = item.get("text") or ""
        prov = item.get("prov") or []
        if prov:
            first = prov[0]
            stream.append(Line(text, first.get("page_no"), first.get("bbox")))
        else:
            stream.append(Line(text, None, None))
    stream.sort(key=reading_order)
    return stream
# ----------------------------------------------------------------- board detection
# Paper-code patterns, tried in this order; the first hit decides the board.
PAPER_CODE_RES = [
    ("aqa", re.compile(r"\b(7408|8463|8461|8464|8462|8702|8700|7405|7402)/\d")),
    ("edexcel", re.compile(r"\b1MA1/\d", re.I)),
    ("ocr", re.compile(r"\bH\d{3}/?\d?\b")),
]
# Branding patterns, used only as the last resort before the default.
WORDMARK_RES = [
    ("edexcel", re.compile(r"Pearson|Edexcel", re.I)),
    ("ocr", re.compile(r"Oxford Cambridge and RSA|\bOCR\b")),
    ("aqa", re.compile(r"\bAQA\b")),
]
# structural grammar signals — the board-specific tokens themselves. These survive OCR far better
# than cover-page branding / paper codes (which mangle: "1MA1/1F" -> "IMA1 1", "Pearson Edexcel"
# split across lines), so they're the robust fallback before wordmarks.
EDX_SIG = re.compile(r"Total for Question\s+\d+\s+is\s+\d+\s+marks?", re.I)
OCR_SIG = re.compile(r"Oxford Cambridge and RSA", re.I)
AQA_SIG = re.compile(r"\[\s*\d+\s*marks?\s*\]") # [N marks] — AQA, not OCR's bare [N]


def detect_board(lines):
    """Return (board, paper_code|None) for a line stream.

    Signals are consulted in decreasing order of authority: paper code, then structural
    grammar signal (OCR-robust), then wordmark, then the "aqa" default. Only the first 1500
    lines are inspected (whole front + body, not just the cover).
    """
    head = "\n".join(entry.text for entry in lines[:1500])

    # 1. Paper code: authoritative, and the only signal that also yields the code itself.
    for name, pattern in PAPER_CODE_RES:
        hit = pattern.search(head)
        if hit is not None:
            return name, hit.group(0)

    # 2. Structural grammar tokens. AQA needs at least three "[N marks]" hits to count.
    structural = [
        ("edexcel", EDX_SIG.search(head) is not None),
        ("ocr", OCR_SIG.search(head) is not None),
        ("aqa", len(AQA_SIG.findall(head)) >= 3),
    ]
    for name, present in structural:
        if present:
            return name, None

    # 3. Wordmark, falling back to the safe default.
    branded = next((name for name, pattern in WORDMARK_RES if pattern.search(head)), "aqa")
    return branded, None
# ----------------------------------------------------------------- front matter
def extract_front_matter(lines, board, code):
    """Pull cover-page metadata out of the first 400 lines of the stream.

    Args:
        lines: line stream (objects with a `.text` attribute).
        board: detected board key, one of "aqa" / "edexcel" / "ocr" (KeyError otherwise).
        code: detected paper code, or None.

    Returns a dict that always has "exam_board" and carries "paper_code", "qualification",
    "subject", "tier", "time_allowed", "max_marks" and "session" only when each was found.
    """
    blob = "\n".join(entry.text for entry in lines[:400])
    board_names = {"aqa": "AQA", "edexcel": "Pearson Edexcel", "ocr": "OCR"}
    fm = {"exam_board": board_names[board]}
    if code:
        fm["paper_code"] = code

    qual = re.search(r"\b(GCSE|GCE|A-?level|AS)\b\s+([A-Z][A-Za-z ]+)", blob)
    if qual:
        level, subject = qual.groups()
        fm["qualification"] = level.upper().replace("-", "")
        fm["subject"] = subject.split("\n")[0].strip().title()

    tier = re.search(r"(Higher|Foundation)\s+Tier", blob, re.I)
    if tier:
        fm["tier"] = tier.group(1).title()

    allowed = re.search(r"Time\s+allowed[:\s]+([0-9].*?(?:hour|minute)s?[^\n]*)", blob, re.I)
    if allowed:
        fm["time_allowed"] = allowed.group(1).strip()

    # authoritative paper-total phrasings first, then the generic fallback
    total_patterns = (
        r"TOTAL FOR PAPER IS\s+(\d{2,3})\s+MARKS",
        r"total mark for this paper is\s+(\d{2,3})",
        r"(?:maximum mark|maximum mark for this paper)\D{0,8}(\d{2,3})",
    )
    total = None
    for pattern in total_patterns:
        total = re.search(pattern, blob, re.I)
        if total:
            break
    if total:
        fm["max_marks"] = int(total.group(1))

    session = re.search(r"(Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)\w*\s*\.?\s*(\d{2,4})", blob)
    if session:
        month, year = session.groups()
        # Only the last two digits of the year are kept; the century is always written as "20".
        fm["session"] = f"{month} 20{year[-2:]}"
    return fm
# ====================================================================== AQA
# --- v1 path: Docling JSON + RapidOCR boxed labels (the proven 95% recovery) -----
PART_RE = re.compile(r"^(\d{2})\.(\d)$") # 01.2
# OCR sometimes inserts bracket/noise glyphs inside boxed labels (e.g. "02].3").
# Normalise only tight margin-column candidates before matching; body decimals
# remain protected by the label-column gate below.
AQA_LABEL_NOISE = re.compile(r"[^0-9.]+")
# Translation table mapping circled digits to plain ASCII digits. The keys had been reduced to
# empty strings (glyphs lost in an encoding round-trip), which makes str.maketrans raise
# ValueError at import time because every string key must be exactly one character long.
# They are rebuilt from code points U+2460..U+2468 (CIRCLED DIGIT ONE..NINE) so the source
# stays pure ASCII and cannot be mangled the same way again.
# NOTE(review): the original glyph family is assumed to be U+2460..U+2468 — confirm against
# the RapidOCR dumps.
AQA_CIRCLED_DIGITS = str.maketrans({chr(0x2460 + i): str(i + 1) for i in range(9)})
NUM_RE = re.compile(r"^(\d{2})$") # 08
DIG_RE = re.compile(r"^(\d)$") # 4
# A-level papers (7408) render the boxed label GLUED to the question text in one OCR token
# ("01.1 An atom of ...") rather than as a standalone box (GCSE 8463). Recover it as a prefix,
# but only inside the tight left label column — body text (incl. decimals like "10.5 N") sits
# at l>=92, so this column gate is the precision filter that keeps false positives out.
# real part decimals run .1-.9 (never .0), so [1-9] rejects decimal content like "10.0" that
# happens to land in the label column; ".0" is reserved for our MCQ top-level placeholders.
PART_PREFIX = re.compile(r"^\s*(\d)\s*(\d)\s*\.\s*([1-9])(?=\s|$)") # "01.1 ..." / "0 1 . 1 ..."
# Largest bbox "l" at which a token still counts as sitting in the label column.
LABEL_COL_MAX = 75 # left edge of the label box
MIN_MCQ_RUN = 5 # a real Section-B MCQ run is long+contiguous; fewer = stray page numbers
# Tokens whose bbox "t" is below this value are treated as footer and ignored as MCQ candidates.
# NOTE(review): the trailing comment cites t<~30 while the threshold is 60 — confirm which is
# intended.
FOOTER_T = 60 # bbox bottom-left origin: t<~30 is the page-number footer, not content
# A-level Section B is multiple-choice: bare sequential top-level numbers ("07 Which two...",
# or a lone "07") with no decimal part. They render glued in the label column. The sequence
# gate (each accepted number == previous + 1) is the precision filter that rejects OCR noise
# (misread "60", "10 6") — exactly as the OCR-board grammar gates bare integers.
MCQ_TOP = re.compile(r"^(\d{2})(?:\s+[A-Z(].*|\s*)$")
def _rapid_pages(rapid_glob):
"""Yield (page_no, doc) in NUMERIC page order (glob sorts lexically: p1,p10,p2...)."""
files = sorted(glob.glob(rapid_glob),
key=lambda f: int(re.search(r"p(\d+)\.json", f).group(1)))
for fn in files:
pg = int(re.search(r"p(\d+)\.json", fn).group(1))
yield pg, json.load(open(fn))
def _clean_aqa_label(raw):
    """Normalise an OCR token into a candidate AQA label string.

    Surrounding whitespace is stripped, circled digits are mapped to plain digits and inner
    spaces removed. If any ASCII letter remains, the compacted token is returned untouched:
    prose/option OCR artifacts like "36.7Q" must not be turned into labels (genuine glued
    label+prose cases are handled from the raw text by PART_PREFIX under the label-column
    gate). Otherwise everything except digits and dots is dropped.
    """
    text = raw or ""
    compact = text.strip().translate(AQA_CIRCLED_DIGITS).replace(" ", "")
    contains_letter = re.search(r"[A-Za-z]", compact) is not None
    if contains_letter:
        return compact
    return AQA_LABEL_NOISE.sub("", compact)
def _synthetic_label_bbox(page_lines, fallback):
    """Best-effort bbox for an OCR-missed AQA label, preserving the layout contract.

    Preference order:
      1. the vertical extent of the highest ("t"-largest) body line on the page whose left
         edge is within 90..170 and which is not in the footer band, placed at a fixed
         l=48 / r=104 label column;
      2. a copy of `fallback` when one is given;
      3. a fixed default box near the top of the page.
    """
    anchor = None
    for bb, _ in page_lines:
        in_body_column = 90 <= bb.get("l", 999) <= 170
        above_footer = bb.get("t", 0) >= FOOTER_T
        if not (in_body_column and above_footer):
            continue
        # Strict ">" keeps the first of several equally high lines.
        if anchor is None or bb.get("t", 0) > anchor.get("t", 0):
            anchor = bb
    if anchor is not None:
        return {"l": 48.0, "r": 104.0, "t": round(anchor["t"], 1), "b": round(anchor["b"], 1),
                "coord_origin": anchor.get("coord_origin", "BOTTOMLEFT")}
    if fallback:
        return dict(fallback)
    return {"l": 48.0, "r": 104.0, "t": 780.0, "b": 760.0, "coord_origin": "BOTTOMLEFT"}
def aqa_questions_rapid(rapid_glob):
    """Recover question labels from per-page RapidOCR dumps. Handles three AQA layouts:
    * GCSE standalone label/number boxes (8463 — v1's NN.M + NUM/DIG pairing),
    * A-level structured parts glued as a prefix ("01.1 An atom of..." — label column),
    * A-level Section-B multiple choice: bare sequential top-levels -> NN.0.

    Args:
        rapid_glob: glob pattern matching the per-page dump files read by _rapid_pages.

    Returns:
        dict label -> {"q", "page", "bbox", "marks", "regions"}. "marks" is 1 for every ".0"
        label and None otherwise; "regions" starts as an empty list.
    """
    parts = {}
    page_lines = defaultdict(list) # page -> [(bbox, raw)] for deterministic inference
    mcq_cands = [] # (page, NN, bbox) bare top-level candidates, in order
    top_cands = {} # NN -> (page, bbox) explicit top-level question headers
    for pg, d in _rapid_pages(rapid_glob):
        margin = []
        for t in d.get("texts", []):
            raw = (t.get("text") or "").strip()
            s = _clean_aqa_label(raw)
            prov = t.get("prov") or []
            bb = prov[0].get("bbox") if prov else None
            if bb is None:
                continue
            page_lines[pg].append((bb, raw))
            # Only tokens whose left edge is at most 140 are considered label candidates.
            if bb["l"] > 140:
                continue
            margin.append((bb, s))
            m = PART_RE.match(s)
            if m and m.group(2) != "0":
                # Standalone "NN.M" box; the first sighting of a label wins.
                parts.setdefault(f"{m.group(1)}.{m.group(2)}", {"page": pg, "bbox": bb})
            elif bb["l"] <= LABEL_COL_MAX:
                # NOTE(review): indentation was lost in this copy of the file; the MCQ branch
                # below is assumed to be nested under the label-column gate — confirm.
                mp = PART_PREFIX.match(raw)
                if mp:
                    parts.setdefault(f"{mp.group(1)}{mp.group(2)}.{mp.group(3)}",
                                     {"page": pg, "bbox": bb})
                elif bb["t"] >= FOOTER_T: # skip page-number footers (page N -> "N")
                    mc = MCQ_TOP.match(raw)
                    if mc:
                        mcq_cands.append((pg, mc.group(1), bb))
        # GCSE layout: pair a two-digit question box with a single-digit part box that sits on
        # the same row (vertical centres within 12) and not to its left.
        nums = [(bb, NUM_RE.match(s).group(1)) for bb, s in margin if NUM_RE.match(s)]
        digs = [(bb, DIG_RE.match(s).group(1)) for bb, s in margin if DIG_RE.match(s)]
        for nbb, nn in nums:
            top_cands.setdefault(nn, (pg, nbb))
            ny = (nbb["t"] + nbb["b"]) / 2
            for dbb, dd in digs:
                dy = (dbb["t"] + dbb["b"]) / 2
                if abs(ny - dy) < 12 and dbb["l"] >= nbb["l"]:
                    parts.setdefault(f"{nn}.{dd}", {"page": pg, "bbox": nbb})
    # Before Section-B handling, trim isolated high structured labels when a real MCQ run starts
    # immediately after the core structured section. This prevents OCR option text such as "36.7Q"
    # from moving the MCQ start from Q07 to Q37.
    q_nums = sorted({int(lab.split(".")[0]) for lab in parts if not lab.endswith(".0")})
    core_q = q_nums[:]
    # Peel trailing question numbers that sit more than 2 above their predecessor.
    while len(core_q) >= 2 and core_q[-1] - core_q[-2] > 2:
        core_q.pop()
    mcq_nums = {int(nn) for _, nn, _ in mcq_cands}
    if core_q and any(max(core_q) < n <= max(core_q) + 3 for n in mcq_nums):
        core_set = set(core_q)
        for lab in list(parts):
            if int(lab.split(".")[0]) not in core_set and not lab.endswith(".0"):
                parts.pop(lab, None)
    # Infer an OCR-dropped leading .1 part when later structured parts for the same question are
    # present. AQA papers overwhelmingly start structured questions at .1; this fixes pages where
    # RapidOCR reads the prose but misses the small boxed label, without changing schemas or model paths.
    by_q = defaultdict(list)
    for lab, v in parts.items():
        q, sub = lab.split(".")
        if sub != "0":
            by_q[q].append((int(sub), v))
    for q, vals in list(by_q.items()):
        if f"{q}.1" not in parts:
            first_sub, first_v = min(vals, key=lambda x: (x[0], x[1].get("page") or 999))
            if first_sub > 1 and first_v.get("page"):
                pg = int(first_v["page"])
                parts[f"{q}.1"] = {"page": pg, "bbox": _synthetic_label_bbox(page_lines.get(pg, []), first_v.get("bbox"))}
        # NOTE(review): indentation was lost in this copy of the file; the gap-fill below is
        # assumed to run for every question, not only those that were missing ".1" — confirm.
        subs = sorted(int(sub) for lab in parts for qq, sub in [lab.split(".")] if qq == q and sub != "0")
        # Fill only one-step internal OCR gaps with support on both sides; do not expand a lone
        # false high subpart into a whole run of synthetic labels.
        if len(subs) >= 3:
            for prev_sub, next_sub in zip(subs, subs[1:]):
                if next_sub - prev_sub == 2:
                    missing = prev_sub + 1
                    # The filled-in part borrows the page and a copy of the bbox of the part after it.
                    anchor = parts[f"{q}.{next_sub}"]
                    parts[f"{q}.{missing}"] = {"page": anchor.get("page"), "bbox": dict(anchor.get("bbox") or {})}
    # Preserve explicit one-part structured questions seen as a bare top-level header (for example
    # GCSE Combined Chemistry 03 with no decimal sub-label) without converting ordinary question
    # headers that already have .1/.2 children into extra .0 parts.
    present_q = {lab.split(".")[0] for lab in parts}
    for q, (pg, bb) in top_cands.items():
        if q not in present_q:
            parts.setdefault(f"{q}.0", {"page": pg, "bbox": bb})
    # Section B: walk MCQ candidates in reading order, accepting a tight increasing sequence.
    structured_q = set(core_q or [int(lab.split(".")[0]) for lab in parts])
    # The run is expected to start right after the highest structured question number.
    expect = (max(structured_q) + 1) if structured_q else 1
    mcq_cands.sort(key=lambda c: (c[0], -(c[2] or {}).get("t", 0))) # page, then top-down
    cand = {} # nn -> (page, bbox), first occurrence in reading order
    for pg, nn, bb in mcq_cands:
        cand.setdefault(int(nn), (pg, bb))
    # Take exact expected numbers; for tiny OCR gaps before the next real MCQ candidate, emit
    # deterministic placeholders so a single garbled number does not end Section B recovery.
    seq = []
    while True:
        if expect in cand and expect not in structured_q:
            seq.append((expect, cand[expect]))
            expect += 1
            continue
        # Look at most 3 numbers ahead for the next real candidate.
        nxt = [n for n in cand if expect < n <= expect + 3 and n not in structured_q]
        if nxt:
            jump_to = min(nxt)
            # Placeholders for the skipped numbers reuse the geometry of the candidate jumped to.
            for missing in range(expect, jump_to):
                seq.append((missing, cand[jump_to]))
            expect = jump_to
            continue
        break
    # Only commit if this is a real Section-B MCQ run, not a few stray page/figure numbers on a
    # paper with no MCQ section (a GCSE structured paper yields a short, gappy set; a real MCQ
    # section is a long contiguous run).
    if len(seq) >= MIN_MCQ_RUN:
        for n, (pg, bb) in seq:
            parts.setdefault(f"{n:02d}.0", {"page": pg, "bbox": bb})
    # In the rapid path every ".0" label is a Section-B multiple-choice question, worth 1 mark
    # each (they carry no "[N marks]" token for geometry to bind). Structured parts stay None
    # until attach_marks_by_geometry fills them from the marks list.
    return {lab: {"q": lab.split(".")[0], "page": v["page"], "bbox": v["bbox"],
                  "marks": (1 if lab.endswith(".0") else None), "regions": []}
            for lab, v in parts.items()}
# --- AQA text grammar (born-digital AQA papers, e.g. 7408 with a text layer) ------
# "[N marks]" / "[1 mark]" token; group 1 is the mark count. Case-insensitive.
AQA_MARK = re.compile(r"\[\s*(\d+)\s*marks?\s*\]", re.I)
# AQA boxed labels render SPACE-SEPARATED in pdftotext ("0 1 . 1"); decimal content
# ("10.5 N") does not. `pdftotext -bbox` normalises gaps to single spaces, while `-layout`
# preserved wider runs, so the top-box grammar tolerates either one-or-more spaces.
# Groups 1 and 2 are the two question digits; group 3 of AQA_PART_BOX is the part digit.
AQA_PART_BOX = re.compile(r"^\s*(\d)\s+(\d)\s*\.\s*(\d)(?=\s|$)") # 0 1 . 1
# A top-level box must be followed by an upper-case letter or "(" (lookahead only).
AQA_TOP_BOX = re.compile(r"^\s*(\d)\s+(\d)\s+(?=[A-Z(])") # 0 2 Carbon...
def aqa_questions_text(lines):
    """Parse AQA question parts out of a born-digital line stream.

    A line opening with a spaced part box ("0 1 . 1") starts part "01.1"; a line opening with
    a spaced top-level box ("0 2 Carbon...") starts the placeholder part "02.0". The first
    "[N marks]" token seen while a part is current and still unmarked becomes its mark.
    Placeholder ".0" parts are dropped for any question that also has other parts.

    Returns dict label -> {"q", "page", "bbox", "marks", "regions"}.
    """
    def fresh(question, line):
        return {"q": question, "page": line.page, "bbox": _prefix_bbox(line),
                "marks": None, "regions": []}

    parts = {}
    cur = None
    for line in lines:
        text = line.text
        part_hit = AQA_PART_BOX.match(text)
        if part_hit:
            tens, units, sub = part_hit.groups()
            question = tens + units
            cur = parts.setdefault(f"{question}.{sub}", fresh(question, line))
        else:
            top_hit = AQA_TOP_BOX.match(text)
            if top_hit:
                question = "".join(top_hit.groups())
                cur = parts.setdefault(f"{question}.0", fresh(question, line))
        mark_hit = AQA_MARK.search(text)
        if mark_hit and cur is not None and cur.get("marks") is None:
            cur["marks"] = int(mark_hit.group(1))

    # Questions owning at least one label other than their own ".0" placeholder.
    with_other_parts = {v["q"] for k, v in parts.items() if k != f"{v['q']}.0"}
    for question in with_other_parts:
        parts.pop(f"{question}.0", None)
    return parts
# ====================================================================== Edexcel
# "Total for Question N is M marks": group 1 is the question number, group 2 the total.
EDX_TOTAL = re.compile(r"Total for Question\s+(\d+)\s+is\s+(\d+)\s+marks?", re.I)
EDX_LEAD = re.compile(r"^\s*(\d{1,2})\s+(.*)$") # number, gap, then the rest of the line
EDX_PART = re.compile(r"\(([a-h])\)") # may appear inline after the number
# Roman-numeral sub-part opener at the start of a line, e.g. "(ii)".
EDX_SUB = re.compile(r"^\s*\(([ivx]{1,4})\)")
# A line holding nothing but a bracketed integer, e.g. "(3)" — the per-part mark.
EDX_MARK = re.compile(r"^\s*\((\d+)\)\s*$")
def edexcel_questions(lines):
    """Parse Edexcel question parts from a line stream.

    Top-level numbers are trusted only when a "Total for Question N is M marks" line exists
    for them (precision anchor). Lettered parts "(a)" and roman sub-parts "(i)" become labels
    such as "3a" / "3ai"; a line holding only "(N)" sets the mark of the most recently opened
    label of the CURRENT question. A question with no lettered parts becomes a single part
    carrying the anchored total.

    Returns:
        (parts, {}, anchors) where parts maps label -> {"q", "page", "bbox", "marks",
        "regions"} and anchors maps question number (int) -> (total marks, anchor Line).
        The middle element is always an empty dict.
    """
    # anchor top-level numbers on the robust "Total for Question N is M" signal (precision)
    anchors = {} # qnum -> (total marks, anchor line)
    for l in lines:
        m = EDX_TOTAL.search(l.text)
        if m:
            anchors[int(m.group(1))] = (int(m.group(2)), l)
    parts = {}
    haspart = set() # questions that own lettered parts
    curq = curlet = lastlab = None

    def add(lab, q, l):
        nonlocal lastlab
        parts.setdefault(lab, {"q": q, "page": l.page, "bbox": _prefix_bbox(l, 40), "marks": None, "regions": []})
        lastlab = lab

    for l in lines:
        if EDX_TOTAL.search(l.text):
            # End of the question: forget the open label too, so a later "(N)" line cannot
            # be credited to a part of the question that just closed.
            curq = curlet = lastlab = None
            continue
        ml = EDX_LEAD.match(l.text)
        if ml and int(ml.group(1)) in anchors and (ml.group(2)[:1].isupper()
                                                   or ml.group(2).lstrip().startswith("(")):
            curq, rest = ml.group(1), ml.group(2)
            # New question: reset the open label as well. Without this a bare "(N)" line
            # inside a part-less question overwrote the previous question's last part mark.
            curlet = lastlab = None
            inline = EDX_PART.search(rest) # capture "(a)" sharing the lead line
            if inline:
                curlet = inline.group(1)
                haspart.add(curq)
                add(f"{curq}{curlet}", curq, l)
            continue
        if curq is None:
            continue
        mp = EDX_PART.match(l.text.lstrip())
        if mp:
            curlet = mp.group(1)
            haspart.add(curq)
            add(f"{curq}{curlet}", curq, l)
        ms = EDX_SUB.match(l.text)
        if ms and curlet:
            add(f"{curq}{curlet}{ms.group(1)}", curq, l)
        mm = EDX_MARK.match(l.text)
        if mm and lastlab:
            parts[lastlab]["marks"] = int(mm.group(1))
    # part-less questions: one part carrying the authoritative Total-for-Question mark
    for q, (total, anchor_line) in anchors.items():
        if str(q) not in haspart:
            parts.setdefault(str(q), {"q": str(q), "page": anchor_line.page,
                                      "bbox": _prefix_bbox(anchor_line, 40),
                                      "marks": total, "regions": []})
    return parts, {}, anchors
# ====================================================================== OCR
# Lettered part opener "(a)".."(h)" at the start of a line.
OCR_PART = re.compile(r"^\s*\(([a-h])\)")
# Roman-numeral sub-part opener "(i)", "(iv)", ... at the start of a line.
OCR_SUB = re.compile(r"^\s*\(([ivx]{1,4})\)")
# Bare "[N]" marks token anywhere in the line.
OCR_MARK = re.compile(r"\[(\d+)\]")
# "(b)*" or "12*" at the start of a line.
# NOTE(review): not referenced anywhere in the visible part of this file — ocr_questions
# uses its own inline pattern for the "*" flag; confirm whether this is still needed.
OCR_EXT = re.compile(r"^\s*\(([a-h])\)\s*\*|^\s*(\d{1,2})\s*\*")
def ocr_questions(lines):
    """Parse OCR-board question parts from a line stream.

    Top-level questions are accepted only as the next integer in sequence. Lettered parts
    produce labels like "3a", roman sub-parts "3ai". When a question number is missing, an
    inferred question named "~1", "~2", ... is opened on a part "(a)".

    Returns dict label -> {"q", "page", "bbox", "marks", "regions"}; part and sub-part
    entries also carry "extended".
    """
    parts = {}
    curq = curlet = None
    expect = 1
    inferred = 0 # OCR may drop the margin question number; infer from part structure
    for l in lines:
        # top-level = the NEXT integer in sequence, gap, then question text OR a part opener "(a)"
        # (Q3 opens straight into (a)). Sequence gate = the precision filter.
        ml = re.match(r"^\s*(\d{1,2})\s+(\(|[A-Z])", l.text)
        if ml and int(ml.group(1)) == expect:
            curq = ml.group(1); curlet = None; expect += 1
            # "_lead" marks the bare question entry; it is removed again once a part opens.
            parts.setdefault(curq, {"q": curq, "page": l.page, "bbox": _prefix_bbox(l, 36),
                                    "marks": None, "regions": [], "_lead": True})
        if curq is None:
            # number was OCR-dropped: start an inferred question on its first part "(a)"
            m0 = OCR_PART.match(l.text.lstrip())
            if m0 and m0.group(1) == "a":
                inferred += 1; curq = f"~{inferred}"; curlet = None
            else:
                continue
        # True when the line contains a lettered part immediately followed by "*".
        ext = bool(re.search(r"\(\s*[a-h]\s*\)\s*\*", l.text))
        mp = OCR_PART.match(l.text)
        if mp:
            # a repeat "(a)" while this question already owns one => next question, number dropped
            if mp.group(1) == "a" and f"{curq}a" in parts:
                inferred += 1; curq = f"~{inferred}"
            curlet = mp.group(1)
            # The question now has a real part, so drop its bare lead entry if present.
            parts.pop(curq, None)
            lab = f"{curq}{curlet}"
            parts.setdefault(lab, {"q": curq, "page": l.page, "bbox": _prefix_bbox(l, 36),
                                   "marks": None, "regions": [], "extended": ext})
        ms = OCR_SUB.match(l.text)
        if ms and curlet:
            lab = f"{curq}{curlet}{ms.group(1)}"
            parts.setdefault(lab, {"q": curq, "page": l.page, "bbox": _prefix_bbox(l, 36),
                                   "marks": None, "regions": [], "extended": ext})
        mm = OCR_MARK.search(l.text)
        if mm:
            # Marks go to the most recently added non-lead entry of the current question and
            # overwrite any earlier value; a question holding only its lead entry gets none.
            sib = [k for k in parts if parts[k]["q"] == curq and not parts[k].get("_lead")]
            if sib:
                parts[sib[-1]]["marks"] = int(mm.group(1))
    # Strip the internal "_lead" flag before returning.
    for v in parts.values():
        v.pop("_lead", None)
    return parts
# ====================================================================== shared layers
# Docling layout label -> taxonomy type. Labels absent from this table produce no region.
LABEL_TO_TAXONOMY = {
    "checkbox_unselected": "mcq_option", "checkbox_selected": "mcq_option",
    "picture": "context_figure", "table": "context_data", "caption": "context_caption",
    "page_header": "furniture", "page_footer": "furniture",
    "section_header": "heading", "list_item": "instruction",
}


def docling_regions(doc):
    """Map Docling texts/pictures/tables onto taxonomy regions.

    An item without a "label" defaults to the singular of its collection name ("text",
    "picture", "table"). Items whose label has no taxonomy entry, or that carry no bbox in
    their first provenance record, are skipped. The text preview is cut to 80 characters.
    """
    found = []
    for collection in ("texts", "pictures", "tables"):
        default_label = collection[:-1]
        for item in doc.get(collection, []):
            label = item.get("label", default_label)
            taxonomy = LABEL_TO_TAXONOMY.get(label)
            if not taxonomy:
                continue
            prov = item.get("prov") or []
            first = prov[0] if prov else {}
            box = first.get("bbox")
            if box is None:
                continue
            preview = (item.get("text") or "")[:80]
            found.append({"type": taxonomy, "docling_label": label, "page": first.get("page_no"),
                          "bbox": box, "text": preview})
    return found
def _norm_region_type(kind):
kind = (kind or "answer_lines").strip().lower().replace("-", "_")
return kind if kind in {"answer_lines", "answer_box", "working_space"} else "working_space"
def merge_gemma(parts, gemma_dir):
    """Attach gemma4:e4b answer_regions (#3) to parts by for_part; gap-fill missing marks.

    Every "p*.json" file in `gemma_dir` is read in sorted filename order. Regions whose
    normalised `for_part` label is a known part are appended to that part's "regions" with
    source "gemma". A part's marks are taken from gemma only when the part has none yet.

    Args:
        parts: label -> part dict; mutated in place.
        gemma_dir: directory holding the per-page gemma JSON dumps.

    Returns:
        (n_reg, n_fill): number of regions attached and number of marks gap-filled.
    """
    n_reg = n_fill = 0
    for fn in sorted(glob.glob(os.path.join(gemma_dir, "p*.json"))):
        # Context manager: the handle is closed per file instead of leaking until GC.
        with open(fn, encoding="utf-8") as fh:
            d = json.load(fh)
        for r in d.get("answer_regions", []):
            lab = _norm_label(r.get("for_part", ""))
            if lab in parts:
                region = {"type": _norm_region_type(r.get("kind", "answer_lines")),
                          "source": "gemma"}
                if r.get("bbox"):
                    region["bbox"] = r.get("bbox")
                parts[lab]["regions"].append(region)
                n_reg += 1
        for qp in d.get("question_parts", []):
            lab = _norm_label(qp.get("label", ""))
            if lab in parts and parts[lab].get("marks") is None and qp.get("marks") is not None:
                parts[lab]["marks"] = qp["marks"]
                n_fill += 1
    return n_reg, n_fill
def _norm_label(s):
"""gemma sometimes emits '0_4' or '01_2' for AQA -> '01.4'/'01.2'."""
s = (s or "").strip().replace("_", ".")
m = re.match(r"^(\d)\.(\d)$", s)
if m: # '0.4' -> drop, ambiguous; keep as-is otherwise
return s
return s
def attach_detected_response_regions(parts, pdf_path):
    """Attach OpenCV response-region candidates to the nearest known part on the same page.

    This is the deterministic answer-region backbone used before/alongside gemma: it emits the
    same answer_lines / answer_box / working_space taxonomy and keeps the mapper schema
    unchanged. Coordinates from regions.py are rendered-page TOPLEFT px; callers can persist
    them as candidate response areas or use the counts as harness coverage.

    Returns (attached_count, candidates). A missing PDF or a failed detector gives (0, []);
    detector failures are reported on stdout rather than raised.
    """
    if not pdf_path or not os.path.exists(pdf_path):
        return 0, []
    try:
        candidates = region_mod.detect_response_regions_from_pdf(pdf_path, min_confidence=0.32)
    except RuntimeError as exc:
        print(f"response-regions : unavailable ({exc})")
        return 0, []
    except Exception as exc:
        print(f"response-regions : failed ({exc})")
        return 0, []

    # Only parts that have both a page and a bbox can anchor a region.
    anchors_by_page = defaultdict(list)
    for lab, part in parts.items():
        if part.get("page") is None or not part.get("bbox"):
            continue
        anchors_by_page[int(part["page"])].append((lab, part))

    attached = 0
    for cand in candidates:
        # regions.py page_index is zero-based; extraction/template parts are one-based.
        page_no = int(cand.get("page_index", 0)) + 1
        anchors = anchors_by_page.get(page_no) or []
        if not anchors:
            continue

        rb = cand.get("bbox") or {}
        meta = cand.get("meta") or {}
        centre_px = float(rb.get("y", 0)) + float(rb.get("h", 0)) / 2
        height_px = float(meta.get("page_height_px") or 0)
        height_pdf = float(meta.get("page_height_pdf") or 0)
        if height_px > 0 and height_pdf > 0:
            # Flip the top-left pixel centre into the bottom-left PDF space of the part boxes.
            region_y = (1.0 - centre_px / height_px) * height_pdf
        else:
            region_y = -centre_px

        chosen, chosen_score = None, 1e9
        for lab, part in anchors:
            pb = part.get("bbox") or {}
            top = float(pb.get("t", 0))
            middle = (top + float(pb.get("b", 0))) / 2
            # Prefer the nearest label above/near the response area; a small penalty keeps
            # previous-part assignment stable when regions sit between two labels.
            penalty = 120 if region_y > top + 18 else 0
            score = abs(middle - region_y) + penalty
            if score < chosen_score:
                chosen, chosen_score = lab, score
        if not chosen:
            continue

        entry = {
            "type": _norm_region_type(cand.get("region_type")),
            "source": "opencv",
            "confidence": cand.get("confidence"),
            "bbox": rb,
            "detection_method": cand.get("detection_method"),
        }
        if cand.get("line_count") is not None:
            entry["line_count"] = cand.get("line_count")
        parts[chosen].setdefault("regions", []).append(entry)
        attached += 1
    return attached, candidates
def extract_tables(parts, doc, granite="off", pdf=None, cache_glob=None):
    """Selective table-cell extraction (PLAN.md §B).

    Standard TableFormer grids are always collected. When `granite` is not "off", Granite
    <otsl> tables are added for the router-flagged candidate pages: read from cached doctags
    when granite == "cached", or produced live when granite == "live" and `pdf` is given.
    Granite tables win over the standard pipeline on pages they cover (cleaner grids).

    Parts that end up with attached tables get `has_table = True`.

    Returns (data_tables, all_tables).
    """
    standard = tbl_mod.tables_from_standard(doc)
    granite_tables = []
    if granite != "off":
        flagged = tbl_mod.candidate_pages(doc)
        if granite == "cached":
            doctags = tbl_mod._load_cached_doctags(cache_glob or "")
            for page_no in flagged:
                for table in tbl_mod.parse_otsl(doctags.get(page_no, "")):
                    table["page"] = page_no
                    granite_tables.append(table)
        elif granite == "live" and pdf:
            granite_tables = tbl_mod.granite_tables(pdf, flagged, cached_glob=cache_glob)

    covered = {table["page"] for table in granite_tables}
    leftover_standard = [table for table in standard if table["page"] not in covered]
    combined = granite_tables + leftover_standard
    data = tbl_mod.attach_to_questions(combined, parts)
    for part in parts.values():
        if part.get("tables"):
            part["has_table"] = True
    return data, combined
def attach_marks_by_geometry(parts, doc):
    """AQA Docling path: bind each "[N marks]" token to a part by vertical position.

    Marks live in a flat list of tokens found in the Docling texts. Each token goes to the
    part on the same page whose label box has the vertically nearest centre, and only when
    that part has no mark yet. Parts without a bbox rank last.

    Returns (n_bound, marks) where marks is the full list of (page, bbox, value) tokens.
    """
    marks = []
    for item in doc.get("texts", []):
        prov = item.get("prov") or []
        first = prov[0] if prov else {}
        box, page = first.get("bbox"), first.get("page_no")
        marks.extend((page, box, int(hit.group(1)))
                     for hit in AQA_MARK.finditer(item.get("text") or ""))

    parts_by_page = defaultdict(list)
    for lab, part in parts.items():
        if part.get("page") is not None:
            parts_by_page[part["page"]].append((lab, part))

    def vertical_gap(entry, target_y):
        label_box = entry[1].get("bbox")
        if not label_box:
            return 1e9
        centre = (label_box.get("t", 0) + label_box.get("b", 0)) / 2
        return abs(centre - target_y)

    bound = 0
    for page, box, value in marks:
        candidates = parts_by_page.get(page, [])
        if not candidates or box is None:
            continue
        mark_y = (box["t"] + box["b"]) / 2
        nearest = min(candidates, key=lambda entry: vertical_gap(entry, mark_y))
        if nearest[1].get("marks") is None:
            nearest[1]["marks"] = value
            bound += 1
    return bound, marks
# ----------------------------------------------------------------- assembly + coverage
def build_questions(parts):
    """Group the flat ``label -> part`` mapping into a list of per-question records.

    Questions are ordered by ``(len(q), q)`` so that "2" sorts before "10"; the part labels
    inside each question are sorted lexicographically. Optional part fields fall back to
    ``None`` (page, bbox, marks), ``[]`` (regions) or ``False`` (has_table, extended).
    """
    labels_by_question = defaultdict(list)
    for label, part in parts.items():
        labels_by_question[part["q"]].append(label)

    def part_record(label):
        """Project one part onto the output schema."""
        part = parts[label]
        return {
            "label": label,
            "page": part.get("page"),
            "bbox": part.get("bbox"),  # label geometry (None for born-digital text)
            "marks": part.get("marks"),
            "regions": part.get("regions", []),
            "has_table": part.get("has_table", False),
            "extended": part.get("extended", False),
        }

    ordered_questions = sorted(labels_by_question, key=lambda q: (len(q), q))
    return [
        {"question": q,
         "parts": [part_record(label) for label in sorted(labels_by_question[q])]}
        for q in ordered_questions
    ]
# Built-in ground-truth part labels for the AQA physics paper, one question per row.
GT_PARTS_PHYSICS = [
    "01.1", "01.2", "01.3", "01.4",
    "02.1", "02.2", "02.3", "02.4",
    "03.1", "03.2", "03.3", "03.4",
    "04.1", "04.2", "04.3", "04.4", "04.5",
    "05.1", "05.2", "05.3", "05.4",
    "06.1", "06.2", "06.3",
    "07.1", "07.2", "07.3",
    "08.1", "08.2", "08.3", "08.4", "08.5",
    "09.1", "09.2", "09.3",
    "10.1", "10.2", "10.3",
    "11.1", "11.2", "11.3", "11.4",
]
# official paper maxima — the strongest grammar sanity check (marks_sum should match)
EXPECTED_MAX = {
    "8463": 100,
    "7408": 85,
    "7402": 91,
    "7405": 105,
    "8461": 100,
    "8462": 100,
    "8464": 70,
    "1MA1": 80,
    "H556": 70,
}


def expected_max(code):
    """Return the paper maximum for ``code``, or None when it is empty or unrecognised.

    A code matches when it starts with one of the ``EXPECTED_MAX`` keys; the first matching
    key in the table's insertion order wins.
    """
    if code:
        return next(
            (maximum for prefix, maximum in EXPECTED_MAX.items() if code.startswith(prefix)),
            None,
        )
    return None
def parse_text_by_board(lines, board):
    """Run the grammar for ``board`` over a line stream and return its parts dict.

    Used for both the ground-truth text and the born-digital path. ``"edexcel"`` and
    ``"ocr"`` select their own grammars; every other value falls through to the AQA
    text grammar.
    """
    if board == "edexcel":
        # The Edexcel grammar returns a 3-tuple; only the parts dict is needed here.
        parts, _, _ = edexcel_questions(lines)
    elif board == "ocr":
        parts = ocr_questions(lines)
    else:
        parts = aqa_questions_text(lines)
    return parts
def coverage(parts, gt_labels):
    """Score label recall of ``parts`` against a ground-truth label collection.

    Args:
        parts: mapping (or any iterable) of recovered part labels.
        gt_labels: sized collection of ground-truth labels; duplicates are ignored.

    Returns:
        dict with ``coverage_pct`` (recall rounded to one decimal place, or None when there
        is no ground truth), ``recovered`` (ground-truth labels found), ``total`` (distinct
        ground-truth labels) and ``missed`` (sorted ground-truth labels not recovered).
    """
    recovered = set(parts)
    # Score against the distinct labels: hits are counted as a set, so the denominator
    # must be one too or a repeated ground-truth label would make 100% unreachable.
    expected = set(gt_labels)
    hits = recovered & expected
    total = len(expected)
    pct = round(len(hits) / total * 100, 1) if total else None
    return {"coverage_pct": pct,
            "recovered": len(hits), "total": total, "missed": sorted(expected - recovered)}
# ----------------------------------------------------------------- main
def main():
    """CLI entry point: choose an input path, extract parts, enrich, score and write JSON.

    Steps, in order:
      1. resolve ``--auto`` (usable text layer -> treat as ``--text``; otherwise print the
         OCR route and return without extracting);
      2. with no source given, fall back to the AQA physics regression-guard inputs;
      3. build the line stream (live OCR, cached Docling JSON, or pdftotext/text);
      4. detect the board (``--board`` overrides) and read the front matter;
      5. recover the parts, then enrich them with marks, tables, gemma regions, detected
         response regions and the optional mark gap-fills;
      6. score coverage, write the result JSON to ``--out`` and print a summary.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--text", help="pdftotext source or .txt (born-digital / GT path)")
    ap.add_argument("--auto", help="PDF: auto-detect text layer -> fast-path (--text), else "
                                   "report the OCR path is required (no GPU work attempted here)")
    ap.add_argument("--ocr", help="PDF to OCR live via dsync (uses the shared GPU)")
    ap.add_argument("--docling", help="cached Docling JSON (OCR path without re-running dsync)")
    ap.add_argument("--pdf", help="source PDF for live Granite table passes (--granite live)")
    # argparse %-formats help strings, so the literal percent sign must be written as '%%';
    # an unescaped '95% path' makes --help raise ValueError.
    ap.add_argument("--rapid", help="AQA RapidOCR per-page glob (the v1 95%% path)")
    ap.add_argument("--gemma", help="gemma sweep dir with p*.json answer_regions")
    ap.add_argument("--response-regions", dest="response_regions_pdf",
                    help="PDF to scan with deterministic response-region detector and attach to parts")
    ap.add_argument("--marks-fill", dest="marks_fill",
                    help="gemma_marks.py fills JSON: fill marks=None parts (Edexcel/OCR (N)/[N] gap-fill)")
    ap.add_argument("--granite", default="off", choices=["off", "cached", "live"],
                    help="selective Granite-Docling <otsl> tables: cached doctags or live via dsync")
    ap.add_argument("--granite-cache", default="results/VLM_granite_p*.doctags",
                    help="glob of cached *.doctags for --granite cached / live fallback")
    ap.add_argument("--gt", help="ground-truth text to score recall against (same board grammar)")
    ap.add_argument("--expected-max", type=int,
                    help="authoritative paper max marks for OCR eval harnesses when front matter/code OCR is missing")
    ap.add_argument("--board", default="auto", choices=["auto", "aqa", "edexcel", "ocr"])
    ap.add_argument("--out", default="results/structured.json")
    a = ap.parse_args()

    # --- auto path selection -------------------------------------------------------------
    # Caller need not know in advance whether the PDF is born-digital or image-only: detect the
    # text layer and either fold --auto into the fast-path (--text) or report that the OCR path
    # is required. This is ADDITIVE — it only resolves --auto; every other mode is untouched.
    if a.auto:
        chars, n_pages, cpp = text_layer_chars_per_page(a.auto)
        if cpp >= TEXT_LAYER_MIN_CHARS_PER_PAGE:
            print(f"auto-detect : born-digital text layer "
                  f"({chars} chars / {n_pages} pages = {cpp:.0f} chars/page "
                  f">= {TEXT_LAYER_MIN_CHARS_PER_PAGE}) -> fast-path (pdftotext, no GPU)")
            a.text = a.auto
        else:
            print(f"auto-detect : NO usable text layer "
                  f"({chars} chars / {n_pages} pages = {cpp:.1f} chars/page "
                  f"< {TEXT_LAYER_MIN_CHARS_PER_PAGE}) -> OCR path required")
            print("route : run the OCR pipeline, e.g.")
            print(f" python extract.py --ocr {a.auto}")
            print(" (AQA image-only papers use the RapidOCR margin-pass; "
                  "see scripts/rapid_pass.py)")
            return

    # default invocation == v1 AQA physics regression guard
    if not (a.text or a.ocr or a.docling):
        a.docling = "results/E_tess_full.json"
        a.rapid = a.rapid or "results/rapid_pages/p*.json"
        a.gemma = a.gemma or "results/gemma_sweep_physics_200"
        a.pdf = a.pdf or "samples/AQA-Physics-Paper-1H-2022-with-qr.pdf"
        a.out = "results/physics_structured.json" if a.out == "results/structured.json" else a.out

    doc = None
    pages = []
    if a.ocr:
        try:
            from . import dsync
        except ImportError:  # pragma: no cover - CLI execution
            import dsync
        doc = dsync.convert_document(a.ocr, {"ocr_engine": "tesseract", "force_ocr": True})
        lines = lines_from_docling(doc)
    elif a.docling:
        # Context manager so the handle is closed deterministically.
        with open(a.docling, encoding="utf-8") as fh:
            doc = json.load(fh)
        lines = lines_from_docling(doc)
    else:
        if a.text and a.text.endswith(".pdf"):
            lines, pages = _bbox_lines_from_pdftotext(a.text)
        else:
            lines = lines_from_pdftext(a.text)

    board, code = detect_board(lines)
    if a.board != "auto":
        board = a.board
    fm = extract_front_matter(lines, board, code)

    # --- questions: AQA uses the proven Docling+RapidOCR path when inputs exist ----------
    if board == "aqa" and a.rapid and glob.glob(a.rapid):
        parts = aqa_questions_rapid(a.rapid)
        path_used = "aqa-docling+rapidocr (v1)"
    else:
        parts = parse_text_by_board(lines, board)
        path_used = f"{board}-text-grammar"

    # --- shared enrichment ---------------------------------------------------------------
    regions = docling_regions(doc) if doc else []
    n_mark_geo = 0
    if doc and board == "aqa":
        n_mark_geo, _ = attach_marks_by_geometry(parts, doc)
    data_tables, all_tables = ([], [])
    if doc:
        data_tables, all_tables = extract_tables(parts, doc, granite=a.granite,
                                                 pdf=(a.pdf or a.ocr), cache_glob=a.granite_cache)
    n_tbl = sum(1 for v in parts.values() if v.get("has_table"))
    tbl_pages = sorted({t["page"] for t in data_tables if t["page"]})
    n_reg = n_fill = 0
    if a.gemma and os.path.isdir(a.gemma):
        n_reg, n_fill = merge_gemma(parts, a.gemma)
    n_cv_regions = 0
    cv_region_candidates = []
    response_pdf = a.response_regions_pdf or a.pdf or a.ocr
    if response_pdf:
        n_cv_regions, cv_region_candidates = attach_detected_response_regions(parts, response_pdf)
    n_marks_fill = 0
    if a.marks_fill and os.path.exists(a.marks_fill):
        with open(a.marks_fill, encoding="utf-8") as fh:
            fills = json.load(fh).get("fills", {})
        # Only parts that are still missing marks are filled; known marks are kept.
        for lab, mk in fills.items():
            if lab in parts and parts[lab].get("marks") is None:
                parts[lab]["marks"] = int(mk)
                n_marks_fill += 1
    exp_max_override = a.expected_max
    # Targeted marks gap-fill: if OCR recovered all but one mark and the authoritative
    # paper max leaves a small plausible residual, attach that residual to the lone
    # missing part. This keeps the deterministic label backbone and only fills the
    # narrow low-confidence gap instead of using gemma/full extraction as source of truth.
    n_residual_marks_fill = 0
    if exp_max_override:
        missing_labs = [lab for lab, part in parts.items() if part.get("marks") is None]
        known_sum = sum(part["marks"] for part in parts.values() if part.get("marks") is not None)
        residual = exp_max_override - known_sum
        if len(missing_labs) == 1 and 1 <= residual <= 9:
            parts[missing_labs[0]]["marks"] = residual
            n_residual_marks_fill = 1
    questions = build_questions(parts)

    # --- coverage ------------------------------------------------------------------------
    if a.gt:
        gt_lines = lines_from_pdftext(a.gt)
        gt_parts = parse_text_by_board(gt_lines, board)
        cov = coverage(parts, list(gt_parts))
        cov["source"] = "gt-text-same-grammar"
    elif board == "aqa" and "rapidocr" in path_used:
        cov = coverage(parts, GT_PARTS_PHYSICS)
        cov["source"] = "builtin-physics-gt"
    else:
        cov = {"coverage_pct": None, "note": "no GT provided"}

    marks_known = sum(1 for v in parts.values() if v.get("marks") is not None)
    marks_sum = sum(v["marks"] for v in parts.values() if v.get("marks") is not None)
    # harness override, code-based, else front-matter total
    exp_max = exp_max_override or expected_max(code) or fm.get("max_marks")
    # A missing *or zero* maximum yields no check; this also avoids dividing by zero.
    marks_check = (None if not exp_max else
                   {"sum": marks_sum, "expected_max": exp_max,
                    "pct": round(marks_sum / exp_max * 100, 1)})

    result = {
        "board": board, "paper_code": code, "front_matter": fm, "path": path_used,
        "pages": pages,
        "questions": questions,
        "regions": regions,
        "tables": data_tables,
        "stats": {
            "n_questions": len({v["q"] for v in parts.values()}),
            "n_parts": len(parts),
            "marks_parts_known": marks_known, "marks_sum": marks_sum,
            "marks_check": marks_check,
            "gemma_answer_regions": n_reg, "gemma_marks_filled": n_fill,
            "gemma_marks_gapfilled": n_marks_fill,
            "residual_marks_gapfilled": n_residual_marks_fill,
            "opencv_answer_regions": n_cv_regions,
            "opencv_answer_region_candidates": len(cv_region_candidates),
            "n_data_tables": len(data_tables),
            "n_furniture_tables": sum(1 for t in all_tables if t["is_furniture"]),
            "table_sources": {s: sum(1 for t in data_tables if t["source"] == s)
                              for s in sorted({t["source"] for t in data_tables})},
            "table_pages": tbl_pages,
            "region_type_counts": {t: sum(1 for r in regions if r["type"] == t)
                                   for t in sorted({r["type"] for r in regions})},
        },
        "coverage": cov,
    }
    # Context manager guarantees the JSON is flushed and closed before the summary prints.
    with open(a.out, "w") as fh:
        json.dump(result, fh, indent=2)

    print(f"board : {board} ({code or 'wordmark'}) [{path_used}]")
    fm_summary = ", ".join(f"{k}={v}" for k, v in fm.items() if not isinstance(v, (list, dict)))
    print(f"front-matter : {fm_summary}")
    print(f"questions : {result['stats']['n_questions']} top-level, {len(parts)} parts")
    mc = f" | {marks_sum}/{exp_max} of max ({marks_check['pct']}%)" if marks_check else ""
    print(f"marks : {marks_known}/{len(parts)} parts known (sum {marks_sum}){mc}"
          + (f"; +{n_mark_geo} by geometry" if n_mark_geo else ""))
    print(f"gemma regions : {n_reg} answer_regions, {n_fill} marks gap-filled"
          + (f"; +{n_marks_fill} marks via --marks-fill" if n_marks_fill else "")
          + (f"; +{n_residual_marks_fill} residual marks gap-fill" if n_residual_marks_fill else ""))
    if response_pdf:
        print(f"opencv regions : {n_cv_regions} attached / {len(cv_region_candidates)} candidates")
    print(f"tables : {len(data_tables)} data table(s) "
          f"{result['stats']['table_sources']} on pages {tbl_pages}; "
          f"{result['stats']['n_furniture_tables']} furniture filtered; {n_tbl} parts flagged")
    if cov.get("coverage_pct") is not None:
        # Only the first 8 misses are shown; flag the cut so the list is not read as complete.
        truncated = "..." if len(cov["missed"]) > 8 else ""
        print(f"COVERAGE : {cov['coverage_pct']}% ({cov['recovered']}/{cov['total']})"
              f" missed: {cov['missed'][:8]}{truncated} [{cov['source']}]")
    print(f"-> wrote {a.out}")


if __name__ == "__main__":
    main()