api/run/initialization/seed_exam_corpus.py
kcar 7819e6e346
Some checks failed
api-ci-deploy / test-build-deploy (push) Has been cancelled
fix(seed): unseed user-subset storage objects
(cherry picked from commit 9328ec2e062d039c0bcfabb086ce0693fe1ebe50)
2026-06-08 00:13:40 +00:00

868 lines
44 KiB
Python

"""
seed_exam_corpus.py — manifest-driven loader for the public exam-paper corpus.
SCOPE (separate from infra): assumes storage buckets already exist (provisioned by
run/initialization/buckets.py during infra init). This loader UPLOADS papers and
SEEDS the catalogue; it does NOT create buckets.
Pipeline per manifest item:
validate -> resolve source bytes (local path | url:, cached) -> upload file to
cc.examboards (canonical path, skip-if-exists unless --force) -> upsert
eb_specifications / eb_exams (catalogue) -> (optional, --user-subset) copy a subset
into a test user's exam space so user-side flows are testable -> (optional,
--first-sweep) run the docling/auto-map first pass to gather structure.
Manifest template: ~/cc/specs/exam-corpus-manifest.example.yaml
Catalogue columns (real — verified against volumes/db/cc/61-core-schema.sql):
eb_specifications(spec_code UNIQUE, exam_board_code, award_code, subject_code,
first_teach, spec_ver, storage_loc, doc_type CHECK(pdf|json|...),
doc_details jsonb, docling_docs jsonb)
eb_exams(exam_code UNIQUE, spec_code FK, paper_code, tier, session, type_code,
storage_loc, doc_type CHECK(pdf|json|...), doc_details jsonb, docling_docs jsonb)
IMPORTANT schema note: the QP/MS/INSERT/ER *document role* is stored in `type_code`
(the `/catalogue` endpoint filters `type_code == 'QP'`). The `doc_type` column is the
*file format* and is CHECK-constrained to {pdf,json,md,html,txt,doctags} — so it is
always 'pdf' here. (The manifest field is named `doc_type` for the role; the loader
maps manifest.doc_type -> DB.type_code and sets DB.doc_type = 'pdf'.)
Locked conventions (see ~/cc/ideas/2026-06-07-exam-paper-ingestion.md):
session = "YYYY-Mon" e.g. "2022-Jun", "2021-Nov"
exam_code = "{BOARD}-{award}-{paper_safe}-{SESSIONCOMPACT}-{ROLE}" e.g. AQA-8463-1H-2022JUN-QP
spec path = cc.examboards/{board}/{subject}/{award}/spec/{spec_ver}.pdf
paper path = cc.examboards/{board}/{subject}/{award}/{paper_safe}/{session}/{role}.pdf
Run inside the api container (env: SUPABASE_URL + SERVICE_ROLE_KEY for dev .94), e.g.:
python3 -m run.initialization.seed_exam_corpus --manifest /path/exam-corpus.yaml --dry-run
python3 -m run.initialization.seed_exam_corpus --manifest ... --board AQA
python3 -m run.initialization.seed_exam_corpus --manifest ... --first-sweep
"""
from __future__ import annotations
import argparse
import hashlib
import os
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Tuple
import requests
import yaml # PyYAML
from modules.logger_tool import initialise_logger
from modules.database.supabase.utils.client import SupabaseServiceRoleClient
from modules.database.supabase.utils.storage import StorageAdmin, StorageError
logger = initialise_logger(__name__, os.getenv("LOG_LEVEL"), os.getenv("LOG_PATH"), "default", True)
EXAM_BUCKET = "cc.examboards"
# Manifest `doc_type` carries the document ROLE (stored in eb_exams.type_code).
DOC_ROLES = {"QP", "MS", "INSERT", "ER", "SPECIMEN", "GRADE_BOUNDARIES", "DATA_SHEET"}
TIERS = {"H", "F", None}
# Default working dir for cached url: downloads (override with --cache-dir / EXAM_CORPUS_CACHE).
DEFAULT_CACHE_DIR = os.getenv("EXAM_CORPUS_CACHE", "/tmp/exam-corpus-cache")
# Persistent, mountable local store laid out exactly like the bucket (download once, seed many,
# offline-repeatable). Override with --store-dir / EXAM_CORPUS_STORE. Distinct from --cache-dir,
# which is a throwaway url hash-cache.
DEFAULT_STORE_DIR = os.getenv(
"EXAM_CORPUS_STORE",
os.path.join(os.path.dirname(os.path.abspath(__file__)), "manifests", "_corpus_store"),
)
# ─────────────────────────────── canonical storage paths ───────────────────────────────
def _lc(s: str) -> str:
return (s or "").strip().lower().replace(" ", "-")
def _paper_safe(paper_code: str) -> str:
# Drop the award prefix, keep all remaining segments so combined-science sub-papers
# don't collide on the storage path:
# "8463/1H" -> "1h"
# "8464/B/1H" -> "b-1h" (Trilogy: subject letter + paper + tier)
# "7408/1" -> "1"
parts = _lc(paper_code).split("/")
return "-".join(parts[1:]) if len(parts) > 1 else parts[0]
def spec_storage_loc(board: str, subject: str, award: str, spec_ver: str) -> str:
# e.g. cc.examboards/aqa/physics/8463/spec/1.1.pdf
return f"{EXAM_BUCKET}/{_lc(board)}/{_lc(subject)}/{_lc(award)}/spec/{_lc(spec_ver or 'spec')}.pdf"
def paper_storage_loc(board: str, subject: str, award: str, paper_code: str, session: str, doc_role: str) -> str:
# e.g. cc.examboards/aqa/physics/8463/1h/2022-jun/qp.pdf
return f"{EXAM_BUCKET}/{_lc(board)}/{_lc(subject)}/{_lc(award)}/{_paper_safe(paper_code)}/{_lc(session)}/{_lc(doc_role)}.pdf"
# ─────────────────────────────── report ───────────────────────────────
@dataclass
class LoadReport:
specs_upserted: int = 0
papers_upserted: int = 0
files_uploaded: int = 0
files_skipped: int = 0
files_failed: int = 0
user_copies: int = 0
swept: int = 0
sweep_failed: int = 0
downloaded: int = 0
download_cached: int = 0
unseed_objects: int = 0
unseed_user_files: int = 0
unseed_exams: int = 0
unseed_specs: int = 0
unseed_templates: int = 0
errors: List[str] = field(default_factory=list)
def as_dict(self) -> Dict[str, Any]:
return {
"specs_upserted": self.specs_upserted,
"papers_upserted": self.papers_upserted,
"downloaded": self.downloaded,
"download_cached": self.download_cached,
"unseed_objects": self.unseed_objects,
"unseed_user_files": self.unseed_user_files,
"unseed_exams": self.unseed_exams,
"unseed_specs": self.unseed_specs,
"unseed_templates": self.unseed_templates,
"files_uploaded": self.files_uploaded,
"files_skipped": self.files_skipped,
"files_failed": self.files_failed,
"user_copies": self.user_copies,
"swept": self.swept,
"sweep_failed": self.sweep_failed,
"errors": self.errors,
}
# ─────────────────────────────── validation ───────────────────────────────
def validate_manifest(m: Dict[str, Any]) -> List[str]:
errs: List[str] = []
seen_specs, seen_exams = set(), set()
for board in m.get("boards", []):
bcode = board.get("exam_board_code")
if not bcode:
errs.append("board missing exam_board_code")
for spec in board.get("specifications", []):
sc = spec.get("spec_code")
if not sc or sc in seen_specs:
errs.append(f"spec_code missing/duplicate: {sc!r}")
seen_specs.add(sc)
for field_name in ("award_code", "subject_code"):
if not spec.get(field_name):
errs.append(f"{sc}: missing {field_name}")
for p in spec.get("papers", []):
ec = p.get("exam_code")
if not ec or ec in seen_exams:
errs.append(f"exam_code missing/duplicate: {ec!r}")
seen_exams.add(ec)
if p.get("doc_type") not in DOC_ROLES:
errs.append(f"{ec}: bad doc_type/role {p.get('doc_type')!r} (want one of {sorted(DOC_ROLES)})")
if p.get("tier") not in TIERS:
errs.append(f"{ec}: bad tier {p.get('tier')!r} (want H|F|null)")
if not p.get("paper_code"):
errs.append(f"{ec}: missing paper_code")
if not p.get("session"):
errs.append(f"{ec}: missing session")
src = (p.get("file") or {}).get("source")
if not src:
errs.append(f"{ec}: missing file.source")
elif not src.startswith("url:") and not os.path.exists(src):
errs.append(f"{ec}: local source not found: {src}")
return errs
# ─────────────────────────────── source resolution (local | url:, cached) ───────────────────────────────
def _resolve_source_bytes(source: str, *, cache_dir: str) -> bytes:
"""Resolve a manifest file source to bytes.
'url:https://...' -> fetch (cached to cache_dir by url hash) ; verifies non-empty.
'<local path>' -> read from disk.
"""
if source.startswith("url:"):
url = source[len("url:"):]
os.makedirs(cache_dir, exist_ok=True)
cache_key = hashlib.sha1(url.encode("utf-8")).hexdigest()
cache_path = os.path.join(cache_dir, f"{cache_key}.pdf")
if os.path.exists(cache_path) and os.path.getsize(cache_path) > 0:
with open(cache_path, "rb") as fh:
return fh.read()
logger.info(f"[fetch] {url}")
resp = requests.get(url, timeout=60, allow_redirects=True)
resp.raise_for_status()
data = resp.content
ctype = resp.headers.get("content-type", "")
if not data:
raise ValueError(f"empty download: {url}")
if "pdf" not in ctype.lower() and not data[:5].startswith(b"%PDF"):
raise ValueError(f"not a PDF (content-type={ctype!r}): {url}")
tmp = cache_path + ".part"
with open(tmp, "wb") as fh:
fh.write(data)
os.replace(tmp, cache_path)
return data
with open(source, "rb") as fh:
return fh.read()
# ─────────────────────── persistent local store (download-once, seed-many) ───────────────────────
def _store_path(store_dir: str, storage_loc: str) -> str:
"""Local path mirroring the bucket layout (so the store is directly mountable as the corpus):
storage_loc 'cc.examboards/aqa/physics/8463/1h/2022-jun/qp.pdf'
-> {store_dir}/aqa/physics/8463/1h/2022-jun/qp.pdf
"""
_, _, path = storage_loc.partition("/")
return os.path.join(store_dir, path)
def _item_bytes(source: str, storage_loc: str, *, store_dir: Optional[str], cache_dir: str,
populate: bool = True, rep: Optional[LoadReport] = None) -> bytes:
"""Resolve bytes for an item, preferring the persistent local store when present.
If store_dir holds the file → read it (offline). Otherwise resolve the source (local|url:) and,
when populate=True, write it into the store at its canonical path for future offline runs.
"""
if store_dir:
sp = _store_path(store_dir, storage_loc)
if os.path.exists(sp) and os.path.getsize(sp) > 0:
if rep is not None:
rep.download_cached += 1
with open(sp, "rb") as fh:
return fh.read()
data = _resolve_source_bytes(source, cache_dir=cache_dir)
if store_dir and populate:
sp = _store_path(store_dir, storage_loc)
os.makedirs(os.path.dirname(sp), exist_ok=True)
tmp = sp + ".part"
with open(tmp, "wb") as fh:
fh.write(data)
os.replace(tmp, sp)
if rep is not None:
rep.downloaded += 1
return data
def download_corpus(m: Dict[str, Any], *, store_dir: str, board_filter: Optional[str],
spec_filter: Optional[str], cache_dir: str, rep: LoadReport) -> None:
"""--download-only: populate the persistent local store from the manifest. No DB/bucket writes.
A later run with the same --store-dir (e.g. mounted into the container) seeds offline from it."""
for board in m.get("boards", []):
if board_filter and board.get("exam_board_code") != board_filter:
continue
for spec in board.get("specifications", []):
if spec_filter and spec.get("spec_code") != spec_filter:
continue
sf = spec.get("spec_file")
if sf and sf.get("source"):
sloc = spec_storage_loc(board["exam_board_code"], spec.get("subject_code", ""),
spec.get("award_code", ""), spec.get("spec_ver", ""))
try:
_item_bytes(sf["source"], sloc, store_dir=store_dir, cache_dir=cache_dir, rep=rep)
except Exception as exc:
rep.errors.append(f"download spec {spec.get('spec_code')}: {exc}")
for p in spec.get("papers", []):
ploc = paper_storage_loc(board["exam_board_code"], spec.get("subject_code", ""),
spec.get("award_code", ""), p["paper_code"], p["session"], p["doc_type"])
try:
_item_bytes(p["file"]["source"], ploc, store_dir=store_dir, cache_dir=cache_dir, rep=rep)
except Exception as exc:
rep.errors.append(f"download {p.get('exam_code')}: {exc}")
logger.info(f"download-only done: downloaded={rep.downloaded} already_in_store={rep.download_cached} "
f"errors={len(rep.errors)} store={store_dir}")
# ─────────────────────────────── storage upload (skip-if-exists + sha256) ───────────────────────────────
def _split_loc(storage_loc: str) -> Tuple[str, str]:
bucket, _, path = storage_loc.partition("/")
return bucket, path
def _object_exists(storage: StorageAdmin, bucket: str, path: str) -> bool:
"""Existence check by listing the object's parent folder (Supabase storage has no stat)."""
parent, _, name = path.rpartition("/")
try:
listing = storage.client.supabase.storage.from_(bucket).list(parent)
except Exception as exc:
logger.warning(f"[exists?] list failed for {bucket}/{parent}: {exc}")
return False
return any((item.get("name") == name) for item in (listing or []))
def upload_file(storage: StorageAdmin, storage_loc: str, data: bytes, *, force: bool, rep: LoadReport) -> str:
"""Upload PDF bytes to storage at storage_loc. Returns the sha256 of the bytes.
Idempotent: if the object already exists and --force was not given, skips the upload
(the catalogue upsert still runs and records the checksum). With --force, overwrites.
"""
sha = hashlib.sha256(data).hexdigest()
bucket, path = _split_loc(storage_loc)
if not force and _object_exists(storage, bucket, path):
logger.info(f"[upload] skip-exists {storage_loc} (sha256={sha[:12]})")
rep.files_skipped += 1
return sha
try:
storage.upload_file(bucket, path, data, "application/pdf", upsert=True)
logger.info(f"[upload] {storage_loc} ({len(data)} bytes, sha256={sha[:12]}) force={force}")
rep.files_uploaded += 1
except StorageError as exc:
logger.error(f"[upload] FAILED {storage_loc}: {exc}")
rep.files_failed += 1
rep.errors.append(f"upload {storage_loc}: {exc}")
return sha
# ─────────────────────────────── catalogue upserts ───────────────────────────────
def upsert_specification(client: SupabaseServiceRoleClient, spec: Dict[str, Any],
storage_loc: Optional[str], sha: Optional[str], rep: LoadReport) -> None:
sf = spec.get("spec_file") or {}
doc_details = {
"award_level": spec.get("award_level"),
"provenance": sf.get("provenance"),
"original_name": sf.get("original_name"),
"sha256": sha,
}
row = {
"spec_code": spec["spec_code"],
"exam_board_code": spec["exam_board_code"],
"award_code": spec.get("award_code"),
"subject_code": spec.get("subject_code"),
"first_teach": spec.get("first_teach"),
"spec_ver": spec.get("spec_ver"),
"storage_loc": storage_loc,
"doc_type": "pdf", # file format (CHECK-constrained); the role lives on eb_exams.type_code
"doc_details": {k: v for k, v in doc_details.items() if v is not None},
}
try:
client.supabase.table("eb_specifications").upsert(row, on_conflict="spec_code").execute()
logger.info(f"[spec] upsert {row['spec_code']}")
rep.specs_upserted += 1
except Exception as exc:
logger.error(f"[spec] FAILED {row['spec_code']}: {exc}")
rep.errors.append(f"spec {row['spec_code']}: {exc}")
def upsert_paper(client: SupabaseServiceRoleClient, spec_code: str, p: Dict[str, Any],
storage_loc: str, sha: Optional[str], rep: LoadReport) -> None:
f = p.get("file") or {}
doc_role = p["doc_type"] # manifest role: QP|MS|INSERT|ER...
doc_details = {
"doc_role": doc_role, # mirror of type_code for clarity
"original_name": f.get("original_name"),
"provenance": f.get("provenance"),
"sha256": sha,
}
row = {
"exam_code": p["exam_code"],
"spec_code": spec_code,
"paper_code": p.get("paper_code"),
"tier": p.get("tier"),
"session": p.get("session"),
"type_code": doc_role, # ROLE goes here (QP/MS/INSERT/ER)
"doc_type": "pdf", # file format (CHECK-constrained)
"storage_loc": storage_loc,
"doc_details": {k: v for k, v in doc_details.items() if v is not None},
}
try:
client.supabase.table("eb_exams").upsert(row, on_conflict="exam_code").execute()
logger.info(f"[paper] upsert {row['exam_code']} type_code={doc_role}")
rep.papers_upserted += 1
except Exception as exc:
logger.error(f"[paper] FAILED {row['exam_code']}: {exc}")
rep.errors.append(f"paper {row['exam_code']}: {exc}")
# ─────────────────────────────── user-side test subset ───────────────────────────────
def _resolve_test_user(client: SupabaseServiceRoleClient, cfg: Dict[str, Any]) -> Optional[Tuple[str, str]]:
"""Resolve (user_id, institute_id) for the user-side subset from config, with discovery fallback."""
user_id = cfg.get("user_id")
if not user_id and cfg.get("user_email"):
res = client.supabase.table("profiles").select("id").eq("email", cfg["user_email"]).limit(1).execute()
rows = getattr(res, "data", None) or []
user_id = rows[0]["id"] if rows else None
if not user_id:
logger.warning("[user-subset] no test user resolvable (set test_subset.user_id or user_email); skipping")
return None
institute_id = cfg.get("institute_id")
if not institute_id:
res = client.supabase.table("institute_memberships").select("institute_id").eq("profile_id", user_id).limit(1).execute()
rows = getattr(res, "data", None) or []
institute_id = rows[0]["institute_id"] if rows else None
if not institute_id:
logger.warning(f"[user-subset] no institute for user {user_id}; skipping")
return None
return user_id, institute_id
def copy_user_test_subset(client: SupabaseServiceRoleClient, storage: StorageAdmin,
m: Dict[str, Any], rep: LoadReport) -> None:
"""Copy a small subset of admin papers into a test user's exam space so user-side flows
(upload-as-exam / promote-from-cabinet / mark) are testable.
Driven by an optional manifest `test_subset:` block:
test_subset:
user_id: <uuid> # or user_email: <email>
institute_id: <uuid> # optional; discovered from membership if omitted
papers: 2 # how many QP papers to copy (default 2)
Degrades gracefully (logs + skips) if no test user is resolvable on this env.
"""
cfg = m.get("test_subset") or {}
resolved = _resolve_test_user(client, cfg)
if not resolved:
return
user_id, institute_id = resolved
limit = int(cfg.get("papers", 2))
# Gather candidate QP papers (admin corpus already uploaded to cc.examboards).
candidates: List[Tuple[str, Dict[str, Any]]] = []
for board in m.get("boards", []):
for spec in board.get("specifications", []):
for p in spec.get("papers", []):
if p.get("doc_type") == "QP":
candidates.append((board["exam_board_code"], spec, p))
candidates = candidates[:limit]
if not candidates:
logger.info("[user-subset] no QP papers to copy")
return
# Ensure a cabinet for the user.
cab_name = "Exam Marker Template Sources"
res = client.supabase.table("file_cabinets").select("id").eq("user_id", user_id).eq("name", cab_name).limit(1).execute()
rows = getattr(res, "data", None) or []
if rows:
cabinet_id = rows[0]["id"]
else:
ins = client.supabase.table("file_cabinets").insert({"user_id": user_id, "name": cab_name}).execute()
cabinet_id = (getattr(ins, "data", None) or [{}])[0].get("id")
if not cabinet_id:
logger.warning("[user-subset] could not ensure cabinet; skipping")
return
import uuid as _uuid
for board_code, spec, p in candidates:
src_loc = paper_storage_loc(board_code, spec.get("subject_code", ""), spec.get("award_code", ""),
p["paper_code"], p["session"], p["doc_type"])
sbucket, spath = _split_loc(src_loc)
try:
data = storage.download_file(sbucket, spath)
except Exception as exc:
logger.warning(f"[user-subset] source missing {src_loc}: {exc}; skipping {p['exam_code']}")
continue
file_id = str(_uuid.uuid4())
safe_name = f"{p['exam_code']}.pdf"
dst_bucket = "cc.users"
dst_path = f"exam-marker/{institute_id}/{cabinet_id}/{file_id}/{safe_name}"
try:
storage.upload_file(dst_bucket, dst_path, data, "application/pdf", upsert=True)
except Exception as exc:
logger.warning(f"[user-subset] copy upload failed {dst_path}: {exc}")
continue
client.supabase.table("files").upsert({
"id": file_id, "cabinet_id": cabinet_id, "name": safe_name, "path": dst_path,
"bucket": dst_bucket, "mime_type": "application/pdf", "uploaded_by": user_id,
"size_bytes": len(data), "source": "exam-corpus-seed", "is_directory": False,
"relative_path": safe_name, "processing_status": "uploaded",
}).execute()
logger.info(f"[user-subset] copied {p['exam_code']} -> {dst_bucket}/{dst_path}")
rep.user_copies += 1
# ─────────────────────────────── first sweep (docling auto-map) ───────────────────────────────
def _resolve_system_identity(client: SupabaseServiceRoleClient, m: Dict[str, Any]) -> Optional[Tuple[str, str]]:
cfg = m.get("system_identity") or m.get("test_subset") or {}
user_id = cfg.get("teacher_id") or cfg.get("user_id")
if not user_id and cfg.get("user_email"):
res = client.supabase.table("profiles").select("id").eq("email", cfg["user_email"]).limit(1).execute()
rows = getattr(res, "data", None) or []
user_id = rows[0]["id"] if rows else None
institute_id = cfg.get("institute_id")
if user_id and not institute_id:
res = client.supabase.table("institute_memberships").select("institute_id").eq("profile_id", user_id).limit(1).execute()
rows = getattr(res, "data", None) or []
institute_id = rows[0]["institute_id"] if rows else None
if not user_id or not institute_id:
logger.warning("[first-sweep] no system identity (set system_identity.teacher_id+institute_id); skipping sweep")
return None
return user_id, institute_id
def first_sweep(client: SupabaseServiceRoleClient, storage: StorageAdmin,
m: Dict[str, Any], board_filter: Optional[str], spec_filter: Optional[str],
cache_dir: str, rep: LoadReport) -> None:
"""Run the docling/auto_map first pass over seeded QP papers and persist the resulting
template structure (questions/response areas/boundaries/layout) via the same mapping the
/auto-map endpoint uses. System-owned exam_templates are created per QP paper.
Requires a resolvable `system_identity` (teacher_id/user_email + institute_id) on this env.
"""
identity = _resolve_system_identity(client, m)
if not identity:
return
teacher_id, institute_id = identity
# Import the auto-map mapping helpers lazily (pulls fastapi/router only when sweeping).
try:
from api.services.docling import auto_map, AutoMapError
from routers.exam.templates import _map_first_pass_to_rows
except Exception as exc:
logger.error(f"[first-sweep] could not import auto-map pipeline: {exc}")
rep.errors.append(f"first-sweep import: {exc}")
return
sb = client.supabase
for board in m.get("boards", []):
if board_filter and board.get("exam_board_code") != board_filter:
continue
for spec in board.get("specifications", []):
if spec_filter and spec.get("spec_code") != spec_filter:
continue
for p in spec.get("papers", []):
if p.get("doc_type") != "QP":
continue
# Resolve the seeded eb_exams row (id) for the template join.
ex = sb.table("eb_exams").select("id, exam_code").eq("exam_code", p["exam_code"]).limit(1).execute()
ex_rows = getattr(ex, "data", None) or []
exam_id = ex_rows[0]["id"] if ex_rows else None
loc = paper_storage_loc(board["exam_board_code"], spec.get("subject_code", ""),
spec.get("award_code", ""), p["paper_code"], p["session"], p["doc_type"])
bkt, path = _split_loc(loc)
try:
pdf_bytes = storage.download_file(bkt, path)
except Exception as exc:
logger.warning(f"[first-sweep] source missing {loc}: {exc}; skipping {p['exam_code']}")
continue
# Ensure a system-owned template for this paper (idempotent on exam_code+teacher).
tpl = sb.table("exam_templates").select("id").eq("exam_code", p["exam_code"]).eq("teacher_id", teacher_id).limit(1).execute()
tpl_rows = getattr(tpl, "data", None) or []
if tpl_rows:
template_id = tpl_rows[0]["id"]
else:
new_tpl = sb.table("exam_templates").insert({
"exam_id": exam_id, "exam_code": p["exam_code"], "institute_id": institute_id,
"teacher_id": teacher_id, "title": f"{p['exam_code']} (auto-map seed)",
"subject": spec.get("subject_code"), "status": "draft",
}).execute()
template_id = (getattr(new_tpl, "data", None) or [{}])[0].get("id")
if not template_id:
logger.warning(f"[first-sweep] could not ensure template for {p['exam_code']}; skipping")
continue
try:
first_pass = auto_map(pdf_bytes, source_pdf=loc)
rows = _map_first_pass_to_rows(template_id, first_pass, pdf_bytes)
except (AutoMapError, ValueError) as exc:
logger.warning(f"[first-sweep] auto-map failed for {p['exam_code']}: {exc}")
rep.sweep_failed += 1
continue
except Exception as exc:
logger.exception(f"[first-sweep] unexpected error for {p['exam_code']}: {exc}")
rep.sweep_failed += 1
continue
# Refresh derived rows. Seed templates are system-owned with no human edits to
# preserve, so we clear ALL child rows for the template (not just ai/unconfirmed)
# and re-insert id-deduped payloads — idempotent across re-runs and robust to the
# deterministic uuid5 ids the mapper can repeat within a batch.
for table in ("exam_response_areas", "exam_boundaries", "exam_template_layout", "exam_questions"):
sb.table(table).delete().eq("template_id", template_id).execute()
for table, key in (("exam_questions", "questions"), ("exam_response_areas", "response_areas"),
("exam_boundaries", "boundaries"), ("exam_template_layout", "layout")):
seen_ids: set = set()
payload = []
for r in (rows.get(key) or []):
rid = r.get("id")
if rid is not None and rid in seen_ids:
continue
if rid is not None:
seen_ids.add(rid)
payload.append(r)
if payload:
sb.table(table).insert(payload).execute()
updates = {"page_count": first_pass.get("meta", {}).get("n_pages")}
sb.table("exam_templates").update({k: v for k, v in updates.items() if v is not None}).eq("id", template_id).execute()
logger.info(f"[first-sweep] swept {p['exam_code']} -> template {template_id} "
f"(q={len(rows.get('questions', []))} ra={len(rows.get('response_areas', []))})")
rep.swept += 1
# ─────────────────────────────── unseed (inverse of the loader) ───────────────────────────────
def _chunks(seq: List[Any], n: int = 100):
for i in range(0, len(seq), n):
yield seq[i:i + n]
def _storage_remove(storage: StorageAdmin, bucket: str, paths: List[str]) -> None:
"""Remove object paths from a bucket through the Supabase Storage API.
The python client treats missing objects as a successful no-op, which is useful for
unseed idempotency. Any API/permission failure is raised so callers can avoid
deleting the matching DB rows while storage may still exist.
"""
result = storage.client.supabase.storage.from_(bucket).remove(paths)
error = getattr(result, "error", None)
if error:
raise StorageError(str(error))
if isinstance(result, dict) and result.get("error"):
raise StorageError(str(result["error"]))
def _delete_user_subset_files(client: SupabaseServiceRoleClient, storage: StorageAdmin, *,
exam_codes: Optional[List[str]], rep: LoadReport) -> None:
"""Delete --user-subset files from cc.users storage, then their files rows.
User-subset seeding writes rows with source='exam-corpus-seed', bucket='cc.users',
and paths under exam-marker/. Storage must be removed before the files rows: the
files GC trigger also tries to delete storage when rows are deleted, so removing
objects first avoids trigger failures and keeps this operation idempotent.
exam_codes=None means remove all user-subset seed rows (used by unscoped unseed
even if the eb_* rows were already removed by a prior partial run).
"""
sb = client.supabase
seeded_files: List[Dict[str, Any]] = []
def _base_query():
return sb.table("files").select("id, bucket, path, name, source") \
.eq("bucket", "cc.users").eq("source", "exam-corpus-seed") \
.like("path", "exam-marker/%")
if exam_codes is None:
seeded_files.extend(getattr(_base_query().execute(), "data", None) or [])
elif exam_codes:
for chunk in _chunks([f"{code}.pdf" for code in exam_codes if code], 100):
seeded_files.extend(getattr(_base_query().in_("name", chunk).execute(), "data", None) or [])
rows_by_id: Dict[str, Dict[str, Any]] = {}
paths_by_bucket: Dict[str, List[str]] = {}
seen_paths: set = set()
for row in seeded_files:
row_id = row.get("id")
bucket = row.get("bucket")
path = row.get("path")
if row_id:
rows_by_id[str(row_id)] = row
if bucket == "cc.users" and isinstance(path, str) and path.startswith("exam-marker/"):
key = (bucket, path)
if key not in seen_paths:
seen_paths.add(key)
paths_by_bucket.setdefault(bucket, []).append(path)
removable_ids = list(rows_by_id)
if not removable_ids and not paths_by_bucket:
logger.info("[unseed] no user-subset cc.users files to remove")
return
for bkt, paths in paths_by_bucket.items():
for chunk in _chunks(paths, 100):
try:
_storage_remove(storage, bkt, chunk)
rep.unseed_objects += len(chunk)
except Exception as exc:
logger.warning(f"[unseed] user-subset storage remove failed ({bkt}, {len(chunk)} objs): {exc}")
rep.errors.append(f"user-subset storage remove {bkt}: {exc}")
return
for chunk in _chunks(removable_ids, 100):
try:
sb.table("files").delete().in_("id", chunk).execute()
rep.unseed_user_files += len(chunk)
except Exception as exc:
logger.warning(f"[unseed] user-subset files delete failed: {exc}")
rep.errors.append(f"user-subset files delete: {exc}")
def unseed(client: SupabaseServiceRoleClient, storage: StorageAdmin, *,
board_filter: Optional[str], spec_filter: Optional[str],
drop_specs: bool = True, drop_seed_templates: bool = True, rep: LoadReport) -> None:
"""Inverse of the loader: remove the seeded public corpus, scoped by --board/--spec (or all).
Deletes (in FK-safe order): cc.examboards storage objects (via the Storage API, since the
protect_delete trigger blocks direct SQL deletes), first-sweep exam_templates created by the
seed (title '... (auto-map seed)', cascades children), eb_exams rows, then eb_specifications.
"""
sb = client.supabase
q = sb.table("eb_specifications").select("spec_code, storage_loc, exam_board_code")
if board_filter:
q = q.eq("exam_board_code", board_filter)
if spec_filter:
q = q.eq("spec_code", spec_filter)
specs = getattr(q.execute(), "data", None) or []
spec_codes = [s["spec_code"] for s in specs]
if not spec_codes:
if not board_filter and not spec_filter:
_delete_user_subset_files(client, storage, exam_codes=None, rep=rep)
logger.info("[unseed] no matching specifications; nothing to do")
return
exams: List[Dict[str, Any]] = []
for chunk in _chunks(spec_codes):
res = sb.table("eb_exams").select("id, exam_code, storage_loc").in_("spec_code", chunk).execute()
exams.extend(getattr(res, "data", None) or [])
# 1) User-subset storage/rows. Storage is removed before files rows so trg_files_gc has
# nothing left to collect when rows are deleted.
user_subset_exam_codes = None if not board_filter and not spec_filter else [
e.get("exam_code") for e in exams if e.get("exam_code")
]
_delete_user_subset_files(client, storage, exam_codes=user_subset_exam_codes, rep=rep)
# 2) Storage objects (Storage API; batch-remove per bucket). Specs may carry a spec PDF too.
by_bucket: Dict[str, List[str]] = {}
for row in exams + specs:
loc = row.get("storage_loc")
if not loc or "/" not in loc:
continue
bkt, _, path = loc.partition("/")
by_bucket.setdefault(bkt, []).append(path)
for bkt, paths in by_bucket.items():
for chunk in _chunks(paths, 100):
try:
storage.client.supabase.storage.from_(bkt).remove(chunk)
rep.unseed_objects += len(chunk)
except Exception as exc:
logger.warning(f"[unseed] storage remove failed ({bkt}, {len(chunk)} objs): {exc}")
# 3) First-sweep templates created by the seed (cascades questions/regions/boundaries/layout).
if drop_seed_templates and exams:
exam_codes = [e["exam_code"] for e in exams if e.get("exam_code")]
for chunk in _chunks(exam_codes, 100):
try:
res = sb.table("exam_templates").delete(count="exact") \
.in_("exam_code", chunk).like("title", "%(auto-map seed)%").execute()
rep.unseed_templates += getattr(res, "count", None) or len(getattr(res, "data", []) or [])
except Exception as exc:
logger.warning(f"[unseed] template delete failed: {exc}")
# 4) Catalogue rows: eb_exams (by id), then eb_specifications (by spec_code).
exam_ids = [e["id"] for e in exams]
for chunk in _chunks(exam_ids, 100):
try:
sb.table("eb_exams").delete().in_("id", chunk).execute()
rep.unseed_exams += len(chunk)
except Exception as exc:
logger.warning(f"[unseed] eb_exams delete failed: {exc}")
if drop_specs:
for chunk in _chunks(spec_codes, 100):
try:
sb.table("eb_specifications").delete().in_("spec_code", chunk).execute()
rep.unseed_specs += len(chunk)
except Exception as exc:
logger.warning(f"[unseed] eb_specifications delete failed: {exc}")
logger.info(f"unseed done: storage_objects={rep.unseed_objects} user_files={rep.unseed_user_files} "
f"templates={rep.unseed_templates} exams={rep.unseed_exams} specs={rep.unseed_specs}")
# ─────────────────────────────── orchestration ───────────────────────────────
def load(manifest_path: str, *, dry_run: bool, force: bool, board_filter: Optional[str],
spec_filter: Optional[str], user_subset: bool, do_first_sweep: bool,
cache_dir: str = DEFAULT_CACHE_DIR, store_dir: Optional[str] = None) -> LoadReport:
with open(manifest_path) as f:
m = yaml.safe_load(f)
rep = LoadReport()
errs = validate_manifest(m)
if errs:
rep.errors = list(errs)
logger.error(f"manifest validation failed: {len(errs)} error(s)")
for e in errs[:40]:
logger.error(f" - {e}")
if not dry_run:
return rep
client = None if dry_run else SupabaseServiceRoleClient()
storage = None if dry_run else StorageAdmin()
for board in m.get("boards", []):
if board_filter and board.get("exam_board_code") != board_filter:
continue
for spec in board.get("specifications", []):
if spec_filter and spec.get("spec_code") != spec_filter:
continue
# Specification document (optional).
sloc = None
spec_sha = None
sf = spec.get("spec_file")
if sf and sf.get("source"):
sloc = spec_storage_loc(board["exam_board_code"], spec.get("subject_code", ""),
spec.get("award_code", ""), spec.get("spec_ver", ""))
if not dry_run:
try:
spec_sha = upload_file(storage, sloc,
_item_bytes(sf["source"], sloc, store_dir=store_dir,
cache_dir=cache_dir, rep=rep),
force=force, rep=rep)
except Exception as exc:
logger.error(f"[spec-file] {spec.get('spec_code')}: {exc}")
rep.files_failed += 1
rep.errors.append(f"spec-file {spec.get('spec_code')}: {exc}")
if not dry_run:
upsert_specification(client, spec, sloc, spec_sha, rep)
# Papers.
for p in spec.get("papers", []):
ploc = paper_storage_loc(board["exam_board_code"], spec.get("subject_code", ""),
spec.get("award_code", ""), p["paper_code"], p["session"], p["doc_type"])
if dry_run:
continue
psha = None
try:
psha = upload_file(storage, ploc,
_item_bytes(p["file"]["source"], ploc, store_dir=store_dir,
cache_dir=cache_dir, rep=rep),
force=force, rep=rep)
except Exception as exc:
logger.error(f"[paper-file] {p.get('exam_code')}: {exc}")
rep.files_failed += 1
rep.errors.append(f"paper-file {p.get('exam_code')}: {exc}")
upsert_paper(client, spec["spec_code"], p, ploc, psha, rep)
if user_subset and not dry_run:
copy_user_test_subset(client, storage, m, rep)
if do_first_sweep and not dry_run:
first_sweep(client, storage, m, board_filter, spec_filter, cache_dir, rep)
logger.info(f"corpus load done: specs={rep.specs_upserted} papers={rep.papers_upserted} "
f"uploaded={rep.files_uploaded} skipped={rep.files_skipped} failed={rep.files_failed} "
f"user_copies={rep.user_copies} swept={rep.swept} errors={len(rep.errors)}")
return rep
def main() -> None:
ap = argparse.ArgumentParser(description="Seed (or unseed) the public exam-paper corpus from a manifest.")
ap.add_argument("--manifest", help="corpus manifest (required except for --unseed)")
ap.add_argument("--dry-run", action="store_true", help="validate + report, no writes")
ap.add_argument("--force", action="store_true", help="re-upload/overwrite existing storage objects")
ap.add_argument("--board", default=None, help="only this exam_board_code")
ap.add_argument("--spec", default=None, help="only this spec_code")
ap.add_argument("--user-subset", action="store_true", help="also seed a user-side test subset")
ap.add_argument("--first-sweep", action="store_true", help="run docling/auto-map first pass on seeded papers")
ap.add_argument("--cache-dir", default=DEFAULT_CACHE_DIR, help="throwaway url-hash cache dir")
ap.add_argument("--store-dir", default=DEFAULT_STORE_DIR,
help="persistent, bucket-shaped local store (download-once, seed-many)")
ap.add_argument("--no-store", action="store_true",
help="ignore the local store; always fetch from source (don't read/populate the store)")
ap.add_argument("--download-only", action="store_true",
help="populate the local store from the manifest; no DB/bucket writes")
ap.add_argument("--unseed", action="store_true",
help="INVERSE: remove seeded eb_*/storage/first-sweep templates (scoped by --board/--spec)")
a = ap.parse_args()
store_dir = None if a.no_store else a.store_dir
import json
if a.unseed:
rep = LoadReport()
unseed(SupabaseServiceRoleClient(), StorageAdmin(),
board_filter=a.board, spec_filter=a.spec, rep=rep)
print(json.dumps(rep.as_dict(), indent=2))
return
if not a.manifest:
ap.error("--manifest is required unless --unseed is given")
if a.download_only:
with open(a.manifest) as f:
m = yaml.safe_load(f)
rep = LoadReport()
download_corpus(m, store_dir=(a.store_dir), board_filter=a.board, spec_filter=a.spec,
cache_dir=a.cache_dir, rep=rep)
print(json.dumps(rep.as_dict(), indent=2))
return
rep = load(a.manifest, dry_run=a.dry_run, force=a.force, board_filter=a.board, spec_filter=a.spec,
user_subset=a.user_subset, do_first_sweep=a.first_sweep, cache_dir=a.cache_dir,
store_dir=store_dir)
print(json.dumps(rep.as_dict(), indent=2))
if __name__ == "__main__":
main()