From 7819e6e346ad8c31a8ffd552c0c174ab3354b757 Mon Sep 17 00:00:00 2001
From: kcar
Date: Mon, 8 Jun 2026 01:10:55 +0100
Subject: [PATCH] fix(seed): unseed user-subset storage objects

(cherry picked from commit 9328ec2e062d039c0bcfabb086ce0693fe1ebe50)
---
Reviewer notes (free-form area; ignored by `git am`):

 * Layout: this patch had been collapsed onto a handful of lines. Line
   structure is restored here. Indentation of the added (`+`) lines is
   reconstructed from Python syntax. Indentation of the context lines, and
   the exact trailing context of the last unseed() hunk, are inferred because
   the target file is not visible; verify against the tree or apply with
   `git apply --ignore-whitespace`.
 * NOTE(review): _delete_user_subset_files() issues unpaginated selects. If
   the API enforces a per-response row cap (Supabase documents a default of
   1,000 rows; confirm the project setting), one unscoped unseed run may
   only remove the first page and need to be re-run.
 * NOTE(review): when storage removal fails on a later chunk, objects from
   earlier chunks are already gone while every files row is kept. Per the
   _storage_remove() docstring missing objects are a no-op, so a re-run is
   expected to finish the cleanup; confirm that trg_files_gc tolerates the
   interim state.
 * NOTE(review): on the unscoped "no matching specifications" path the log
   line still says "nothing to do" even when user-subset files were removed.

 run/initialization/seed_exam_corpus.py |  99 +++++++++++++-
 tests/test_seed_exam_corpus_unseed.py  | 171 +++++++++++++++++++++++++
 2 files changed, 265 insertions(+), 5 deletions(-)
 create mode 100644 tests/test_seed_exam_corpus_unseed.py

diff --git a/run/initialization/seed_exam_corpus.py b/run/initialization/seed_exam_corpus.py
index 1190131..aa3cbcc 100644
--- a/run/initialization/seed_exam_corpus.py
+++ b/run/initialization/seed_exam_corpus.py
@@ -105,6 +105,7 @@ class LoadReport:
     downloaded: int = 0
     download_cached: int = 0
     unseed_objects: int = 0
+    unseed_user_files: int = 0
     unseed_exams: int = 0
     unseed_specs: int = 0
     unseed_templates: int = 0
@@ -117,6 +118,7 @@ class LoadReport:
             "downloaded": self.downloaded,
             "download_cached": self.download_cached,
             "unseed_objects": self.unseed_objects,
+            "unseed_user_files": self.unseed_user_files,
             "unseed_exams": self.unseed_exams,
             "unseed_specs": self.unseed_specs,
             "unseed_templates": self.unseed_templates,
@@ -579,6 +581,84 @@ def _chunks(seq: List[Any], n: int = 100):
     for i in range(0, len(seq), n):
         yield seq[i:i + n]
 
