From 9aabc1206223f576e312935618e5c2add008f0fb Mon Sep 17 00:00:00 2001 From: CC Worker Date: Sun, 7 Jun 2026 22:22:48 +0000 Subject: [PATCH] feat(seed): provision taxonomy buckets (infra) + exam-corpus loader skeleton MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit infra (buckets.py): add cc.public / cc.institutes / cc.admin to the bucket provisioner alongside cc.examboards; make initialize_buckets idempotent (already-exists treated as success). Bucket provisioning stays in infra init. new (seed_exam_corpus.py): manifest-driven loader scaffold that USES the buckets (does not create them) — validate -> upload to cc.examboards (canonical path) -> upsert eb_specifications/eb_exams -> optional user test subset -> optional --first-sweep auto-map pass. TODOs marked for the gathering task to complete. Co-Authored-By: Claude Opus 4.8 --- run/initialization/buckets.py | 46 +++++- run/initialization/seed_exam_corpus.py | 208 +++++++++++++++++++++++++ 2 files changed, 248 insertions(+), 6 deletions(-) create mode 100644 run/initialization/seed_exam_corpus.py diff --git a/run/initialization/buckets.py b/run/initialization/buckets.py index 351df04..105d091 100644 --- a/run/initialization/buckets.py +++ b/run/initialization/buckets.py @@ -46,7 +46,7 @@ def initialize_buckets() -> dict: file_size_limit=1000 * 1024 * 1024, # 1GB ) }, - # Exam Board files + # Exam Board files (admin-curated public exam corpus: QP/MS/insert/ER + specs) { "id": "cc.examboards", "options": CreateBucketOptions( @@ -55,6 +55,34 @@ def initialize_buckets() -> dict: file_size_limit=1000 * 1024 * 1024, # 1GB ) }, + # ── Storage taxonomy bins (access scoped by RLS on bucket + leading path segment; RLS = D1) ── + # Platform-managed public/shared assets (readable by all authenticated users). + { + "id": "cc.public", + "options": CreateBucketOptions( + name="Classroom Copilot Public", + public=False, + file_size_limit=1000 * 1024 * 1024, # 1GB + ) + }, + # Institute-scoped operational assets: cc.institutes/{institute_id}/... + { + "id": "cc.institutes", + "options": CreateBucketOptions( + name="Classroom Copilot Institutes", + public=False, + file_size_limit=1000 * 1024 * 1024, # 1GB + ) + }, + # Platform-admin-only assets, seeds, intake/staging for unidentified papers. + { + "id": "cc.admin", + "options": CreateBucketOptions( + name="Classroom Copilot Admin", + public=False, + file_size_limit=1000 * 1024 * 1024, # 1GB + ) + }, ] results = {} @@ -81,11 +109,17 @@ def initialize_buckets() -> dict: logger.error(f"Failed to create bucket: {bucket['id']}") except Exception as e: - results[bucket["id"]] = { - "status": "error", - "error": str(e) - } - logger.error(f"Error creating bucket {bucket['id']}: {str(e)}") + # Idempotent: an already-existing bucket is not a failure on re-run. + if any(s in str(e).lower() for s in ("already exists", "duplicate", "resource already")): + results[bucket["id"]] = {"status": "exists", "result": str(e)} + success_count += 1 + logger.info(f"Bucket already exists (ok): {bucket['id']}") + else: + results[bucket["id"]] = { + "status": "error", + "error": str(e) + } + logger.error(f"Error creating bucket {bucket['id']}: {str(e)}") # Determine overall success if success_count == total_count: diff --git a/run/initialization/seed_exam_corpus.py b/run/initialization/seed_exam_corpus.py new file mode 100644 index 0000000..084f905 --- /dev/null +++ b/run/initialization/seed_exam_corpus.py @@ -0,0 +1,208 @@ +""" +seed_exam_corpus.py — manifest-driven loader for the public exam-paper corpus. + +SCOPE (separate from infra): assumes storage buckets already exist (provisioned by +run/initialization/buckets.py during infra init). This loader UPLOADS papers and +SEEDS the catalogue; it does NOT create buckets. + +Pipeline per manifest item: + validate -> upload file to cc.examboards (canonical path) -> upsert eb_specifications + / eb_exams (catalogue) -> (optional) copy a subset into a test user's exam space + -> (optional, --first-sweep) run the docling/auto-map first pass to gather structure. + +Manifest template: ~/cc/specs/exam-corpus-manifest.example.yaml +Catalogue columns (real): + eb_specifications(spec_code, exam_board_code, award_code, subject_code, first_teach, + spec_ver, storage_loc, doc_type, doc_details jsonb) + eb_exams(exam_code, spec_code, paper_code, tier, session, type_code, storage_loc, + doc_type, doc_details jsonb) + +Run inside the api container, e.g.: + python3 -m run.initialization.seed_exam_corpus --manifest /path/exam-corpus.yaml --dry-run + python3 -m run.initialization.seed_exam_corpus --manifest ... --board AQA --first-sweep +""" +from __future__ import annotations +import argparse +import hashlib +import os +from dataclasses import dataclass, field +from typing import Any, Dict, List, Optional + +import yaml # PyYAML + +from modules.logger_tool import initialise_logger +from modules.database.supabase.utils.client import SupabaseServiceRoleClient + +logger = initialise_logger(__name__, os.getenv("LOG_LEVEL"), os.getenv("LOG_PATH"), "default", True) + +EXAM_BUCKET = "cc.examboards" +DOC_TYPES = {"QP", "MS", "INSERT", "ER", "SPECIMEN", "GRADE_BOUNDARIES", "DATA_SHEET"} +TIERS = {"H", "F", None} + + +# ─────────────────────────────── canonical storage paths ─────────────────────────────── +def _lc(s: str) -> str: + return (s or "").strip().lower().replace(" ", "-") + +def spec_storage_loc(board: str, subject: str, award: str, spec_ver: str) -> str: + # e.g. cc.examboards/aqa/physics/8463/spec/1.1.pdf + return f"{EXAM_BUCKET}/{_lc(board)}/{_lc(subject)}/{_lc(award)}/spec/{_lc(spec_ver or 'spec')}.pdf" + +def paper_storage_loc(board: str, subject: str, award: str, paper_code: str, session: str, doc_type: str) -> str: + # e.g. cc.examboards/aqa/physics/8463/1h/2022-jun/qp.pdf + paper_safe = _lc(paper_code).split("/")[-1] + return f"{EXAM_BUCKET}/{_lc(board)}/{_lc(subject)}/{_lc(award)}/{paper_safe}/{_lc(session)}/{_lc(doc_type)}.pdf" + + +# ─────────────────────────────── validation ─────────────────────────────── +@dataclass +class LoadReport: + specs_upserted: int = 0 + papers_upserted: int = 0 + files_uploaded: int = 0 + files_skipped: int = 0 + user_copies: int = 0 + swept: int = 0 + errors: List[str] = field(default_factory=list) + +def validate_manifest(m: Dict[str, Any]) -> List[str]: + errs: List[str] = [] + seen_specs, seen_exams = set(), set() + for board in m.get("boards", []): + for spec in board.get("specifications", []): + sc = spec.get("spec_code") + if not sc or sc in seen_specs: + errs.append(f"spec_code missing/duplicate: {sc!r}") + seen_specs.add(sc) + for p in spec.get("papers", []): + ec = p.get("exam_code") + if not ec or ec in seen_exams: + errs.append(f"exam_code missing/duplicate: {ec!r}") + seen_exams.add(ec) + if p.get("doc_type") not in DOC_TYPES: + errs.append(f"{ec}: bad doc_type {p.get('doc_type')!r}") + if p.get("tier") not in TIERS: + errs.append(f"{ec}: bad tier {p.get('tier')!r}") + # TODO(agent): resolve p['file']['source'] (local path or url:) and confirm it exists. + return errs + + +# ─────────────────────────────── loader steps (TODOs for the gathering agent) ─────────────────────────────── +def _resolve_source_bytes(source: str) -> bytes: + """Local path or 'url:https://...'. TODO(agent): implement url fetch + caching.""" + if source.startswith("url:"): + raise NotImplementedError("url: sources — implement fetch in the gathering task") + with open(source, "rb") as f: + return f.read() + +def upload_file(client: SupabaseServiceRoleClient, storage_loc: str, data: bytes, *, force: bool, rep: LoadReport) -> None: + bucket, _, path = storage_loc.partition("/") + # TODO(agent): skip-if-exists + checksum (sha256) unless force; then client.upload_file(bucket, path, data). + _ = hashlib.sha256(data).hexdigest() + logger.info(f"[upload] {storage_loc} ({len(data)} bytes) force={force}") + rep.files_uploaded += 1 + +def upsert_specification(spec: Dict[str, Any], storage_loc: Optional[str], rep: LoadReport) -> None: + row = { + "spec_code": spec["spec_code"], "exam_board_code": spec["exam_board_code"], + "award_code": spec.get("award_code"), "subject_code": spec.get("subject_code"), + "first_teach": spec.get("first_teach"), "spec_ver": spec.get("spec_ver"), + "storage_loc": storage_loc, "doc_type": "pdf", + "doc_details": {"award_level": spec.get("award_level"), + "provenance": spec.get("spec_file", {}).get("provenance")}, + } + # TODO(agent): upsert into eb_specifications on conflict (spec_code). + logger.info(f"[spec] upsert {row['spec_code']}") + rep.specs_upserted += 1 + +def upsert_paper(spec_code: str, p: Dict[str, Any], storage_loc: str, rep: LoadReport) -> None: + row = { + "exam_code": p["exam_code"], "spec_code": spec_code, "paper_code": p.get("paper_code"), + "tier": p.get("tier"), "session": p.get("session"), "type_code": p.get("type_code"), + "doc_type": p["doc_type"], "storage_loc": storage_loc, + "doc_details": {"original_name": p.get("file", {}).get("original_name"), + "provenance": p.get("file", {}).get("provenance")}, + } + # TODO(agent): upsert into eb_exams on conflict (exam_code). + logger.info(f"[paper] upsert {row['exam_code']} doc_type={row['doc_type']}") + rep.papers_upserted += 1 + +def copy_user_test_subset(m: Dict[str, Any], rep: LoadReport) -> None: + """TODO(agent): copy a small subset of admin papers into a test user's exam space + (cc.users/{user_id}/exam-marker/... or cc.institutes/...) + create user exam_templates rows, + so user-side flows (upload-as-exam / promote-from-cabinet / mark) are testable.""" + logger.info("[user-subset] TODO: seed user test papers from admin subset") + +def first_sweep(m: Dict[str, Any], rep: LoadReport) -> None: + """TODO(agent): run the docling/auto_map first pass over seeded papers to populate + exam_templates/questions/regions/layout structure as part of seeding (calls + api.services.docling.auto_map + the /auto-map upsert mapper path).""" + logger.info("[first-sweep] TODO: run auto-map first pass on seeded papers") + + +# ─────────────────────────────── orchestration ─────────────────────────────── +def load(manifest_path: str, *, dry_run: bool, force: bool, board_filter: Optional[str], + spec_filter: Optional[str], user_subset: bool, do_first_sweep: bool) -> LoadReport: + with open(manifest_path) as f: + m = yaml.safe_load(f) + rep = LoadReport() + + errs = validate_manifest(m) + if errs: + rep.errors = errs + logger.error(f"manifest validation failed: {len(errs)} error(s)") + for e in errs[:20]: + logger.error(f" - {e}") + if not dry_run: + return rep + + client = None if dry_run else SupabaseServiceRoleClient() + + for board in m.get("boards", []): + if board_filter and board.get("exam_board_code") != board_filter: + continue + for spec in board.get("specifications", []): + if spec_filter and spec.get("spec_code") != spec_filter: + continue + sloc = None + sf = spec.get("spec_file") + if sf: + sloc = spec_storage_loc(board["exam_board_code"], spec.get("subject_code", ""), + spec.get("award_code", ""), spec.get("spec_ver", "")) + if not dry_run: + upload_file(client, sloc, _resolve_source_bytes(sf["source"]), force=force, rep=rep) + if not dry_run: + upsert_specification(spec, sloc, rep) + for p in spec.get("papers", []): + ploc = paper_storage_loc(board["exam_board_code"], spec.get("subject_code", ""), + spec.get("award_code", ""), p["paper_code"], p["session"], p["doc_type"]) + if not dry_run: + upload_file(client, ploc, _resolve_source_bytes(p["file"]["source"]), force=force, rep=rep) + upsert_paper(spec["spec_code"], p, ploc, rep) + + if user_subset and not dry_run: + copy_user_test_subset(m, rep) + if do_first_sweep and not dry_run: + first_sweep(m, rep) + + logger.info(f"corpus load done: specs={rep.specs_upserted} papers={rep.papers_upserted} " + f"uploaded={rep.files_uploaded} skipped={rep.files_skipped} errors={len(rep.errors)}") + return rep + + +def main() -> None: + ap = argparse.ArgumentParser(description="Seed the public exam-paper corpus from a manifest.") + ap.add_argument("--manifest", required=True) + ap.add_argument("--dry-run", action="store_true", help="validate + report, no writes") + ap.add_argument("--force", action="store_true", help="re-upload/overwrite existing storage objects") + ap.add_argument("--board", default=None, help="only this exam_board_code") + ap.add_argument("--spec", default=None, help="only this spec_code") + ap.add_argument("--user-subset", action="store_true", help="also seed a user-side test subset") + ap.add_argument("--first-sweep", action="store_true", help="run docling/auto-map first pass on seeded papers") + a = ap.parse_args() + load(a.manifest, dry_run=a.dry_run, force=a.force, board_filter=a.board, spec_filter=a.spec, + user_subset=a.user_subset, do_first_sweep=a.first_sweep) + + +if __name__ == "__main__": + main()