"""
seed_exam_corpus.py — manifest-driven loader for the public exam-paper corpus.

SCOPE (separate from infra): assumes the storage buckets already exist (provisioned by
run/initialization/buckets.py during infra init). This loader UPLOADS papers and SEEDS the
catalogue; it does NOT create buckets.

Pipeline per manifest item:
    validate
    -> upload file to cc.examboards (canonical path)
    -> upsert eb_specifications / eb_exams (catalogue)
    -> (optional, --user-subset) copy a subset into a test user's exam space
    -> (optional, --first-sweep) run the docling/auto-map first pass to gather structure

Manifest template: ~/cc/specs/exam-corpus-manifest.example.yaml

Catalogue columns (real):
    eb_specifications(spec_code, exam_board_code, award_code, subject_code, first_teach,
                      spec_ver, storage_loc, doc_type, doc_details jsonb)
    eb_exams(exam_code, spec_code, paper_code, tier, session, type_code, storage_loc,
             doc_type, doc_details jsonb)

Run inside the api container, e.g.:
    python3 -m run.initialization.seed_exam_corpus --manifest /path/exam-corpus.yaml --dry-run
    python3 -m run.initialization.seed_exam_corpus --manifest ... --board AQA --first-sweep

The process exits with status 1 when the manifest fails validation or any item fails to load.
"""

from __future__ import annotations

import argparse
import hashlib
import os
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional

import yaml  # PyYAML

from modules.logger_tool import initialise_logger
from modules.database.supabase.utils.client import SupabaseServiceRoleClient

logger = initialise_logger(__name__, os.getenv("LOG_LEVEL"), os.getenv("LOG_PATH"), "default", True)

EXAM_BUCKET = "cc.examboards"
DOC_TYPES = {"QP", "MS", "INSERT", "ER", "SPECIMEN", "GRADE_BOUNDARIES", "DATA_SHEET"}
TIERS = {"H", "F", None}  # None lets a paper omit `tier` entirely
_MAX_LOGGED_ERRORS = 20  # cap on validation errors echoed to the log (all are kept in the report)


# ─────────────────────────────── canonical storage paths ───────────────────────────────

def _lc(s: Any) -> str:
    """Normalise a manifest value into a storage-path segment.

    Strips surrounding whitespace, lower-cases, and replaces spaces with '-'; ``None``
    becomes ``""``. Non-string values are stringified first because YAML parses unquoted
    codes such as ``8463`` as int and ``1.1`` as float, which have no ``.strip()``.
    NOTE: quote versions like ``"1.10"`` in the manifest — unquoted, YAML reads them as 1.1.
    """
    return ("" if s is None else str(s)).strip().lower().replace(" ", "-")


def spec_storage_loc(board: str, subject: str, award: str, spec_ver: str) -> str:
    """Return the canonical ``<bucket>/<path>`` for a specification document.

    e.g. cc.examboards/aqa/physics/8463/spec/1.1.pdf — an empty/missing ``spec_ver``
    yields ``.../spec/spec.pdf``.
    """
    return f"{EXAM_BUCKET}/{_lc(board)}/{_lc(subject)}/{_lc(award)}/spec/{_lc(spec_ver or 'spec')}.pdf"


def paper_storage_loc(board: str, subject: str, award: str, paper_code: str, session: str, doc_type: str) -> str:
    """Return the canonical ``<bucket>/<path>`` for one exam document.

    e.g. cc.examboards/aqa/physics/8463/1h/2022-jun/qp.pdf
    Only the last '/'-separated segment of ``paper_code`` is used (e.g. "8463/1H" -> "1h").
    """
    paper_safe = _lc(paper_code).split("/")[-1]
    return f"{EXAM_BUCKET}/{_lc(board)}/{_lc(subject)}/{_lc(award)}/{paper_safe}/{_lc(session)}/{_lc(doc_type)}.pdf"


# ─────────────────────────────── validation ───────────────────────────────

@dataclass
class LoadReport:
    """Counters and errors accumulated over one `load` run."""

    specs_upserted: int = 0  # eb_specifications upserts issued
    papers_upserted: int = 0  # eb_exams upserts issued
    files_uploaded: int = 0  # upload_file calls (see NOTE in upload_file)
    files_skipped: int = 0  # not incremented yet — reserved for skip-if-exists
    user_copies: int = 0  # not incremented yet — reserved for copy_user_test_subset
    swept: int = 0  # not incremented yet — reserved for first_sweep
    errors: List[str] = field(default_factory=list)  # validation + per-item load errors