+def _storage_remove(storage: StorageAdmin, bucket: str, paths: List[str]) -> None:
+    """Remove object paths from a bucket through the Supabase Storage API.
+
+    The python client treats missing objects as a successful no-op, which is useful for
+    unseed idempotency. Any API/permission failure is raised so callers can avoid
+    deleting the matching DB rows while storage may still exist.
+    """
+    result = storage.client.supabase.storage.from_(bucket).remove(paths)
+    error = getattr(result, "error", None)
+    if error:
+        raise StorageError(str(error))
+    if isinstance(result, dict) and result.get("error"):
+        raise StorageError(str(result["error"]))
+
+def _delete_user_subset_files(client: SupabaseServiceRoleClient, storage: StorageAdmin, *,
+                              exam_codes: Optional[List[str]], rep: LoadReport) -> None:
+    """Delete --user-subset files from cc.users storage, then their files rows.
+
+    User-subset seeding writes rows with source='exam-corpus-seed', bucket='cc.users',
+    and paths under exam-marker/. Storage must be removed before the files rows: the
+    files GC trigger also tries to delete storage when rows are deleted, so removing
+    objects first avoids trigger failures and keeps this operation idempotent.
+
+    exam_codes=None means remove all user-subset seed rows (used by unscoped unseed
+    even if the eb_* rows were already removed by a prior partial run).
+    """
+    sb = client.supabase
+    seeded_files: List[Dict[str, Any]] = []
+
+    def _base_query():
+        return sb.table("files").select("id, bucket, path, name, source") \
+            .eq("bucket", "cc.users").eq("source", "exam-corpus-seed") \
+            .like("path", "exam-marker/%")
+
+    if exam_codes is None:
+        seeded_files.extend(getattr(_base_query().execute(), "data", None) or [])
+    elif exam_codes:
+        for chunk in _chunks([f"{code}.pdf" for code in exam_codes if code], 100):
+            seeded_files.extend(getattr(_base_query().in_("name", chunk).execute(), "data", None) or [])
+
+    rows_by_id: Dict[str, Dict[str, Any]] = {}
+    paths_by_bucket: Dict[str, List[str]] = {}
+    seen_paths: set = set()
+    for row in seeded_files:
+        row_id = row.get("id")
+        bucket = row.get("bucket")
+        path = row.get("path")
+        if row_id:
+            rows_by_id[str(row_id)] = row
+        if bucket == "cc.users" and isinstance(path, str) and path.startswith("exam-marker/"):
+            key = (bucket, path)
+            if key not in seen_paths:
+                seen_paths.add(key)
+                paths_by_bucket.setdefault(bucket, []).append(path)
+
+    removable_ids = list(rows_by_id)
+    if not removable_ids and not paths_by_bucket:
+        logger.info("[unseed] no user-subset cc.users files to remove")
+        return
+
+    for bkt, paths in paths_by_bucket.items():
+        for chunk in _chunks(paths, 100):
+            try:
+                _storage_remove(storage, bkt, chunk)
+                rep.unseed_objects += len(chunk)
+            except Exception as exc:
+                logger.warning(f"[unseed] user-subset storage remove failed ({bkt}, {len(chunk)} objs): {exc}")
+                rep.errors.append(f"user-subset storage remove {bkt}: {exc}")
+                return
+
+    for chunk in _chunks(removable_ids, 100):
+        try:
+            sb.table("files").delete().in_("id", chunk).execute()
+            rep.unseed_user_files += len(chunk)
+        except Exception as exc:
+            logger.warning(f"[unseed] user-subset files delete failed: {exc}")
+            rep.errors.append(f"user-subset files delete: {exc}")
+
 def unseed(client: SupabaseServiceRoleClient, storage: StorageAdmin, *,
            board_filter: Optional[str], spec_filter: Optional[str],
            drop_specs: bool = True, drop_seed_templates: bool = True, rep: LoadReport) -> None:
