feat(seed): implement exam-corpus loader + filled 505-paper manifest

Implements the seed_exam_corpus.py skeleton TODOs against the real APIs and
fills the public exam corpus from official board sources.

Loader (run/initialization/seed_exam_corpus.py):
- _resolve_source_bytes: local path | url: fetch with on-disk cache + PDF validation
- upload_file: real StorageAdmin.upload_file, skip-if-exists+sha256 unless --force
- upsert_specification/upsert_paper: real upserts on spec_code/exam_code.
  Fix: QP/MS/INSERT/ER role -> eb_exams.type_code; doc_type set to 'pdf'
  (doc_type is CHECK-constrained to file formats; the skeleton wrote the role there).
- copy_user_test_subset: copy a QP subset into a test user's cc.users exam space + files rows
- first_sweep: auto_map + the /auto-map row mapper over seeded QPs -> system-owned
  exam_templates + questions/response_areas/boundaries/layout (idempotent)
- identity discovery via institute_memberships.profile_id

Manifest (run/initialization/manifests/):
- exam-corpus.yaml: 505 papers / 18 specs / AQA+Edexcel+OCR, every source URL HEAD-verified.
  AQA sciences GCSE 8461/8462/8463/8464 + AS/A-level 7401-7408, sessions JUN18-JUN24, QP+MS+ER, F+H.
- generate_corpus_manifest.py: regenerates + re-verifies all URLs from official hosts.

seed_curriculum.py: deprecation banner -> superseded by seed_exam_corpus.py; storage_loc
standardised on cc.examboards.

Verified on dev .94: full 505-paper seed (eb_specifications=18, eb_exams=505, QP=211),
idempotent re-runs, first-sweep + user-subset, 6/6 buckets provisioned.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
CC Worker 2026-06-07 22:58:03 +00:00
parent d8cf3bbc62
commit 5750413f43
4 changed files with 6960 additions and 83 deletions

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,291 @@
#!/usr/bin/env python3
"""
generate_corpus_manifest.py — build the public exam-corpus manifest from OFFICIAL sources,
verifying every source URL is live before it is written.
Output: exam-corpus.yaml (consumed by run/initialization/seed_exam_corpus.py).
Sources (all official exam-board hosts; public past-paper PDFs):
AQA — filestore.aqa.org.uk: fully templatable; enumerated + HEAD-verified here.
Edexcel — qualifications.pearson.com: date suffix non-derivable; confirmed URLs embedded.
OCR — www.ocr.org.uk/Images: opaque doc-id; confirmed URLs embedded.
Every URL is HEAD/GET-checked (200 + application/pdf) before inclusion, so the committed
manifest never carries a dead or wrong-cased link. Re-run to refresh as more sessions go public.
Conventions (locked — see ~/cc/ideas/2026-06-07-exam-paper-ingestion.md):
session = "YYYY-Mon" e.g. 2022-Jun
exam_code = BOARD-award-PAPER-SESSIONCOMPACT-ROLE e.g. AQA-8463-1H-2022JUN-QP
"""
from __future__ import annotations
import concurrent.futures as cf
import os
import sys
import urllib.error
import urllib.request
from typing import Any, Dict, List, Optional, Tuple
import yaml
# Root of AQA's public filestore; aqa_url() appends "/{year}/{month folder}/{filename}".
AQA_BASE = "https://filestore.aqa.org.uk/sample-papers-and-mark-schemes"
# Manifest role -> token used inside AQA filenames (note ER is published as "WRE").
ROLE_TOKEN = {"QP": "QP", "MS": "MS", "ER": "WRE"} # AQA filestore role tokens
# Session month code -> (filestore folder name, suffix used in the "YYYY-Mon" display session).
MONTHS = {"JUN": ("june", "Jun"), "NOV": ("november", "Nov")}
# Date written into every item's provenance["fetched"] and the manifest's "collected_at".
# NOTE(review): this is a pinned literal, so a later re-run still stamps this date — confirm
# whether it should be bumped (or derived from today's date) when the manifest is refreshed.
FETCHED = "2026-06-07"
def head_ok(url: str, timeout: int = 20) -> bool:
    """Return True iff ``url`` resolves to a real PDF, following redirects.

    Despite the name, this sends a tiny ranged GET (``Range: bytes=0-3``) via stdlib
    urllib rather than a HEAD request, so the whole PDF is not pulled just to verify it.
    AQA soft-404s redirect to www.aqa.org.uk/req_path=... (text/html), so the status
    code alone is not enough: the content-type must also mention "pdf".

    Args:
        url: URL to probe.
        timeout: Socket timeout in seconds.

    Returns:
        True when the final response status is 200 or 206 and its content-type contains
        "pdf". False for everything else: HTTP error statuses, network failures and
        malformed URLs. No ``Exception`` escapes, so one bad candidate cannot abort a
        thread-pool sweep through ``Future.result()``.
    """
    try:
        # Request() itself raises ValueError for a URL without a scheme, so it must be
        # built inside the try for a malformed URL to count as "not live".
        req = urllib.request.Request(
            url, headers={"Range": "bytes=0-3", "User-Agent": "cc-corpus/1.0"}
        )
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            ctype = (resp.headers.get("content-type") or "").lower()
            return resp.status in (200, 206) and "pdf" in ctype
    except Exception:
        # Deliberate best-effort probe. urllib raises HTTPError only for non-2xx
        # responses, so a 200/206 PDF never lands here; 404s, timeouts, DNS failures
        # and bad URLs all simply mean the link is unusable.
        return False
# ─────────────────────────── AQA catalogue ───────────────────────────
# spec_code, subject, award, award_level, first_teach, [(filestore_papercode, paper_code, tier), ...]
def _gcse_single(award: str) -> List[Tuple[str, str, Optional[str]]]:
out = []
for paper in ("1", "2"):
for tier in ("F", "H"):
out.append((f"{award}{paper}{tier}", f"{award}/{paper}{tier}", tier))
return out
def _trilogy(award: str) -> List[Tuple[str, str, Optional[str]]]:
out = []
for subj in ("B", "C", "P"):
for paper in ("1", "2"):
for tier in ("F", "H"):
out.append((f"{award}{subj}{paper}{tier}", f"{award}/{subj}/{paper}{tier}", tier))
return out
def _alevel(award: str, papers=("1", "2", "3")) -> List[Tuple[str, str, Optional[str]]]:
return [(f"{award}{p}", f"{award}/{p}", None) for p in papers]
# AQA catalogue enumerated by build_aqa(). One row per specification:
# (spec_code, subject, award, award_level, first_teach, papers) where `papers` is the
# [(filestore_papercode, paper_code, tier), ...] list produced by the helpers above.
AQA_SPECS = [
("AQA-BIOL-8461", "BIOLOGY", "8461", "GCSE", "2016", _gcse_single("8461")),
("AQA-CHEM-8462", "CHEMISTRY", "8462", "GCSE", "2016", _gcse_single("8462")),
("AQA-PHYS-8463", "PHYSICS", "8463", "GCSE", "2016", _gcse_single("8463")),
("AQA-COMB-8464", "COMBINED SCIENCE TRILOGY", "8464", "GCSE", "2016", _trilogy("8464")),
("AQA-BIOL-7401", "BIOLOGY", "7401", "AS", "2015", _alevel("7401", ("1", "2"))),
("AQA-BIOL-7402", "BIOLOGY", "7402", "A-level", "2015", _alevel("7402")),
("AQA-CHEM-7404", "CHEMISTRY", "7404", "AS", "2015", _alevel("7404", ("1", "2"))),
("AQA-CHEM-7405", "CHEMISTRY", "7405", "A-level", "2015", _alevel("7405")),
("AQA-PHYS-7407", "PHYSICS", "7407", "AS", "2015", _alevel("7407", ("1", "2"))),
("AQA-PHYS-7408", "PHYSICS", "7408", "A-level", "2015", _alevel("7408")),
]
# Session codes in "MONYY" form; the first three characters must be a key of MONTHS
# because aqa_url() and session_pretty() split them as session[:3] / session[3:].
AQA_SESSIONS = ["JUN18", "JUN19", "NOV20", "NOV21", "JUN22", "JUN23", "JUN24"]
# Document roles probed for every paper/session; each must be a key of ROLE_TOKEN.
AQA_ROLES = ["QP", "MS", "ER"]
def aqa_url(papercode: str, role: str, session: str) -> Tuple[str, str]:
    """Build the AQA filestore URL and filename for one paper, role and session.

    Args:
        papercode: Filestore paper code, e.g. "84631H".
        role: Manifest role; must be a key of ROLE_TOKEN.
        session: Session code in "MONYY" form, e.g. "JUN22"; the month must be in MONTHS.

    Returns:
        ``(url, filename)`` where filename is ``AQA-{papercode}-{token}-{session}.PDF``
        and the URL is ``{AQA_BASE}/20{YY}/{month folder}/{filename}``.

    Raises:
        KeyError: for an unknown month code or role.
    """
    month_code, year_suffix = session[:3], session[3:]
    folder = MONTHS[month_code][0]
    fname = f"AQA-{papercode}-{ROLE_TOKEN[role]}-{session}.PDF"
    return f"{AQA_BASE}/20{year_suffix}/{folder}/{fname}", fname
def session_pretty(session: str) -> Tuple[str, str]:
    """Convert a "MONYY" session code into its display and compact forms.

    For "JUN22" this returns ``("2022-Jun", "2022JUN")``: the first value is the
    "YYYY-Mon" display session, the second is the year-first compact form used inside
    exam_code (matching the locked exam_code convention and the Edexcel/OCR entries).

    Raises:
        KeyError: if the month code is not in MONTHS.
    """
    month_code = session[:3]  # "JUN" | "NOV"
    pretty = MONTHS[month_code][1]
    year = f"20{session[3:]}"
    return f"{year}-{pretty}", f"{year}{month_code}"
def build_aqa() -> Dict[str, Any]:
    """Enumerate every templated AQA URL, keep the live ones, and group them by spec.

    Candidates are the cross product of AQA_SPECS papers x AQA_SESSIONS x AQA_ROLES.
    Each candidate URL is probed with head_ok() on a 24-worker thread pool; progress is
    reported on stderr every 60 completions. Specifications with no live paper are
    omitted from the result.

    Returns:
        ``{"exam_board_code": "AQA", "specifications": [...]}`` where each specification
        carries its metadata plus a ``papers`` list sorted by ``exam_code``. In each
        paper the manifest ``doc_type`` field holds the document role (QP/MS/ER).
    """
    # One candidate per (paper, session, role):
    # (spec_code, subject, award, paper_fc, paper_code, tier, role, session, url, fname)
    candidates: List[Tuple[str, str, str, str, str, Optional[str], str, str, str, str]] = []
    for spec_code, subject, award, _level, _first_teach, papers in AQA_SPECS:
        for paper_fc, paper_code, tier in papers:
            for session in AQA_SESSIONS:
                for role in AQA_ROLES:
                    url, fname = aqa_url(paper_fc, role, session)
                    candidates.append((spec_code, subject, award, paper_fc, paper_code, tier,
                                       role, session, url, fname))

    print(f"[AQA] HEAD-verifying {len(candidates)} candidate URLs...", file=sys.stderr)
    live: Dict[int, bool] = {}  # candidate index -> probe result
    with cf.ThreadPoolExecutor(max_workers=24) as ex:
        futs = {ex.submit(head_ok, c[8]): i for i, c in enumerate(candidates)}
        for done, fut in enumerate(cf.as_completed(futs), start=1):
            live[futs[fut]] = fut.result()
            if done % 60 == 0:
                print(f" ...{done}/{len(candidates)} ({sum(live.values())} live)", file=sys.stderr)

    # Walk candidates in enumeration order (not completion order) so output is stable.
    papers_by_spec: Dict[str, List[Dict[str, Any]]] = {}
    for i, cand in enumerate(candidates):
        if not live.get(i):
            continue
        spec_code, subject, award, paper_fc, paper_code, tier, role, session, url, fname = cand
        sess_pretty, sess_compact = session_pretty(session)
        token = paper_fc[len(award):]  # "1H" / "P1H" / "1"
        papers_by_spec.setdefault(spec_code, []).append({
            "exam_code": f"AQA-{award}-{token}-{sess_compact}-{role}",
            "paper_code": paper_code,
            "tier": tier,
            "session": sess_pretty,
            "doc_type": role,
            "file": {
                "source": f"url:{url}",
                "original_name": fname,
                "provenance": {"source_url": url, "fetched": FETCHED,
                               "license": "AQA public past paper"},
            },
        })

    # Emit specs in AQA_SPECS order, skipping those with nothing live.
    spec_list = []
    for spec_code, subject, award, level, first_teach, _papers in AQA_SPECS:
        if spec_code not in papers_by_spec:
            continue
        papers = sorted(papers_by_spec[spec_code], key=lambda p: p["exam_code"])
        spec_list.append({
            "spec_code": spec_code, "exam_board_code": "AQA", "subject_code": subject,
            "award_code": award, "award_level": level, "first_teach": first_teach,
            "papers": papers,
        })
        print(f"[AQA] {spec_code}: {len(papers)} live papers", file=sys.stderr)
    return {"exam_board_code": "AQA", "specifications": spec_list}
# ─────────────── Edexcel / OCR — confirmed direct URLs (re-verified at build) ───────────────
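# Each paper tuple: (spec_code, exam_code, paper_code, tier, session, role, url).
# Spec-level metadata (subject, award, level, first_teach) lives in the *_SPECS dicts keyed by
# spec_code; original_name is derived from the URL basename in build_board().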
EDEXCEL_SPECS = {
"EDX-BIOL-1BI0": ("BIOLOGY", "1BI0", "GCSE", "2016"),
"EDX-CHEM-1CH0": ("CHEMISTRY", "1CH0", "GCSE", "2016"),
"EDX-PHYS-1PH0": ("PHYSICS", "1PH0", "GCSE", "2016"),
"EDX-COMB-1SC0": ("COMBINED SCIENCE", "1SC0", "GCSE", "2016"),
}
_EDX = "https://qualifications.pearson.com/content/dam/pdf/GCSE/Science/2016"
EDEXCEL_PAPERS = [
("EDX-BIOL-1BI0", "EDX-1BI0-1H-2024JUN-QP", "1BI0/1H", "H", "2024-Jun", "QP", f"{_EDX}/Exam-materials/1bi0-1h-que-20240511.pdf"),
("EDX-BIOL-1BI0", "EDX-1BI0-2F-2023JUN-QP", "1BI0/2F", "F", "2023-Jun", "QP", f"{_EDX}/Exam-materials/1bi0-2f-que-20230610.pdf"),
("EDX-BIOL-1BI0", "EDX-1BI0-2H-2023JUN-QP", "1BI0/2H", "H", "2023-Jun", "QP", f"{_EDX}/Exam-materials/1bi0-2h-que-20230610.pdf"),
("EDX-BIOL-1BI0", "EDX-1BI0-1F-2023JUN-MS", "1BI0/1F", "F", "2023-Jun", "MS", f"{_EDX}/Exam-materials/1bi0-1f-rms-20230824.pdf"),
("EDX-BIOL-1BI0", "EDX-1BI0-1H-2024JUN-MS", "1BI0/1H", "H", "2024-Jun", "MS", f"{_EDX}/Exam-materials/1bi0-1h-rms-20240822.pdf"),
("EDX-BIOL-1BI0", "EDX-1BI0-1H-2022JUN-MS", "1BI0/1H", "H", "2022-Jun", "MS", f"{_EDX}/exam-materials/1bi0-1h-rms-20220825.pdf"),
("EDX-CHEM-1CH0", "EDX-1CH0-1F-2023JUN-QP", "1CH0/1F", "F", "2023-Jun", "QP", f"{_EDX}/Exam-materials/1ch0-1f-que-20230523.pdf"),
("EDX-CHEM-1CH0", "EDX-1CH0-1H-2024JUN-QP", "1CH0/1H", "H", "2024-Jun", "QP", f"{_EDX}/Exam-materials/1ch0-1h-que-20240518.pdf"),
("EDX-CHEM-1CH0", "EDX-1CH0-2H-2024JUN-MS", "1CH0/2H", "H", "2024-Jun", "MS", f"{_EDX}/Exam-materials/1ch0-2h-rms-20240822.pdf"),
("EDX-PHYS-1PH0", "EDX-1PH0-1H-2023JUN-QP", "1PH0/1H", "H", "2023-Jun", "QP", f"{_EDX}/Exam-materials/1ph0-1h-que-20230526.pdf"),
("EDX-PHYS-1PH0", "EDX-1PH0-2F-2023JUN-QP", "1PH0/2F", "F", "2023-Jun", "QP", f"{_EDX}/Exam-materials/1ph0-2f-que-20230617.pdf"),
("EDX-PHYS-1PH0", "EDX-1PH0-1H-2024JUN-QP", "1PH0/1H", "H", "2024-Jun", "QP", f"{_EDX}/Exam-materials/1ph0-1h-que-20240523.pdf"),
("EDX-PHYS-1PH0", "EDX-1PH0-2H-2023JUN-MS", "1PH0/2H", "H", "2023-Jun", "MS", f"{_EDX}/Exam-materials/1ph0-2h-rms-20230824.pdf"),
("EDX-PHYS-1PH0", "EDX-1PH0-2H-2022JUN-MS", "1PH0/2H", "H", "2022-Jun", "MS", f"{_EDX}/exam-materials/1ph0-2h-rms-20220825.pdf"),
("EDX-COMB-1SC0", "EDX-1SC0-1CH-2023JUN-MS", "1SC0/1CH", None, "2023-Jun", "MS", f"{_EDX}/Exam-materials/1sc0-1ch-rms-20230824.pdf"),
]
# spec_code -> (subject, award, award_level, first_teach), unpacked in that order by build_board().
OCR_SPECS = {
"OCR-BIOL-J247": ("BIOLOGY", "J247", "GCSE", "2016"),
"OCR-CHEM-J248": ("CHEMISTRY", "J248", "GCSE", "2016"),
"OCR-PHYS-J249": ("PHYSICS", "J249", "GCSE", "2016"),
"OCR-COMB-J250": ("COMBINED SCIENCE", "J250", "GCSE", "2016"),
}
# Common prefix of the OCR document URLs below; each file name starts with a numeric doc-id
# that cannot be derived from the paper code, which is why these URLs are embedded verbatim.
_OCR = "https://www.ocr.org.uk/Images"
# (spec_code, exam_code, paper_code, tier, session, role, url) — the 7-field shape that
# build_board() unpacks. Every URL is re-checked with head_ok() at build time.
OCR_PAPERS = [
("OCR-BIOL-J247", "OCR-J247-1-2024JUN-QP", "J247/01", "F", "2024-Jun", "QP", f"{_OCR}/727713-question-paper-paper-1.pdf"),
("OCR-BIOL-J247", "OCR-J247-1-2024JUN-MS", "J247/01", "F", "2024-Jun", "MS", f"{_OCR}/727745-mark-scheme-paper-1.pdf"),
("OCR-BIOL-J247", "OCR-J247-3-2024JUN-QP", "J247/03", "H", "2024-Jun", "QP", f"{_OCR}/727715-question-paper-paper-3.pdf"),
("OCR-BIOL-J247", "OCR-J247-3-2024JUN-MS", "J247/03", "H", "2024-Jun", "MS", f"{_OCR}/727747-mark-scheme-paper-3.pdf"),
("OCR-BIOL-J247", "OCR-J247-1-2023JUN-QP", "J247/01", "F", "2023-Jun", "QP", f"{_OCR}/704945-question-paper-paper-1.pdf"),
("OCR-BIOL-J247", "OCR-J247-3-2023JUN-MS", "J247/03", "H", "2023-Jun", "MS", f"{_OCR}/704979-mark-scheme-paper-3.pdf"),
("OCR-BIOL-J247", "OCR-J247-3-2022JUN-QP", "J247/03", "H", "2022-Jun", "QP", f"{_OCR}/678031-question-paper-paper-3.pdf"),
("OCR-BIOL-J247", "OCR-J247-1-2022JUN-MS", "J247/01", "F", "2022-Jun", "MS", f"{_OCR}/678076-mark-scheme-paper-1.pdf"),
("OCR-CHEM-J248", "OCR-J248-1-2024JUN-QP", "J248/01", "F", "2024-Jun", "QP", f"{_OCR}/727718-question-paper-paper-1.pdf"),
("OCR-CHEM-J248", "OCR-J248-3-2024JUN-MS", "J248/03", "H", "2024-Jun", "MS", f"{_OCR}/727751-mark-scheme-paper-3.pdf"),
("OCR-CHEM-J248", "OCR-J248-1-2023JUN-QP", "J248/01", "F", "2023-Jun", "QP", f"{_OCR}/704950-question-paper-paper-1.pdf"),
("OCR-CHEM-J248", "OCR-J248-3-2022JUN-QP", "J248/03", "H", "2022-Jun", "QP", f"{_OCR}/678036-question-paper-paper-3.pdf"),
("OCR-PHYS-J249", "OCR-J249-1-2024JUN-QP", "J249/01", "F", "2024-Jun", "QP", f"{_OCR}/727724-question-paper-paper-1.pdf"),
("OCR-PHYS-J249", "OCR-J249-3-2024JUN-MS", "J249/03", "H", "2024-Jun", "MS", f"{_OCR}/727755-mark-scheme-paper-3.pdf"),
("OCR-PHYS-J249", "OCR-J249-1-2023JUN-QP", "J249/01", "F", "2023-Jun", "QP", f"{_OCR}/704956-question-paper-paper-1.pdf"),
("OCR-PHYS-J249", "OCR-J249-3-2022JUN-MS", "J249/03", "H", "2022-Jun", "MS", f"{_OCR}/678086-mark-scheme-paper-3.pdf"),
("OCR-COMB-J250", "OCR-J250-1-2024JUN-QP", "J250/01", "F", "2024-Jun", "QP", f"{_OCR}/727730-question-paper-paper-1.pdf"),
("OCR-COMB-J250", "OCR-J250-7-2024JUN-MS", "J250/07", "H", "2024-Jun", "MS", f"{_OCR}/727763-mark-scheme-paper-7.pdf"),
]
def build_board(board_code: str, specs_meta: Dict, papers: List) -> Dict[str, Any]:
    """Re-verify a board's embedded paper URLs and group the live ones by specification.

    Args:
        board_code: Board identifier written to ``exam_board_code`` and into each item's
            licence string.
        specs_meta: ``spec_code -> (subject, award, award_level, first_teach)``.
        papers: 7-tuples ``(spec_code, exam_code, paper_code, tier, session, role, url)``.

    Returns:
        ``{"exam_board_code": board_code, "specifications": [...]}``. Specifications
        appear in ``specs_meta`` order, those with no live paper are omitted, and each
        ``papers`` list is sorted by ``exam_code``. ``original_name`` is the URL basename.

    Papers are probed sequentially with head_ok(); dropped ones are reported on stderr.
    A paper whose spec_code is missing from ``specs_meta`` could never appear in the
    output, so it is reported and dropped before any network probe instead of vanishing
    silently after one.
    """
    print(f"[{board_code}] re-verifying {len(papers)} confirmed URLs...", file=sys.stderr)
    by_spec: Dict[str, List[Dict[str, Any]]] = {}
    for spec_code, exam_code, paper_code, tier, session, role, url in papers:
        if spec_code not in specs_meta:
            print(f" DROP (unknown spec_code {spec_code!r}): {url}", file=sys.stderr)
            continue
        if not head_ok(url):
            print(f" DROP (not live): {url}", file=sys.stderr)
            continue
        by_spec.setdefault(spec_code, []).append({
            "exam_code": exam_code, "paper_code": paper_code, "tier": tier,
            "session": session, "doc_type": role,
            "file": {"source": f"url:{url}", "original_name": os.path.basename(url),
                     "provenance": {"source_url": url, "fetched": FETCHED,
                                    "license": f"{board_code} public past paper"}},
        })

    spec_list = []
    for spec_code, (subject, award, level, first_teach) in specs_meta.items():
        if spec_code not in by_spec:
            continue
        spec_list.append({
            "spec_code": spec_code, "exam_board_code": board_code, "subject_code": subject,
            "award_code": award, "award_level": level, "first_teach": first_teach,
            "papers": sorted(by_spec[spec_code], key=lambda p: p["exam_code"]),
        })
        print(f"[{board_code}] {spec_code}: {len(by_spec[spec_code])} live papers", file=sys.stderr)
    return {"exam_board_code": board_code, "specifications": spec_list}
def main() -> None:
    """Build the manifest for all three boards and write exam-corpus.yaml beside this script.

    The YAML is written with keys in insertion order (``sort_keys=False``) and a summary
    line goes to stderr.

    Raises:
        SystemExit: if no paper at all passed verification. That almost always means the
            probes could not reach the network, and writing would replace the committed
            manifest with an empty one.
    """
    out_path = os.path.join(os.path.dirname(__file__), "exam-corpus.yaml")
    boards = [
        build_aqa(),
        build_board("EDEXCEL", EDEXCEL_SPECS, EDEXCEL_PAPERS),
        build_board("OCR", OCR_SPECS, OCR_PAPERS),
    ]
    n_specs = sum(len(b["specifications"]) for b in boards)
    n_papers = sum(len(s["papers"]) for b in boards for s in b["specifications"])
    if n_papers == 0:
        # head_ok() reports every failure as "not live", so an offline run looks like an
        # empty corpus; refuse to clobber the existing manifest in that case.
        sys.exit(f"ABORT: 0 live papers verified; not overwriting {out_path}")
    manifest = {
        "version": 1,
        "defaults": {"bucket": "cc.examboards"},
        "provenance": {
            "collected_by": "kcar",
            "collected_at": FETCHED,
            "license_posture": ("Public exam-board past papers downloaded from each board's own "
                                "official site (AQA filestore, Pearson DAM, OCR Images). Stored in "
                                "the private dev cc.examboards bucket for internal exam-marker dev/test. "
                                "Each item records its source_url. Review redistribution rights before "
                                "any public exposure."),
            "sources": {
                "AQA": "https://filestore.aqa.org.uk/sample-papers-and-mark-schemes/",
                "EDEXCEL": "https://qualifications.pearson.com/en/support/support-topics/exams/past-papers.html",
                "OCR": "https://www.ocr.org.uk/qualifications/past-paper-finder/",
            },
        },
        # Optional: uncomment + set on dev .94 to exercise user-side flows / first-sweep.
        # "test_subset": {"user_email": "teacher@kevlarai.test", "papers": 2},
        # "system_identity": {"user_email": "teacher@kevlarai.test"},
        "boards": boards,
    }
    # Explicit encoding so the committed file does not depend on the platform default.
    with open(out_path, "w", encoding="utf-8") as fh:
        yaml.safe_dump(manifest, fh, sort_keys=False, default_flow_style=False, width=120)
    print(f"\nWROTE {out_path}: {n_specs} specs, {n_papers} papers across {len(boards)} boards",
          file=sys.stderr)
if __name__ == "__main__":
main()

View File

@ -1,15 +1,20 @@
"""
seed_curriculum.py Create curriculum data: exam board specifications and exams.
seed_curriculum.py DEPRECATED hardcoded curriculum/exam seeder.
Seeds eb_specifications and eb_exams tables with realistic UK exam board data
(AQA, Edexcel, OCR) for Physics, Maths, and Computer Science across both schools.
SUPERSEDED (2026-06-07) by the manifest-driven corpus loader:
run/initialization/seed_exam_corpus.py (+ manifests/exam-corpus.yaml)
Also seeds curriculum_topics in Neo4j for the school databases.
The exam-board parts of this file (eb_specifications / eb_exams) are now seeded from a
verified, provenance-bearing manifest with real uploaded PDFs not the hardcoded rows
below. This module also had a storage_loc inconsistency the overhaul standardises away:
exam-board files belong in the `cc.examboards` bucket at the canonical path
`cc.examboards/{board}/{subject}/{award}/{paper}/{session}/{role}.pdf`, NOT under
`cc.public.snapshots/curriculum/...` (the placeholder rows below still show the old path).
Tables: eb_specifications, eb_exams
Neo4j: curriculum topic nodes in school databases
KEEP ONLY for the Neo4j `curriculum_topics` seed (step [3]) which has no replacement yet.
Do NOT use the eb_specifications/eb_exams blocks for new work use seed_exam_corpus.py.
Run inside ccapi container:
Run (Neo4j curriculum topics only is the supported remaining use):
python3 -c "from run.initialization.seed_curriculum import seed; seed()"
"""
import os

View File

@ -6,157 +6,510 @@ run/initialization/buckets.py during infra init). This loader UPLOADS papers and
SEEDS the catalogue; it does NOT create buckets.
Pipeline per manifest item:
validate -> upload file to cc.examboards (canonical path) -> upsert eb_specifications
/ eb_exams (catalogue) -> (optional) copy a subset into a test user's exam space
-> (optional, --first-sweep) run the docling/auto-map first pass to gather structure.
validate -> resolve source bytes (local path | url:, cached) -> upload file to
cc.examboards (canonical path, skip-if-exists unless --force) -> upsert
eb_specifications / eb_exams (catalogue) -> (optional, --user-subset) copy a subset
into a test user's exam space so user-side flows are testable -> (optional,
--first-sweep) run the docling/auto-map first pass to gather structure.
Manifest template: ~/cc/specs/exam-corpus-manifest.example.yaml
Catalogue columns (real):
eb_specifications(spec_code, exam_board_code, award_code, subject_code, first_teach,
spec_ver, storage_loc, doc_type, doc_details jsonb)
eb_exams(exam_code, spec_code, paper_code, tier, session, type_code, storage_loc,
doc_type, doc_details jsonb)
Run inside the api container, e.g.:
Catalogue columns (real verified against volumes/db/cc/61-core-schema.sql):
eb_specifications(spec_code UNIQUE, exam_board_code, award_code, subject_code,
first_teach, spec_ver, storage_loc, doc_type CHECK(pdf|json|...),
doc_details jsonb, docling_docs jsonb)
eb_exams(exam_code UNIQUE, spec_code FK, paper_code, tier, session, type_code,
storage_loc, doc_type CHECK(pdf|json|...), doc_details jsonb, docling_docs jsonb)
IMPORTANT schema note: the QP/MS/INSERT/ER *document role* is stored in `type_code`
(the `/catalogue` endpoint filters `type_code == 'QP'`). The `doc_type` column is the
*file format* and is CHECK-constrained to {pdf,json,md,html,txt,doctags} so it is
always 'pdf' here. (The manifest field is named `doc_type` for the role; the loader
maps manifest.doc_type -> DB.type_code and sets DB.doc_type = 'pdf'.)
Locked conventions (see ~/cc/ideas/2026-06-07-exam-paper-ingestion.md):
session = "YYYY-Mon" e.g. "2022-Jun", "2021-Nov"
exam_code = "{BOARD}-{award}-{paper_safe}-{SESSIONCOMPACT}-{ROLE}" e.g. AQA-8463-1H-2022JUN-QP
spec path = cc.examboards/{board}/{subject}/{award}/spec/{spec_ver}.pdf
paper path = cc.examboards/{board}/{subject}/{award}/{paper_safe}/{session}/{role}.pdf
Run inside the api container (env: SUPABASE_URL + SERVICE_ROLE_KEY for dev .94), e.g.:
python3 -m run.initialization.seed_exam_corpus --manifest /path/exam-corpus.yaml --dry-run
python3 -m run.initialization.seed_exam_corpus --manifest ... --board AQA --first-sweep
python3 -m run.initialization.seed_exam_corpus --manifest ... --board AQA
python3 -m run.initialization.seed_exam_corpus --manifest ... --first-sweep
"""
from __future__ import annotations
import argparse
import hashlib
import os
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List, Optional, Tuple
import requests
import yaml # PyYAML
from modules.logger_tool import initialise_logger
from modules.database.supabase.utils.client import SupabaseServiceRoleClient
from modules.database.supabase.utils.storage import StorageAdmin, StorageError
logger = initialise_logger(__name__, os.getenv("LOG_LEVEL"), os.getenv("LOG_PATH"), "default", True)
EXAM_BUCKET = "cc.examboards"
DOC_TYPES = {"QP", "MS", "INSERT", "ER", "SPECIMEN", "GRADE_BOUNDARIES", "DATA_SHEET"}
# Manifest `doc_type` carries the document ROLE (stored in eb_exams.type_code).
DOC_ROLES = {"QP", "MS", "INSERT", "ER", "SPECIMEN", "GRADE_BOUNDARIES", "DATA_SHEET"}
TIERS = {"H", "F", None}
# Default working dir for cached url: downloads (override with --cache-dir / EXAM_CORPUS_CACHE).
DEFAULT_CACHE_DIR = os.getenv("EXAM_CORPUS_CACHE", "/tmp/exam-corpus-cache")
# ─────────────────────────────── canonical storage paths ───────────────────────────────
def _lc(s: str) -> str:
return (s or "").strip().lower().replace(" ", "-")
def _paper_safe(paper_code: str) -> str:
    """Turn a paper code into a storage-path segment with the award prefix removed.

    Everything after the first "/" is kept and further slashes become hyphens, so
    combined-science sub-papers don't collide on the storage path:
        "8463/1H"   -> "1h"
        "8464/B/1H" -> "b-1h" (Trilogy: subject letter + paper + tier)
        "7408/1"    -> "1"
    A code without any "/" is returned whole (normalised).
    """
    normalised = _lc(paper_code)
    _award, slash, remainder = normalised.partition("/")
    if not slash:
        return normalised
    return remainder.replace("/", "-")
def spec_storage_loc(board: str, subject: str, award: str, spec_ver: str) -> str:
    """Canonical storage location of a specification PDF.

    Shape: ``{EXAM_BUCKET}/{board}/{subject}/{award}/spec/{spec_ver}.pdf`` with every
    segment normalised by _lc, e.g. cc.examboards/aqa/physics/8463/spec/1.1.pdf.
    A missing spec_ver falls back to the file name "spec.pdf".
    """
    version = _lc(spec_ver or 'spec')
    segments = [EXAM_BUCKET, _lc(board), _lc(subject), _lc(award), "spec", f"{version}.pdf"]
    return "/".join(segments)
def paper_storage_loc(board: str, subject: str, award: str, paper_code: str, session: str, doc_type: str) -> str:
def paper_storage_loc(board: str, subject: str, award: str, paper_code: str, session: str, doc_role: str) -> str:
# e.g. cc.examboards/aqa/physics/8463/1h/2022-jun/qp.pdf
paper_safe = _lc(paper_code).split("/")[-1]
return f"{EXAM_BUCKET}/{_lc(board)}/{_lc(subject)}/{_lc(award)}/{paper_safe}/{_lc(session)}/{_lc(doc_type)}.pdf"
return f"{EXAM_BUCKET}/{_lc(board)}/{_lc(subject)}/{_lc(award)}/{_paper_safe(paper_code)}/{_lc(session)}/{_lc(doc_role)}.pdf"
# ─────────────────────────────── validation ───────────────────────────────
# ─────────────────────────────── report ───────────────────────────────
@dataclass
class LoadReport:
    """Running tally of what a seeding run did, filled in by the loader steps."""

    specs_upserted: int = 0   # eb_specifications rows upserted
    papers_upserted: int = 0  # eb_exams rows upserted
    files_uploaded: int = 0   # objects written to storage
    files_skipped: int = 0    # uploads skipped because the object already existed
    files_failed: int = 0     # uploads that raised a storage error
    user_copies: int = 0
    swept: int = 0
    sweep_failed: int = 0
    errors: List[str] = field(default_factory=list)  # human-readable failure messages

    def as_dict(self) -> Dict[str, Any]:
        """Return the report as a plain dict, one key per field in declaration order.

        Derived from the dataclass fields so a newly added counter cannot be forgotten.
        Values are not copied: ``errors`` is the same list object held by the report.
        """
        from dataclasses import fields  # local: the module only imports dataclass/field

        return {f.name: getattr(self, f.name) for f in fields(self)}
# ─────────────────────────────── validation ───────────────────────────────
def validate_manifest(m: Dict[str, Any]) -> List[str]:
errs: List[str] = []
seen_specs, seen_exams = set(), set()
for board in m.get("boards", []):
bcode = board.get("exam_board_code")
if not bcode:
errs.append("board missing exam_board_code")
for spec in board.get("specifications", []):
sc = spec.get("spec_code")
if not sc or sc in seen_specs:
errs.append(f"spec_code missing/duplicate: {sc!r}")
seen_specs.add(sc)
for field_name in ("award_code", "subject_code"):
if not spec.get(field_name):
errs.append(f"{sc}: missing {field_name}")
for p in spec.get("papers", []):
ec = p.get("exam_code")
if not ec or ec in seen_exams:
errs.append(f"exam_code missing/duplicate: {ec!r}")
seen_exams.add(ec)
if p.get("doc_type") not in DOC_TYPES:
errs.append(f"{ec}: bad doc_type {p.get('doc_type')!r}")
if p.get("doc_type") not in DOC_ROLES:
errs.append(f"{ec}: bad doc_type/role {p.get('doc_type')!r} (want one of {sorted(DOC_ROLES)})")
if p.get("tier") not in TIERS:
errs.append(f"{ec}: bad tier {p.get('tier')!r}")
# TODO(agent): resolve p['file']['source'] (local path or url:) and confirm it exists.
errs.append(f"{ec}: bad tier {p.get('tier')!r} (want H|F|null)")
if not p.get("paper_code"):
errs.append(f"{ec}: missing paper_code")
if not p.get("session"):
errs.append(f"{ec}: missing session")
src = (p.get("file") or {}).get("source")
if not src:
errs.append(f"{ec}: missing file.source")
elif not src.startswith("url:") and not os.path.exists(src):
errs.append(f"{ec}: local source not found: {src}")
return errs
# ─────────────────────────────── loader steps (TODOs for the gathering agent) ───────────────────────────────
def _resolve_source_bytes(source: str) -> bytes:
"""Local path or 'url:https://...'. TODO(agent): implement url fetch + caching."""
# ─────────────────────────────── source resolution (local | url:, cached) ───────────────────────────────
def _resolve_source_bytes(source: str, *, cache_dir: str) -> bytes:
"""Resolve a manifest file source to bytes.
'url:https://...' -> fetch (cached to cache_dir by url hash) ; verifies non-empty.
'<local path>' -> read from disk.
"""
if source.startswith("url:"):
raise NotImplementedError("url: sources — implement fetch in the gathering task")
with open(source, "rb") as f:
return f.read()
url = source[len("url:"):]
os.makedirs(cache_dir, exist_ok=True)
cache_key = hashlib.sha1(url.encode("utf-8")).hexdigest()
cache_path = os.path.join(cache_dir, f"{cache_key}.pdf")
if os.path.exists(cache_path) and os.path.getsize(cache_path) > 0:
with open(cache_path, "rb") as fh:
return fh.read()
logger.info(f"[fetch] {url}")
resp = requests.get(url, timeout=60, allow_redirects=True)
resp.raise_for_status()
data = resp.content
ctype = resp.headers.get("content-type", "")
if not data:
raise ValueError(f"empty download: {url}")
if "pdf" not in ctype.lower() and not data[:5].startswith(b"%PDF"):
raise ValueError(f"not a PDF (content-type={ctype!r}): {url}")
tmp = cache_path + ".part"
with open(tmp, "wb") as fh:
fh.write(data)
os.replace(tmp, cache_path)
return data
with open(source, "rb") as fh:
return fh.read()
def upload_file(client: SupabaseServiceRoleClient, storage_loc: str, data: bytes, *, force: bool, rep: LoadReport) -> None:
# ─────────────────────────────── storage upload (skip-if-exists + sha256) ───────────────────────────────
def _split_loc(storage_loc: str) -> Tuple[str, str]:
bucket, _, path = storage_loc.partition("/")
# TODO(agent): skip-if-exists + checksum (sha256) unless force; then client.upload_file(bucket, path, data).
_ = hashlib.sha256(data).hexdigest()
logger.info(f"[upload] {storage_loc} ({len(data)} bytes) force={force}")
rep.files_uploaded += 1
return bucket, path
def upsert_specification(spec: Dict[str, Any], storage_loc: Optional[str], rep: LoadReport) -> None:
row = {
"spec_code": spec["spec_code"], "exam_board_code": spec["exam_board_code"],
"award_code": spec.get("award_code"), "subject_code": spec.get("subject_code"),
"first_teach": spec.get("first_teach"), "spec_ver": spec.get("spec_ver"),
"storage_loc": storage_loc, "doc_type": "pdf",
"doc_details": {"award_level": spec.get("award_level"),
"provenance": spec.get("spec_file", {}).get("provenance")},
def _object_exists(storage: StorageAdmin, bucket: str, path: str) -> bool:
    """Report whether an object named like ``path`` appears in its parent folder listing.

    Supabase storage has no stat call, so the parent folder is listed and searched for
    the object's file name. A failed listing is logged as a warning and treated as
    "does not exist".
    NOTE(review): only the entries returned by a single ``list(parent)`` call are
    inspected — confirm that call returns every object for large folders.
    """
    parent, _, filename = path.rpartition("/")
    try:
        entries = storage.client.supabase.storage.from_(bucket).list(parent)
    except Exception as exc:
        logger.warning(f"[exists?] list failed for {bucket}/{parent}: {exc}")
        return False
    for entry in entries or []:
        if entry.get("name") == filename:
            return True
    return False
def upload_file(storage: StorageAdmin, storage_loc: str, data: bytes, *, force: bool, rep: LoadReport) -> str:
    """Store PDF bytes at ``storage_loc`` and return their sha256 hex digest.

    Without ``force``, an object that already exists is left untouched and counted in
    ``rep.files_skipped``. Otherwise the bytes are uploaded as application/pdf with
    upsert enabled and counted in ``rep.files_uploaded``. A StorageError is not raised
    to the caller: it is logged, counted in ``rep.files_failed`` and appended to
    ``rep.errors``. The digest is returned in every case, including after a failure.
    """
    digest = hashlib.sha256(data).hexdigest()
    bucket, path = _split_loc(storage_loc)

    # The existence check is only made when overwriting was not requested.
    already_present = (not force) and _object_exists(storage, bucket, path)
    if already_present:
        logger.info(f"[upload] skip-exists {storage_loc} (sha256={digest[:12]})")
        rep.files_skipped += 1
        return digest

    try:
        storage.upload_file(bucket, path, data, "application/pdf", upsert=True)
    except StorageError as exc:
        logger.error(f"[upload] FAILED {storage_loc}: {exc}")
        rep.files_failed += 1
        rep.errors.append(f"upload {storage_loc}: {exc}")
    else:
        logger.info(f"[upload] {storage_loc} ({len(data)} bytes, sha256={digest[:12]}) force={force}")
        rep.files_uploaded += 1
    return digest
# ─────────────────────────────── catalogue upserts ───────────────────────────────
def upsert_specification(client: SupabaseServiceRoleClient, spec: Dict[str, Any],
storage_loc: Optional[str], sha: Optional[str], rep: LoadReport) -> None:
sf = spec.get("spec_file") or {}
doc_details = {
"award_level": spec.get("award_level"),
"provenance": sf.get("provenance"),
"original_name": sf.get("original_name"),
"sha256": sha,
}
# TODO(agent): upsert into eb_specifications on conflict (spec_code).
row = {
"spec_code": spec["spec_code"],
"exam_board_code": spec["exam_board_code"],
"award_code": spec.get("award_code"),
"subject_code": spec.get("subject_code"),
"first_teach": spec.get("first_teach"),
"spec_ver": spec.get("spec_ver"),
"storage_loc": storage_loc,
"doc_type": "pdf", # file format (CHECK-constrained); the role lives on eb_exams.type_code
"doc_details": {k: v for k, v in doc_details.items() if v is not None},
}
try:
client.supabase.table("eb_specifications").upsert(row, on_conflict="spec_code").execute()
logger.info(f"[spec] upsert {row['spec_code']}")
rep.specs_upserted += 1
except Exception as exc:
logger.error(f"[spec] FAILED {row['spec_code']}: {exc}")
rep.errors.append(f"spec {row['spec_code']}: {exc}")
def upsert_paper(spec_code: str, p: Dict[str, Any], storage_loc: str, rep: LoadReport) -> None:
row = {
"exam_code": p["exam_code"], "spec_code": spec_code, "paper_code": p.get("paper_code"),
"tier": p.get("tier"), "session": p.get("session"), "type_code": p.get("type_code"),
"doc_type": p["doc_type"], "storage_loc": storage_loc,
"doc_details": {"original_name": p.get("file", {}).get("original_name"),
"provenance": p.get("file", {}).get("provenance")},
def upsert_paper(client: SupabaseServiceRoleClient, spec_code: str, p: Dict[str, Any],
storage_loc: str, sha: Optional[str], rep: LoadReport) -> None:
f = p.get("file") or {}
doc_role = p["doc_type"] # manifest role: QP|MS|INSERT|ER...
doc_details = {
"doc_role": doc_role, # mirror of type_code for clarity
"original_name": f.get("original_name"),
"provenance": f.get("provenance"),
"sha256": sha,
}
# TODO(agent): upsert into eb_exams on conflict (exam_code).
logger.info(f"[paper] upsert {row['exam_code']} doc_type={row['doc_type']}")
row = {
"exam_code": p["exam_code"],
"spec_code": spec_code,
"paper_code": p.get("paper_code"),
"tier": p.get("tier"),
"session": p.get("session"),
"type_code": doc_role, # ROLE goes here (QP/MS/INSERT/ER)
"doc_type": "pdf", # file format (CHECK-constrained)
"storage_loc": storage_loc,
"doc_details": {k: v for k, v in doc_details.items() if v is not None},
}
try:
client.supabase.table("eb_exams").upsert(row, on_conflict="exam_code").execute()
logger.info(f"[paper] upsert {row['exam_code']} type_code={doc_role}")
rep.papers_upserted += 1
except Exception as exc:
logger.error(f"[paper] FAILED {row['exam_code']}: {exc}")
rep.errors.append(f"paper {row['exam_code']}: {exc}")
def copy_user_test_subset(m: Dict[str, Any], rep: LoadReport) -> None:
    """TODO(agent): copy a small subset of admin papers into a test user's exam space
    (cc.users/{user_id}/exam-marker/... or cc.institutes/...) + create user exam_templates rows,
    so user-side flows (upload-as-exam / promote-from-cabinet / mark) are testable."""
    # Skeleton placeholder: performs no copy, it only logs that the step is still a TODO.
    # NOTE(review): a second `copy_user_test_subset` taking (client, storage, m, rep) is
    # defined later in this file — confirm this two-argument stub is stale and can be removed.
    logger.info("[user-subset] TODO: seed user test papers from admin subset")
def first_sweep(m: Dict[str, Any], rep: LoadReport) -> None:
    """TODO(agent): run the docling/auto_map first pass over seeded papers to populate
    exam_templates/questions/regions/layout structure as part of seeding (calls
    api.services.docling.auto_map + the /auto-map upsert mapper path)."""
    # Skeleton placeholder: runs no sweep, it only logs that the step is still a TODO.
    # NOTE(review): a second `first_sweep` with the full (client, storage, ...) signature is
    # defined later in this file — confirm this two-argument stub is stale and can be removed.
    logger.info("[first-sweep] TODO: run auto-map first pass on seeded papers")
# ─────────────────────────────── user-side test subset ───────────────────────────────
def _resolve_test_user(client: SupabaseServiceRoleClient, cfg: Dict[str, Any]) -> Optional[Tuple[str, str]]:
    """Work out which (user_id, institute_id) pair the user-side subset should target.

    Explicit ``user_id`` / ``institute_id`` values in *cfg* win. Otherwise the user is looked
    up in ``profiles`` by ``user_email`` and the institute is taken from the first
    ``institute_memberships`` row whose ``profile_id`` is that user.

    Returns:
        ``(user_id, institute_id)``, or ``None`` (after logging a warning) when either half
        cannot be determined.
    """
    def _first_value(table: str, column: str, match_col: str, match_val: Any) -> Any:
        # Fetch at most one row and return the requested column, or None when nothing matched.
        reply = client.supabase.table(table).select(column).eq(match_col, match_val).limit(1).execute()
        hits = getattr(reply, "data", None) or []
        return hits[0][column] if hits else None

    user_id = cfg.get("user_id")
    email = cfg.get("user_email")
    if not user_id and email:
        user_id = _first_value("profiles", "id", "email", email)
    if not user_id:
        logger.warning("[user-subset] no test user resolvable (set test_subset.user_id or user_email); skipping")
        return None

    institute_id = cfg.get("institute_id") or _first_value(
        "institute_memberships", "institute_id", "profile_id", user_id)
    if institute_id:
        return user_id, institute_id

    logger.warning(f"[user-subset] no institute for user {user_id}; skipping")
    return None
def copy_user_test_subset(client: SupabaseServiceRoleClient, storage: StorageAdmin,
                          m: Dict[str, Any], rep: LoadReport) -> None:
    """Copy a small subset of admin papers into a test user's exam space so user-side flows
    (upload-as-exam / promote-from-cabinet / mark) are testable.

    Driven by an optional manifest `test_subset:` block:

        test_subset:
          user_id: <uuid>          # or user_email: <email>
          institute_id: <uuid>     # optional; discovered from membership if omitted
          papers: 2                # how many QP papers to copy (default 2)

    Degrades gracefully (logs + skips) if no test user is resolvable on this env.

    Args:
        client: service-role client; only ``client.supabase`` is used.
        storage: storage admin used to download the seeded source and upload the copy.
        m: parsed manifest (``boards`` -> ``specifications`` -> ``papers``).
        rep: run report. ``user_copies`` is incremented per successful copy; a failed
            ``files`` upsert is appended to ``errors``.

    Re-runs are idempotent: the file id (and therefore the destination path and the
    ``files`` primary key) is derived deterministically from the user and exam code, so a
    second run overwrites the same object and row instead of adding duplicates.
    """
    import uuid as _uuid
    from itertools import islice

    cfg = m.get("test_subset") or {}
    resolved = _resolve_test_user(client, cfg)
    if not resolved:
        return
    user_id, institute_id = resolved
    # Clamp: a negative count would otherwise slice from the end of the candidate list.
    limit = max(0, int(cfg.get("papers", 2)))

    def _qp_papers():
        # Lazily walk the manifest so only the first `limit` QP papers are ever visited.
        for board in m.get("boards", []):
            for spec in board.get("specifications", []):
                for p in spec.get("papers", []):
                    if p.get("doc_type") == "QP":
                        yield board["exam_board_code"], spec, p

    # Candidate QP papers (admin corpus already uploaded to cc.examboards).
    candidates: List[Tuple[str, Dict[str, Any], Dict[str, Any]]] = list(islice(_qp_papers(), limit))
    if not candidates:
        logger.info("[user-subset] no QP papers to copy")
        return

    # Ensure a cabinet for the user.
    cab_name = "Exam Marker Template Sources"
    res = client.supabase.table("file_cabinets").select("id").eq("user_id", user_id).eq("name", cab_name).limit(1).execute()
    rows = getattr(res, "data", None) or []
    if rows:
        cabinet_id = rows[0]["id"]
    else:
        ins = client.supabase.table("file_cabinets").insert({"user_id": user_id, "name": cab_name}).execute()
        cabinet_id = (getattr(ins, "data", None) or [{}])[0].get("id")
    if not cabinet_id:
        logger.warning("[user-subset] could not ensure cabinet; skipping")
        return

    for board_code, spec, p in candidates:
        src_loc = paper_storage_loc(board_code, spec.get("subject_code", ""), spec.get("award_code", ""),
                                    p["paper_code"], p["session"], p["doc_type"])
        sbucket, spath = _split_loc(src_loc)
        try:
            data = storage.download_file(sbucket, spath)
        except Exception as exc:
            logger.warning(f"[user-subset] source missing {src_loc}: {exc}; skipping {p['exam_code']}")
            continue

        # Stable id per (user, paper): the same copy is overwritten on every run.
        file_id = str(_uuid.uuid5(_uuid.NAMESPACE_URL, f"exam-corpus-seed/{user_id}/{p['exam_code']}"))
        safe_name = f"{p['exam_code']}.pdf"
        dst_bucket = "cc.users"
        dst_path = f"exam-marker/{institute_id}/{cabinet_id}/{file_id}/{safe_name}"
        try:
            storage.upload_file(dst_bucket, dst_path, data, "application/pdf", upsert=True)
        except Exception as exc:
            logger.warning(f"[user-subset] copy upload failed {dst_path}: {exc}")
            continue

        try:
            client.supabase.table("files").upsert({
                "id": file_id, "cabinet_id": cabinet_id, "name": safe_name, "path": dst_path,
                "bucket": dst_bucket, "mime_type": "application/pdf", "uploaded_by": user_id,
                "size_bytes": len(data), "source": "exam-corpus-seed", "is_directory": False,
                "relative_path": safe_name, "processing_status": "uploaded",
            }).execute()
        except Exception as exc:
            # Record and move on: one bad row must not abort the rest of the load.
            logger.error(f"[user-subset] files row FAILED {p['exam_code']}: {exc}")
            rep.errors.append(f"user-subset {p['exam_code']}: {exc}")
            continue

        logger.info(f"[user-subset] copied {p['exam_code']} -> {dst_bucket}/{dst_path}")
        rep.user_copies += 1
# ─────────────────────────────── first sweep (docling auto-map) ───────────────────────────────
def _resolve_system_identity(client: SupabaseServiceRoleClient, m: Dict[str, Any]) -> Optional[Tuple[str, str]]:
    """Determine the (user_id, institute_id) pair that will own first-sweep templates.

    Configuration is read from the manifest's ``system_identity`` block, falling back to
    ``test_subset``. The user comes from ``teacher_id`` / ``user_id``, or from a ``profiles``
    lookup by ``user_email``. The institute comes from ``institute_id``, or from the first
    ``institute_memberships`` row whose ``profile_id`` is that user.

    Returns:
        ``(user_id, institute_id)``, or ``None`` (after logging a warning) when either half
        is missing.
    """
    def _first_value(table: str, column: str, match_col: str, match_val: Any) -> Any:
        # Fetch at most one row and return the requested column, or None when nothing matched.
        reply = client.supabase.table(table).select(column).eq(match_col, match_val).limit(1).execute()
        hits = getattr(reply, "data", None) or []
        return hits[0][column] if hits else None

    cfg = m.get("system_identity") or m.get("test_subset") or {}

    user_id = cfg.get("teacher_id") or cfg.get("user_id")
    email = cfg.get("user_email")
    if not user_id and email:
        user_id = _first_value("profiles", "id", "email", email)

    institute_id = cfg.get("institute_id")
    if user_id and not institute_id:
        institute_id = _first_value("institute_memberships", "institute_id", "profile_id", user_id)

    if user_id and institute_id:
        return user_id, institute_id

    logger.warning("[first-sweep] no system identity (set system_identity.teacher_id+institute_id); skipping sweep")
    return None
def _dedupe_rows_by_id(rows: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Return *rows* in order, dropping any row whose non-None ``id`` was already seen."""
    seen_ids: set = set()
    unique: List[Dict[str, Any]] = []
    for r in rows:
        rid = r.get("id")
        if rid is not None:
            if rid in seen_ids:
                continue
            seen_ids.add(rid)
        unique.append(r)
    return unique


def _ensure_seed_template(sb: Any, spec: Dict[str, Any], p: Dict[str, Any],
                          teacher_id: str, institute_id: str) -> Any:
    """Return the id of the template for paper *p* owned by *teacher_id*, creating it if absent."""
    exam_code = p["exam_code"]
    tpl = sb.table("exam_templates").select("id").eq("exam_code", exam_code).eq("teacher_id", teacher_id).limit(1).execute()
    tpl_rows = getattr(tpl, "data", None) or []
    if tpl_rows:
        return tpl_rows[0]["id"]
    # Only a brand-new template needs the seeded eb_exams row id for the join.
    ex = sb.table("eb_exams").select("id, exam_code").eq("exam_code", exam_code).limit(1).execute()
    ex_rows = getattr(ex, "data", None) or []
    # NOTE(review): exam_id is None when the paper row was never seeded — confirm that
    # exam_templates.exam_id accepts NULL.
    exam_id = ex_rows[0]["id"] if ex_rows else None
    new_tpl = sb.table("exam_templates").insert({
        "exam_id": exam_id, "exam_code": exam_code, "institute_id": institute_id,
        "teacher_id": teacher_id, "title": f"{exam_code} (auto-map seed)",
        "subject": spec.get("subject_code"), "status": "draft",
    }).execute()
    return (getattr(new_tpl, "data", None) or [{}])[0].get("id")


def _persist_first_pass(sb: Any, template_id: Any, rows: Dict[str, Any], first_pass: Dict[str, Any]) -> None:
    """Replace every derived child row of *template_id* and record the page count."""
    # Refresh derived rows. Seed templates are system-owned with no human edits to
    # preserve, so we clear ALL child rows for the template (not just ai/unconfirmed)
    # and re-insert id-deduped payloads — idempotent across re-runs and robust to the
    # deterministic uuid5 ids the mapper can repeat within a batch.
    for table in ("exam_response_areas", "exam_boundaries", "exam_template_layout", "exam_questions"):
        sb.table(table).delete().eq("template_id", template_id).execute()
    for table, key in (("exam_questions", "questions"), ("exam_response_areas", "response_areas"),
                       ("exam_boundaries", "boundaries"), ("exam_template_layout", "layout")):
        payload = _dedupe_rows_by_id(rows.get(key) or [])
        if payload:
            sb.table(table).insert(payload).execute()
    # `meta` may be absent or an explicit None; skip the update when there is nothing to write.
    n_pages = (first_pass.get("meta") or {}).get("n_pages")
    if n_pages is not None:
        sb.table("exam_templates").update({"page_count": n_pages}).eq("id", template_id).execute()


def first_sweep(client: SupabaseServiceRoleClient, storage: StorageAdmin,
                m: Dict[str, Any], board_filter: Optional[str], spec_filter: Optional[str],
                cache_dir: str, rep: LoadReport) -> None:
    """Run the docling/auto_map first pass over seeded QP papers and persist the resulting
    template structure (questions/response areas/boundaries/layout) via the same mapping the
    /auto-map endpoint uses. System-owned exam_templates are created per QP paper.

    Requires a resolvable `system_identity` (teacher_id/user_email + institute_id) on this env.

    Args:
        client: service-role client; only ``client.supabase`` is used.
        storage: storage admin used to download each seeded QP PDF.
        m: parsed manifest (``boards`` -> ``specifications`` -> ``papers``).
        board_filter: when set, only the board with this ``exam_board_code`` is swept.
        spec_filter: when set, only the specification with this ``spec_code`` is swept.
        cache_dir: not used by the sweep; accepted so the signature matches the caller.
        rep: run report. ``swept`` / ``sweep_failed`` are incremented per paper; import and
            database failures are appended to ``errors``.

    Failures are contained per paper: a missing source, an auto-map error or a database
    error skips that paper and the sweep continues with the next one.
    """
    identity = _resolve_system_identity(client, m)
    if not identity:
        return
    teacher_id, institute_id = identity

    # Import the auto-map mapping helpers lazily (pulls fastapi/router only when sweeping).
    try:
        from api.services.docling import auto_map, AutoMapError
        from routers.exam.templates import _map_first_pass_to_rows
    except Exception as exc:
        logger.error(f"[first-sweep] could not import auto-map pipeline: {exc}")
        rep.errors.append(f"first-sweep import: {exc}")
        return

    sb = client.supabase
    for board in m.get("boards", []):
        if board_filter and board.get("exam_board_code") != board_filter:
            continue
        for spec in board.get("specifications", []):
            if spec_filter and spec.get("spec_code") != spec_filter:
                continue
            for p in spec.get("papers", []):
                if p.get("doc_type") != "QP":
                    continue

                loc = paper_storage_loc(board["exam_board_code"], spec.get("subject_code", ""),
                                        spec.get("award_code", ""), p["paper_code"], p["session"], p["doc_type"])
                bkt, path = _split_loc(loc)
                try:
                    pdf_bytes = storage.download_file(bkt, path)
                except Exception as exc:
                    logger.warning(f"[first-sweep] source missing {loc}: {exc}; skipping {p['exam_code']}")
                    continue

                # Ensure a system-owned template for this paper (idempotent on exam_code+teacher).
                try:
                    template_id = _ensure_seed_template(sb, spec, p, teacher_id, institute_id)
                except Exception as exc:
                    logger.error(f"[first-sweep] template lookup/insert FAILED for {p['exam_code']}: {exc}")
                    rep.sweep_failed += 1
                    rep.errors.append(f"first-sweep template {p['exam_code']}: {exc}")
                    continue
                if not template_id:
                    logger.warning(f"[first-sweep] could not ensure template for {p['exam_code']}; skipping")
                    continue

                try:
                    first_pass = auto_map(pdf_bytes, source_pdf=loc)
                    rows = _map_first_pass_to_rows(template_id, first_pass, pdf_bytes)
                except (AutoMapError, ValueError) as exc:
                    logger.warning(f"[first-sweep] auto-map failed for {p['exam_code']}: {exc}")
                    rep.sweep_failed += 1
                    continue
                except Exception as exc:
                    logger.exception(f"[first-sweep] unexpected error for {p['exam_code']}: {exc}")
                    rep.sweep_failed += 1
                    continue

                # The delete + re-insert is not atomic; contain a database failure to this
                # paper and record it, so the next run can repair the template.
                try:
                    _persist_first_pass(sb, template_id, rows, first_pass)
                except Exception as exc:
                    logger.error(f"[first-sweep] persist FAILED for {p['exam_code']}: {exc}")
                    rep.sweep_failed += 1
                    rep.errors.append(f"first-sweep persist {p['exam_code']}: {exc}")
                    continue

                logger.info(f"[first-sweep] swept {p['exam_code']} -> template {template_id} "
                            f"(q={len(rows.get('questions', []))} ra={len(rows.get('response_areas', []))})")
                rep.swept += 1
# ─────────────────────────────── orchestration ───────────────────────────────
def load(manifest_path: str, *, dry_run: bool, force: bool, board_filter: Optional[str],
spec_filter: Optional[str], user_subset: bool, do_first_sweep: bool) -> LoadReport:
spec_filter: Optional[str], user_subset: bool, do_first_sweep: bool,
cache_dir: str = DEFAULT_CACHE_DIR) -> LoadReport:
with open(manifest_path) as f:
m = yaml.safe_load(f)
rep = LoadReport()
errs = validate_manifest(m)
if errs:
rep.errors = errs
rep.errors = list(errs)
logger.error(f"manifest validation failed: {len(errs)} error(s)")
for e in errs[:20]:
for e in errs[:40]:
logger.error(f" - {e}")
if not dry_run:
return rep
client = None if dry_run else SupabaseServiceRoleClient()
storage = None if dry_run else StorageAdmin()
for board in m.get("boards", []):
if board_filter and board.get("exam_board_code") != board_filter:
@ -164,29 +517,48 @@ def load(manifest_path: str, *, dry_run: bool, force: bool, board_filter: Option
for spec in board.get("specifications", []):
if spec_filter and spec.get("spec_code") != spec_filter:
continue
# Specification document (optional).
sloc = None
spec_sha = None
sf = spec.get("spec_file")
if sf:
if sf and sf.get("source"):
sloc = spec_storage_loc(board["exam_board_code"], spec.get("subject_code", ""),
spec.get("award_code", ""), spec.get("spec_ver", ""))
if not dry_run:
upload_file(client, sloc, _resolve_source_bytes(sf["source"]), force=force, rep=rep)
try:
spec_sha = upload_file(storage, sloc, _resolve_source_bytes(sf["source"], cache_dir=cache_dir),
force=force, rep=rep)
except Exception as exc:
logger.error(f"[spec-file] {spec.get('spec_code')}: {exc}")
rep.files_failed += 1
rep.errors.append(f"spec-file {spec.get('spec_code')}: {exc}")
if not dry_run:
upsert_specification(spec, sloc, rep)
upsert_specification(client, spec, sloc, spec_sha, rep)
# Papers.
for p in spec.get("papers", []):
ploc = paper_storage_loc(board["exam_board_code"], spec.get("subject_code", ""),
spec.get("award_code", ""), p["paper_code"], p["session"], p["doc_type"])
if not dry_run:
upload_file(client, ploc, _resolve_source_bytes(p["file"]["source"]), force=force, rep=rep)
upsert_paper(spec["spec_code"], p, ploc, rep)
if dry_run:
continue
psha = None
try:
psha = upload_file(storage, ploc, _resolve_source_bytes(p["file"]["source"], cache_dir=cache_dir),
force=force, rep=rep)
except Exception as exc:
logger.error(f"[paper-file] {p.get('exam_code')}: {exc}")
rep.files_failed += 1
rep.errors.append(f"paper-file {p.get('exam_code')}: {exc}")
upsert_paper(client, spec["spec_code"], p, ploc, psha, rep)
if user_subset and not dry_run:
copy_user_test_subset(m, rep)
copy_user_test_subset(client, storage, m, rep)
if do_first_sweep and not dry_run:
first_sweep(m, rep)
first_sweep(client, storage, m, board_filter, spec_filter, cache_dir, rep)
logger.info(f"corpus load done: specs={rep.specs_upserted} papers={rep.papers_upserted} "
f"uploaded={rep.files_uploaded} skipped={rep.files_skipped} errors={len(rep.errors)}")
f"uploaded={rep.files_uploaded} skipped={rep.files_skipped} failed={rep.files_failed} "
f"user_copies={rep.user_copies} swept={rep.swept} errors={len(rep.errors)}")
return rep
@ -199,9 +571,12 @@ def main() -> None:
ap.add_argument("--spec", default=None, help="only this spec_code")
ap.add_argument("--user-subset", action="store_true", help="also seed a user-side test subset")
ap.add_argument("--first-sweep", action="store_true", help="run docling/auto-map first pass on seeded papers")
ap.add_argument("--cache-dir", default=DEFAULT_CACHE_DIR, help="cache dir for url: downloads")
a = ap.parse_args()
load(a.manifest, dry_run=a.dry_run, force=a.force, board_filter=a.board, spec_filter=a.spec,
user_subset=a.user_subset, do_first_sweep=a.first_sweep)
rep = load(a.manifest, dry_run=a.dry_run, force=a.force, board_filter=a.board, spec_filter=a.spec,
user_subset=a.user_subset, do_first_sweep=a.first_sweep, cache_dir=a.cache_dir)
import json
print(json.dumps(rep.as_dict(), indent=2))
if __name__ == "__main__":