def validate_manifest(m: Dict[str, Any]) -> List[str]:
    """Check a parsed manifest for problems that would break or corrupt a load.

    Checks, per board/specification/paper:
      * spec_code / exam_code present and unique across the manifest;
      * every specification resolves an exam_board_code (board-level or spec-level);
      * a ``spec_file``, when given, has a ``source``;
      * doc_type is in DOC_TYPES and tier is in TIERS;
      * paper_code, session and file.source are present (the loader indexes them).

    Returns:
        Human-readable error strings; empty when the manifest is valid.
    """
    if not isinstance(m, dict):
        return [f"manifest root must be a mapping, got {type(m).__name__}"]
    errs: List[str] = []
    seen_specs, seen_exams = set(), set()
    # `or []` tolerates keys that are present but null in the YAML.
    for board in m.get("boards") or []:
        board_code = board.get("exam_board_code")
        for spec in board.get("specifications") or []:
            sc = spec.get("spec_code")
            if not sc or sc in seen_specs:
                errs.append(f"spec_code missing/duplicate: {sc!r}")
            seen_specs.add(sc)
            if not (board_code or spec.get("exam_board_code")):
                errs.append(f"{sc}: no exam_board_code on board or specification")
            sf = spec.get("spec_file")
            if sf and not (isinstance(sf, dict) and sf.get("source")):
                errs.append(f"{sc}: spec_file has no source")
            for p in spec.get("papers") or []:
                ec = p.get("exam_code")
                if not ec or ec in seen_exams:
                    errs.append(f"exam_code missing/duplicate: {ec!r}")
                seen_exams.add(ec)
                if p.get("doc_type") not in DOC_TYPES:
                    errs.append(f"{ec}: bad doc_type {p.get('doc_type')!r}")
                if p.get("tier") not in TIERS:
                    errs.append(f"{ec}: bad tier {p.get('tier')!r}")
                for key in ("paper_code", "session"):
                    if p.get(key) in (None, ""):
                        errs.append(f"{ec}: missing {key}")
                pf = p.get("file")
                if not (isinstance(pf, dict) and pf.get("source")):
                    errs.append(f"{ec}: missing file.source")
                # TODO(agent): resolve p['file']['source'] (local path or url:) and confirm it exists.
    return errs


# ─────────────────────────────── loader steps (TODOs for the gathering agent) ───────────────────────────────

def _resolve_source_bytes(source: str) -> bytes:
    """Return the raw bytes for a manifest ``source``: a local path or ``'url:https://...'``.

    Raises:
        NotImplementedError: for ``url:`` sources (fetching is not implemented yet).
        OSError: if the local file cannot be opened/read.

    TODO(agent): implement url fetch + caching.
    """
    if source.startswith("url:"):
        raise NotImplementedError("url: sources — implement fetch in the gathering task")
    with open(source, "rb") as f:
        return f.read()


def upload_file(client: SupabaseServiceRoleClient, storage_loc: str, data: bytes, *, force: bool, rep: LoadReport) -> None:
    """Upload *data* to *storage_loc* (``<bucket>/<path>``) and count it in *rep*.

    NOTE: the storage call itself is still a TODO — today this only logs and increments
    ``rep.files_uploaded``, so that counter reflects planned uploads, not confirmed ones.
    """
    bucket, _, path = storage_loc.partition("/")  # split for the pending client.upload_file(bucket, path, data)
    digest = hashlib.sha256(data).hexdigest()
    # TODO(agent): skip-if-exists + checksum (sha256) unless force; then client.upload_file(bucket, path, data).
    logger.info(f"[upload] {storage_loc} ({len(data)} bytes) sha256={digest[:12]} force={force}")
    rep.files_uploaded += 1


def upsert_specification(spec: Dict[str, Any], storage_loc: Optional[str], rep: LoadReport,
                         *, exam_board_code: Optional[str] = None) -> None:
    """Build the eb_specifications row for *spec* and upsert it (upsert is still a TODO).

    Args:
        spec: the manifest specification mapping.
        storage_loc: canonical location of the spec document, or None if it has no file.
        rep: report whose ``specs_upserted`` counter is incremented.
        exam_board_code: fallback used when *spec* has no ``exam_board_code`` of its own
            (e.g. the code is only given on the enclosing board).
    """
    row = {
        "spec_code": spec["spec_code"],
        "exam_board_code": spec.get("exam_board_code") or exam_board_code,
        "award_code": spec.get("award_code"),
        "subject_code": spec.get("subject_code"),
        "first_teach": spec.get("first_teach"),
        "spec_ver": spec.get("spec_ver"),
        "storage_loc": storage_loc,
        "doc_type": "pdf",
        # `or {}` tolerates `spec_file:` present but null in the YAML.
        "doc_details": {"award_level": spec.get("award_level"),
                        "provenance": (spec.get("spec_file") or {}).get("provenance")},
    }
    # TODO(agent): upsert into eb_specifications on conflict (spec_code).
    logger.info(f"[spec] upsert {row['spec_code']}")
    rep.specs_upserted += 1