@@ -597,6 +677,8 @@ def unseed(client: SupabaseServiceRoleClient, storage: StorageAdmin, *,
     specs = getattr(q.execute(), "data", None) or []
     spec_codes = [s["spec_code"] for s in specs]
     if not spec_codes:
+        if not board_filter and not spec_filter:
+            _delete_user_subset_files(client, storage, exam_codes=None, rep=rep)
         logger.info("[unseed] no matching specifications; nothing to do")
         return
 
@@ -605,7 +687,14 @@ def unseed(client: SupabaseServiceRoleClient, storage: StorageAdmin, *,
         res = sb.table("eb_exams").select("id, exam_code, storage_loc").in_("spec_code", chunk).execute()
         exams.extend(getattr(res, "data", None) or [])
 
-    # 1) Storage objects (Storage API; batch-remove per bucket). Specs may carry a spec PDF too.
+    # 1) User-subset storage/rows. Storage is removed before files rows so trg_files_gc has
+    # nothing left to collect when rows are deleted.
+    user_subset_exam_codes = None if not board_filter and not spec_filter else [
+        e.get("exam_code") for e in exams if e.get("exam_code")
+    ]
+    _delete_user_subset_files(client, storage, exam_codes=user_subset_exam_codes, rep=rep)
+
+    # 2) Storage objects (Storage API; batch-remove per bucket). Specs may carry a spec PDF too.
     by_bucket: Dict[str, List[str]] = {}
     for row in exams + specs:
         loc = row.get("storage_loc")
@@ -621,7 +710,7 @@ def unseed(client: SupabaseServiceRoleClient, storage: StorageAdmin, *,
             except Exception as exc:
                 logger.warning(f"[unseed] storage remove failed ({bkt}, {len(chunk)} objs): {exc}")
 
-    # 2) First-sweep templates created by the seed (cascades questions/regions/boundaries/layout).
+    # 3) First-sweep templates created by the seed (cascades questions/regions/boundaries/layout).
     if drop_seed_templates and exams:
         exam_codes = [e["exam_code"] for e in exams if e.get("exam_code")]
         for chunk in _chunks(exam_codes, 100):
@@ -632,7 +721,7 @@ def unseed(client: SupabaseServiceRoleClient, storage: StorageAdmin, *,
             except Exception as exc:
                 logger.warning(f"[unseed] template delete failed: {exc}")
 
-    # 3) Catalogue rows: eb_exams (by id), then eb_specifications (by spec_code).
+    # 4) Catalogue rows: eb_exams (by id), then eb_specifications (by spec_code).
     exam_ids = [e["id"] for e in exams]
     for chunk in _chunks(exam_ids, 100):
         try:
@@ -648,8 +737,8 @@ def unseed(client: SupabaseServiceRoleClient, storage: StorageAdmin, *,
             except Exception as exc:
                 logger.warning(f"[unseed] eb_specifications delete failed: {exc}")
 
-    logger.info(f"unseed done: storage_objects={rep.unseed_objects} templates={rep.unseed_templates} "
-                f"exams={rep.unseed_exams} specs={rep.unseed_specs}")
+    logger.info(f"unseed done: storage_objects={rep.unseed_objects} user_files={rep.unseed_user_files} "
+                f"templates={rep.unseed_templates} exams={rep.unseed_exams} specs={rep.unseed_specs}")
 
 
 # ─────────────────────────────── orchestration ───────────────────────────────
diff --git a/tests/test_seed_exam_corpus_unseed.py b/tests/test_seed_exam_corpus_unseed.py
new file mode 100644
index 0000000..fc98357
--- /dev/null
+++ b/tests/test_seed_exam_corpus_unseed.py
@@ -0,0 +1,171 @@
+from run.initialization.seed_exam_corpus import LoadReport, _delete_user_subset_files
+
+
+class _Result:
+    def __init__(self, data=None):
+        self.data = data or []
+
+
+class _FilesQuery:
+    def __init__(self, db, op="select"):
+        self.db = db
+        self.op = op
+        self.filters = []
+        self.in_filters = []
+
+    def select(self, *_args, **_kwargs):
+        return self
+
+    def delete(self, *_args, **_kwargs):
+        self.op = "delete"
+        return self
+
+    def eq(self, key, value):
+        self.filters.append(("eq", key, value))
+        return self
+
+    def like(self, key, pattern):
+        self.filters.append(("like", key, pattern))
+        return self
+
+    def in_(self, key, values):
+        self.in_filters.append((key, set(values)))
+        return self
+
+    def _matches(self, row):
+        for kind, key, value in self.filters:
+            actual = row.get(key)
+            if kind == "eq" and actual != value:
+                return False
+            if kind == "like":
+                assert value.endswith("%")
+                if not isinstance(actual, str) or not actual.startswith(value[:-1]):
+                    return False
+        for key, values in self.in_filters:
+            if row.get(key) not in values:
+                return False
+        return True
+
+    def execute(self):
+        matched = [row for row in self.db.rows if self._matches(row)]
+        if self.op == "delete":
+            self.db.ops.append(("delete", [row["id"] for row in matched]))
+            self.db.rows = [row for row in self.db.rows if not self._matches(row)]
+            return _Result(matched)
+        return _Result(matched)
+
+
+class _FakeDb:
+    def __init__(self, rows):
+        self.rows = list(rows)
+        self.ops = []
+
+    def table(self, name):
+        assert name == "files"
+        return _FilesQuery(self)
+
+
+class _FakeStorageBucket:
+    def __init__(self, storage, bucket):
+        self.storage = storage
+        self.bucket = bucket
+
+    def remove(self, paths):
+        self.storage.ops.append(("remove", self.bucket, list(paths)))
+        if self.storage.fail:
+            raise RuntimeError("storage unavailable")
+        if self.storage.result_error:
+            return {"error": self.storage.result_error}
+        return []
+
+
+class _FakeStorageRoot:
+    def __init__(self, storage):
+        self.storage = storage
+
+    def from_(self, bucket):
+        return _FakeStorageBucket(self.storage, bucket)
+
+
+class _FakeStorage:
+    def __init__(self, fail=False, result_error=None):
+        self.fail = fail
+        self.result_error = result_error
+        self.ops = []
+        self.client = type("Client", (), {"supabase": type("SB", (), {"storage": _FakeStorageRoot(self)})()})()
+
+
+class _FakeClient:
+    def __init__(self, db):
+        self.supabase = db
+
+
+def test_delete_user_subset_storage_before_files_rows_for_scoped_exams():
+    db = _FakeDb([
+        {"id": "f1", "bucket": "cc.users", "path": "exam-marker/i/c/f1/A.pdf", "name": "A.pdf", "source": "exam-corpus-seed"},
+        {"id": "f2", "bucket": "cc.users", "path": "exam-marker/i/c/f2/B.pdf", "name": "B.pdf", "source": "exam-corpus-seed"},
+        {"id": "f3", "bucket": "cc.users", "path": "exam-marker/i/c/f3/A.pdf", "name": "A.pdf", "source": "manual"},
+        {"id": "f4", "bucket": "cc.users", "path": "other/f4/A.pdf", "name": "A.pdf", "source": "exam-corpus-seed"},
+    ])
+    storage = _FakeStorage()
+    rep = LoadReport()
+
+    _delete_user_subset_files(_FakeClient(db), storage, exam_codes=["A"], rep=rep)
+
+    assert storage.ops == [("remove", "cc.users", ["exam-marker/i/c/f1/A.pdf"])]
+    assert db.ops == [("delete", ["f1"])]
+    assert [row["id"] for row in db.rows] == ["f2", "f3", "f4"]
+    assert rep.unseed_objects == 1
+    assert rep.unseed_user_files == 1
+    assert rep.errors == []
+
+
+def test_delete_user_subset_keeps_files_rows_when_storage_remove_fails():
+    db = _FakeDb([
+        {"id": "f1", "bucket": "cc.users", "path": "exam-marker/i/c/f1/A.pdf", "name": "A.pdf", "source": "exam-corpus-seed"},
+    ])
+    storage = _FakeStorage(fail=True)
+    rep = LoadReport()
+
+    _delete_user_subset_files(_FakeClient(db), storage, exam_codes=["A"], rep=rep)
+
+    assert storage.ops == [("remove", "cc.users", ["exam-marker/i/c/f1/A.pdf"])]
+    assert db.ops == []
+    assert [row["id"] for row in db.rows] == ["f1"]
+    assert rep.unseed_objects == 0
+    assert rep.unseed_user_files == 0
+    assert rep.errors
+
+
+def test_delete_user_subset_keeps_files_rows_when_storage_remove_returns_error():
+    db = _FakeDb([
+        {"id": "f1", "bucket": "cc.users", "path": "exam-marker/i/c/f1/A.pdf", "name": "A.pdf", "source": "exam-corpus-seed"},
+    ])
+    storage = _FakeStorage(result_error="permission denied")
+    rep = LoadReport()
+
+    _delete_user_subset_files(_FakeClient(db), storage, exam_codes=["A"], rep=rep)
+
+    assert storage.ops == [("remove", "cc.users", ["exam-marker/i/c/f1/A.pdf"])]
+    assert db.ops == []
+    assert [row["id"] for row in db.rows] == ["f1"]
+    assert rep.unseed_objects == 0
+    assert rep.unseed_user_files == 0
+    assert rep.errors
+
+
+def test_delete_user_subset_unscoped_cleans_all_seeded_exam_marker_rows():
+    db = _FakeDb([
+        {"id": "f1", "bucket": "cc.users", "path": "exam-marker/i/c/f1/A.pdf", "name": "A.pdf", "source": "exam-corpus-seed"},
+        {"id": "f2", "bucket": "cc.users", "path": "exam-marker/i/c/f2/B.pdf", "name": "B.pdf", "source": "exam-corpus-seed"},
+    ])
+    storage = _FakeStorage()
+    rep = LoadReport()
+
+    _delete_user_subset_files(_FakeClient(db), storage, exam_codes=None, rep=rep)
+
+    assert storage.ops == [("remove", "cc.users", ["exam-marker/i/c/f1/A.pdf", "exam-marker/i/c/f2/B.pdf"])]
+    assert db.ops == [("delete", ["f1", "f2"])]
+    assert db.rows == []
+    assert rep.unseed_objects == 2
+    assert rep.unseed_user_files == 2