fix(seed): unseed user-subset storage objects
Some checks failed
api-ci-deploy / test-build-deploy (push) Has been cancelled

(cherry picked from commit 9328ec2e062d039c0bcfabb086ce0693fe1ebe50)
This commit is contained in:
kcar 2026-06-08 01:10:55 +01:00 committed by CC Worker
parent 5da108df13
commit 7819e6e346
2 changed files with 265 additions and 5 deletions

View File

@ -105,6 +105,7 @@ class LoadReport:
downloaded: int = 0
download_cached: int = 0
unseed_objects: int = 0
unseed_user_files: int = 0
unseed_exams: int = 0
unseed_specs: int = 0
unseed_templates: int = 0
@ -117,6 +118,7 @@ class LoadReport:
"downloaded": self.downloaded,
"download_cached": self.download_cached,
"unseed_objects": self.unseed_objects,
"unseed_user_files": self.unseed_user_files,
"unseed_exams": self.unseed_exams,
"unseed_specs": self.unseed_specs,
"unseed_templates": self.unseed_templates,
@ -579,6 +581,84 @@ def _chunks(seq: List[Any], n: int = 100):
for i in range(0, len(seq), n):
yield seq[i:i + n]
def _storage_remove(storage: StorageAdmin, bucket: str, paths: List[str]) -> None:
"""Remove object paths from a bucket through the Supabase Storage API.
The python client treats missing objects as a successful no-op, which is useful for
unseed idempotency. Any API/permission failure is raised so callers can avoid
deleting the matching DB rows while storage may still exist.
"""
result = storage.client.supabase.storage.from_(bucket).remove(paths)
error = getattr(result, "error", None)
if error:
raise StorageError(str(error))
if isinstance(result, dict) and result.get("error"):
raise StorageError(str(result["error"]))
def _delete_user_subset_files(client: SupabaseServiceRoleClient, storage: StorageAdmin, *,
exam_codes: Optional[List[str]], rep: LoadReport) -> None:
"""Delete --user-subset files from cc.users storage, then their files rows.
User-subset seeding writes rows with source='exam-corpus-seed', bucket='cc.users',
and paths under exam-marker/. Storage must be removed before the files rows: the
files GC trigger also tries to delete storage when rows are deleted, so removing
objects first avoids trigger failures and keeps this operation idempotent.
exam_codes=None means remove all user-subset seed rows (used by unscoped unseed
even if the eb_* rows were already removed by a prior partial run).
"""
sb = client.supabase
seeded_files: List[Dict[str, Any]] = []
def _base_query():
return sb.table("files").select("id, bucket, path, name, source") \
.eq("bucket", "cc.users").eq("source", "exam-corpus-seed") \
.like("path", "exam-marker/%")
if exam_codes is None:
seeded_files.extend(getattr(_base_query().execute(), "data", None) or [])
elif exam_codes:
for chunk in _chunks([f"{code}.pdf" for code in exam_codes if code], 100):
seeded_files.extend(getattr(_base_query().in_("name", chunk).execute(), "data", None) or [])
rows_by_id: Dict[str, Dict[str, Any]] = {}
paths_by_bucket: Dict[str, List[str]] = {}
seen_paths: set = set()
for row in seeded_files:
row_id = row.get("id")
bucket = row.get("bucket")
path = row.get("path")
if row_id:
rows_by_id[str(row_id)] = row
if bucket == "cc.users" and isinstance(path, str) and path.startswith("exam-marker/"):
key = (bucket, path)
if key not in seen_paths:
seen_paths.add(key)
paths_by_bucket.setdefault(bucket, []).append(path)
removable_ids = list(rows_by_id)
if not removable_ids and not paths_by_bucket:
logger.info("[unseed] no user-subset cc.users files to remove")
return
for bkt, paths in paths_by_bucket.items():
for chunk in _chunks(paths, 100):
try:
_storage_remove(storage, bkt, chunk)
rep.unseed_objects += len(chunk)
except Exception as exc:
logger.warning(f"[unseed] user-subset storage remove failed ({bkt}, {len(chunk)} objs): {exc}")
rep.errors.append(f"user-subset storage remove {bkt}: {exc}")
return
for chunk in _chunks(removable_ids, 100):
try:
sb.table("files").delete().in_("id", chunk).execute()
rep.unseed_user_files += len(chunk)
except Exception as exc:
logger.warning(f"[unseed] user-subset files delete failed: {exc}")
rep.errors.append(f"user-subset files delete: {exc}")
def unseed(client: SupabaseServiceRoleClient, storage: StorageAdmin, *,
board_filter: Optional[str], spec_filter: Optional[str],
drop_specs: bool = True, drop_seed_templates: bool = True, rep: LoadReport) -> None:
@ -597,6 +677,8 @@ def unseed(client: SupabaseServiceRoleClient, storage: StorageAdmin, *,
specs = getattr(q.execute(), "data", None) or []
spec_codes = [s["spec_code"] for s in specs]
if not spec_codes:
if not board_filter and not spec_filter:
_delete_user_subset_files(client, storage, exam_codes=None, rep=rep)
logger.info("[unseed] no matching specifications; nothing to do")
return
@ -605,7 +687,14 @@ def unseed(client: SupabaseServiceRoleClient, storage: StorageAdmin, *,
res = sb.table("eb_exams").select("id, exam_code, storage_loc").in_("spec_code", chunk).execute()
exams.extend(getattr(res, "data", None) or [])
# 1) Storage objects (Storage API; batch-remove per bucket). Specs may carry a spec PDF too.
# 1) User-subset storage/rows. Storage is removed before files rows so trg_files_gc has
# nothing left to collect when rows are deleted.
user_subset_exam_codes = None if not board_filter and not spec_filter else [
e.get("exam_code") for e in exams if e.get("exam_code")
]
_delete_user_subset_files(client, storage, exam_codes=user_subset_exam_codes, rep=rep)
# 2) Storage objects (Storage API; batch-remove per bucket). Specs may carry a spec PDF too.
by_bucket: Dict[str, List[str]] = {}
for row in exams + specs:
loc = row.get("storage_loc")
@ -621,7 +710,7 @@ def unseed(client: SupabaseServiceRoleClient, storage: StorageAdmin, *,
except Exception as exc:
logger.warning(f"[unseed] storage remove failed ({bkt}, {len(chunk)} objs): {exc}")
# 2) First-sweep templates created by the seed (cascades questions/regions/boundaries/layout).
# 3) First-sweep templates created by the seed (cascades questions/regions/boundaries/layout).
if drop_seed_templates and exams:
exam_codes = [e["exam_code"] for e in exams if e.get("exam_code")]
for chunk in _chunks(exam_codes, 100):
@ -632,7 +721,7 @@ def unseed(client: SupabaseServiceRoleClient, storage: StorageAdmin, *,
except Exception as exc:
logger.warning(f"[unseed] template delete failed: {exc}")
# 3) Catalogue rows: eb_exams (by id), then eb_specifications (by spec_code).
# 4) Catalogue rows: eb_exams (by id), then eb_specifications (by spec_code).
exam_ids = [e["id"] for e in exams]
for chunk in _chunks(exam_ids, 100):
try:
@ -648,8 +737,8 @@ def unseed(client: SupabaseServiceRoleClient, storage: StorageAdmin, *,
except Exception as exc:
logger.warning(f"[unseed] eb_specifications delete failed: {exc}")
logger.info(f"unseed done: storage_objects={rep.unseed_objects} templates={rep.unseed_templates} "
f"exams={rep.unseed_exams} specs={rep.unseed_specs}")
logger.info(f"unseed done: storage_objects={rep.unseed_objects} user_files={rep.unseed_user_files} "
f"templates={rep.unseed_templates} exams={rep.unseed_exams} specs={rep.unseed_specs}")
# ─────────────────────────────── orchestration ───────────────────────────────

View File

@ -0,0 +1,171 @@
from run.initialization.seed_exam_corpus import LoadReport, _delete_user_subset_files
class _Result:
def __init__(self, data=None):
self.data = data or []
class _FilesQuery:
def __init__(self, db, op="select"):
self.db = db
self.op = op
self.filters = []
self.in_filters = []
def select(self, *_args, **_kwargs):
return self
def delete(self, *_args, **_kwargs):
self.op = "delete"
return self
def eq(self, key, value):
self.filters.append(("eq", key, value))
return self
def like(self, key, pattern):
self.filters.append(("like", key, pattern))
return self
def in_(self, key, values):
self.in_filters.append((key, set(values)))
return self
def _matches(self, row):
for kind, key, value in self.filters:
actual = row.get(key)
if kind == "eq" and actual != value:
return False
if kind == "like":
assert value.endswith("%")
if not isinstance(actual, str) or not actual.startswith(value[:-1]):
return False
for key, values in self.in_filters:
if row.get(key) not in values:
return False
return True
def execute(self):
matched = [row for row in self.db.rows if self._matches(row)]
if self.op == "delete":
self.db.ops.append(("delete", [row["id"] for row in matched]))
self.db.rows = [row for row in self.db.rows if not self._matches(row)]
return _Result(matched)
return _Result(matched)
class _FakeDb:
def __init__(self, rows):
self.rows = list(rows)
self.ops = []
def table(self, name):
assert name == "files"
return _FilesQuery(self)
class _FakeStorageBucket:
def __init__(self, storage, bucket):
self.storage = storage
self.bucket = bucket
def remove(self, paths):
self.storage.ops.append(("remove", self.bucket, list(paths)))
if self.storage.fail:
raise RuntimeError("storage unavailable")
if self.storage.result_error:
return {"error": self.storage.result_error}
return []
class _FakeStorageRoot:
def __init__(self, storage):
self.storage = storage
def from_(self, bucket):
return _FakeStorageBucket(self.storage, bucket)
class _FakeStorage:
def __init__(self, fail=False, result_error=None):
self.fail = fail
self.result_error = result_error
self.ops = []
self.client = type("Client", (), {"supabase": type("SB", (), {"storage": _FakeStorageRoot(self)})()})()
class _FakeClient:
def __init__(self, db):
self.supabase = db
def test_delete_user_subset_storage_before_files_rows_for_scoped_exams():
db = _FakeDb([
{"id": "f1", "bucket": "cc.users", "path": "exam-marker/i/c/f1/A.pdf", "name": "A.pdf", "source": "exam-corpus-seed"},
{"id": "f2", "bucket": "cc.users", "path": "exam-marker/i/c/f2/B.pdf", "name": "B.pdf", "source": "exam-corpus-seed"},
{"id": "f3", "bucket": "cc.users", "path": "exam-marker/i/c/f3/A.pdf", "name": "A.pdf", "source": "manual"},
{"id": "f4", "bucket": "cc.users", "path": "other/f4/A.pdf", "name": "A.pdf", "source": "exam-corpus-seed"},
])
storage = _FakeStorage()
rep = LoadReport()
_delete_user_subset_files(_FakeClient(db), storage, exam_codes=["A"], rep=rep)
assert storage.ops == [("remove", "cc.users", ["exam-marker/i/c/f1/A.pdf"])]
assert db.ops == [("delete", ["f1"])]
assert [row["id"] for row in db.rows] == ["f2", "f3", "f4"]
assert rep.unseed_objects == 1
assert rep.unseed_user_files == 1
assert rep.errors == []
def test_delete_user_subset_keeps_files_rows_when_storage_remove_fails():
db = _FakeDb([
{"id": "f1", "bucket": "cc.users", "path": "exam-marker/i/c/f1/A.pdf", "name": "A.pdf", "source": "exam-corpus-seed"},
])
storage = _FakeStorage(fail=True)
rep = LoadReport()
_delete_user_subset_files(_FakeClient(db), storage, exam_codes=["A"], rep=rep)
assert storage.ops == [("remove", "cc.users", ["exam-marker/i/c/f1/A.pdf"])]
assert db.ops == []
assert [row["id"] for row in db.rows] == ["f1"]
assert rep.unseed_objects == 0
assert rep.unseed_user_files == 0
assert rep.errors
def test_delete_user_subset_keeps_files_rows_when_storage_remove_returns_error():
db = _FakeDb([
{"id": "f1", "bucket": "cc.users", "path": "exam-marker/i/c/f1/A.pdf", "name": "A.pdf", "source": "exam-corpus-seed"},
])
storage = _FakeStorage(result_error="permission denied")
rep = LoadReport()
_delete_user_subset_files(_FakeClient(db), storage, exam_codes=["A"], rep=rep)
assert storage.ops == [("remove", "cc.users", ["exam-marker/i/c/f1/A.pdf"])]
assert db.ops == []
assert [row["id"] for row in db.rows] == ["f1"]
assert rep.unseed_objects == 0
assert rep.unseed_user_files == 0
assert rep.errors
def test_delete_user_subset_unscoped_cleans_all_seeded_exam_marker_rows():
db = _FakeDb([
{"id": "f1", "bucket": "cc.users", "path": "exam-marker/i/c/f1/A.pdf", "name": "A.pdf", "source": "exam-corpus-seed"},
{"id": "f2", "bucket": "cc.users", "path": "exam-marker/i/c/f2/B.pdf", "name": "B.pdf", "source": "exam-corpus-seed"},
])
storage = _FakeStorage()
rep = LoadReport()
_delete_user_subset_files(_FakeClient(db), storage, exam_codes=None, rep=rep)
assert storage.ops == [("remove", "cc.users", ["exam-marker/i/c/f1/A.pdf", "exam-marker/i/c/f2/B.pdf"])]
assert db.ops == [("delete", ["f1", "f2"])]
assert db.rows == []
assert rep.unseed_objects == 2
assert rep.unseed_user_files == 2