def upsert_paper(spec_code: str, p: Dict[str, Any], storage_loc: str, rep: LoadReport) -> None:
    """Build the eb_exams row for paper *p* under *spec_code* and upsert it (upsert is still a TODO)."""
    file_info = p.get("file") or {}  # tolerate `file:` present but null
    row = {
        "exam_code": p["exam_code"],
        "spec_code": spec_code,
        "paper_code": p.get("paper_code"),
        "tier": p.get("tier"),
        "session": p.get("session"),
        "type_code": p.get("type_code"),
        "doc_type": p["doc_type"],
        "storage_loc": storage_loc,
        "doc_details": {"original_name": file_info.get("original_name"),
                        "provenance": file_info.get("provenance")},
    }
    # TODO(agent): upsert into eb_exams on conflict (exam_code).
    logger.info(f"[paper] upsert {row['exam_code']} doc_type={row['doc_type']}")
    rep.papers_upserted += 1


def copy_user_test_subset(m: Dict[str, Any], rep: LoadReport) -> None:
    """TODO(agent): copy a small subset of admin papers into a test user's exam space
    (cc.users/{user_id}/exam-marker/... or cc.institutes/...) + create user exam_templates rows,
    so user-side flows (upload-as-exam / promote-from-cabinet / mark) are testable."""
    logger.info("[user-subset] TODO: seed user test papers from admin subset")


def first_sweep(m: Dict[str, Any], rep: LoadReport) -> None:
    """TODO(agent): run the docling/auto_map first pass over seeded papers to populate
    exam_templates/questions/regions/layout structure as part of seeding
    (calls api.services.docling.auto_map + the /auto-map upsert mapper path)."""
    logger.info("[first-sweep] TODO: run auto-map first pass on seeded papers")


# ─────────────────────────────── orchestration ───────────────────────────────

def _record_error(rep: LoadReport, msg: str) -> None:
    """Log *msg* at error level and append it to ``rep.errors``."""
    logger.error(msg)
    rep.errors.append(msg)


def _matches(value: Any, wanted: Optional[str]) -> bool:
    """True when no CLI filter is set, or *value* (stringified — YAML may yield ints) equals it."""
    return not wanted or (value is not None and str(value) == wanted)


def _seed_spec(client: Optional[SupabaseServiceRoleClient], board_code: Optional[str], spec: Dict[str, Any],
               *, dry_run: bool, force: bool, rep: LoadReport) -> None:
    """Upload the spec document (if ``spec_file`` is set) and upsert the eb_specifications row.

    On a dry run only the planned location is logged.
    """
    sloc = None
    sf = spec.get("spec_file")
    if sf:
        sloc = spec_storage_loc(board_code, spec.get("subject_code", ""),
                                spec.get("award_code", ""), spec.get("spec_ver", ""))
    if dry_run:
        logger.info(f"[dry-run] spec {spec.get('spec_code')} -> {sloc or 'no spec_file'}")
        return
    if sf:
        upload_file(client, sloc, _resolve_source_bytes(sf["source"]), force=force, rep=rep)
    upsert_specification(spec, sloc, rep, exam_board_code=board_code)


def _seed_paper(client: Optional[SupabaseServiceRoleClient], board_code: Optional[str], spec: Dict[str, Any],
                p: Dict[str, Any], *, dry_run: bool, force: bool, rep: LoadReport) -> None:
    """Upload one paper and upsert its eb_exams row; on a dry run only log the planned location."""
    # .get() keeps a dry run over an invalid manifest from crashing; a real run only gets
    # here after validation has confirmed these keys exist.
    ploc = paper_storage_loc(board_code, spec.get("subject_code", ""), spec.get("award_code", ""),
                             p.get("paper_code"), p.get("session"), p.get("doc_type"))
    if dry_run:
        logger.info(f"[dry-run] paper {p.get('exam_code')} -> {ploc}")
        return
    upload_file(client, ploc, _resolve_source_bytes(p["file"]["source"]), force=force, rep=rep)
    upsert_paper(spec["spec_code"], p, ploc, rep)


def load(manifest_path: str, *, dry_run: bool, force: bool, board_filter: Optional[str],
         spec_filter: Optional[str], user_subset: bool, do_first_sweep: bool) -> LoadReport:
    """Validate the manifest at *manifest_path* and seed storage + catalogue from it.

    Args:
        manifest_path: YAML manifest file (parsed with ``yaml.safe_load``).
        dry_run: validate and log planned storage paths only; no client is created and
            nothing is uploaded or upserted.
        force: passed through to ``upload_file`` (re-upload existing objects).
        board_filter: only process boards whose exam_board_code equals this (string compare).
        spec_filter: only process specifications whose spec_code equals this (string compare).
        user_subset: also run ``copy_user_test_subset`` (skipped on dry runs).
        do_first_sweep: also run ``first_sweep`` (skipped on dry runs).

    Returns:
        A LoadReport with counters plus every validation and per-item error.

    A real run stops before touching storage when validation fails. Per-item source
    errors (unreadable local file, unimplemented ``url:`` source) are recorded and the
    loader moves on; a failed spec skips its papers, whose rows reference its spec_code.
    """
    with open(manifest_path, encoding="utf-8") as f:
        m = yaml.safe_load(f) or {}  # an empty YAML file parses to None
    rep = LoadReport()

    errs = validate_manifest(m)
    if errs:
        rep.errors.extend(errs)
        logger.error(f"manifest validation failed: {len(errs)} error(s)")
        for e in errs[:_MAX_LOGGED_ERRORS]:
            logger.error(f" - {e}")
        if len(errs) > _MAX_LOGGED_ERRORS:
            logger.error(f" ... and {len(errs) - _MAX_LOGGED_ERRORS} more")
        # A dry run keeps going to report the planned layout; a real run (or an
        # unusable, non-mapping manifest) stops here.
        if not dry_run or not isinstance(m, dict):
            return rep

    client = None if dry_run else SupabaseServiceRoleClient()
    for board in m.get("boards") or []:
        if not _matches(board.get("exam_board_code"), board_filter):
            continue
        for spec in board.get("specifications") or []:
            if not _matches(spec.get("spec_code"), spec_filter):
                continue
            board_code = board.get("exam_board_code") or spec.get("exam_board_code")
            try:
                _seed_spec(client, board_code, spec, dry_run=dry_run, force=force, rep=rep)
            except (OSError, NotImplementedError) as exc:
                _record_error(rep, f"spec {spec.get('spec_code')}: {exc}; skipping its papers")
                continue
            for p in spec.get("papers") or []:
                try:
                    _seed_paper(client, board_code, spec, p, dry_run=dry_run, force=force, rep=rep)
                except (OSError, NotImplementedError) as exc:
                    _record_error(rep, f"paper {p.get('exam_code')}: {exc}")

    if user_subset and not dry_run:
        copy_user_test_subset(m, rep)
    if do_first_sweep and not dry_run:
        first_sweep(m, rep)

    logger.info(f"corpus load done: specs={rep.specs_upserted} papers={rep.papers_upserted} "
                f"uploaded={rep.files_uploaded} skipped={rep.files_skipped} errors={len(rep.errors)}")
    return rep


def main() -> None:
    """CLI entry point; exits with status 1 if the load reported any error."""
    ap = argparse.ArgumentParser(description="Seed the public exam-paper corpus from a manifest.")
    ap.add_argument("--manifest", required=True)
    ap.add_argument("--dry-run", action="store_true", help="validate + report, no writes")
    ap.add_argument("--force", action="store_true", help="re-upload/overwrite existing storage objects")
    ap.add_argument("--board", default=None, help="only this exam_board_code")
    ap.add_argument("--spec", default=None, help="only this spec_code")
    ap.add_argument("--user-subset", action="store_true", help="also seed a user-side test subset")
    ap.add_argument("--first-sweep", action="store_true", help="run docling/auto-map first pass on seeded papers")
    a = ap.parse_args()
    rep = load(a.manifest, dry_run=a.dry_run, force=a.force, board_filter=a.board, spec_filter=a.spec,
               user_subset=a.user_subset, do_first_sweep=a.first_sweep)
    if rep.errors:
        # Non-zero status so CI / init scripts notice a failed or partial seed.
        raise SystemExit(1)


if __name__ == "__main__":
    main()