from run.initialization.seed_exam_corpus import LoadReport, _delete_user_subset_files


class _Result:
    """Minimal response object exposing the matched rows as ``data``."""

    def __init__(self, data=None):
        # Any falsy payload (None, empty list) collapses to a fresh empty list.
        self.data = data if data else []


class _FilesQuery:
    """In-memory stand-in for the chained query returned by ``_FakeDb.table``.

    Filter methods record their arguments and return ``self`` so calls can be
    chained; nothing touches the fake database until ``execute`` is called.
    """

    def __init__(self, db, op="select"):
        self.db = db
        self.op = op
        self.filters = []
        self.in_filters = []

    def select(self, *_args, **_kwargs):
        """Accept and ignore any column selection; the operation is unchanged."""
        return self

    def delete(self, *_args, **_kwargs):
        """Switch the pending operation to a delete."""
        self.op = "delete"
        return self

    def eq(self, key, value):
        """Record an equality filter on ``key``."""
        self.filters.append(("eq", key, value))
        return self

    def like(self, key, pattern):
        """Record a prefix filter on ``key`` (``pattern`` must end in ``%``)."""
        self.filters.append(("like", key, pattern))
        return self

    def in_(self, key, values):
        """Record a membership filter on ``key``."""
        self.in_filters.append((key, set(values)))
        return self

    def _matches(self, row):
        """Return True when ``row`` satisfies every recorded filter."""
        for kind, key, expected in self.filters:
            actual = row.get(key)
            if kind == "eq":
                if actual != expected:
                    return False
            elif kind == "like":
                # Only trailing-wildcard patterns are supported by this fake.
                assert expected.endswith("%")
                prefix = expected[:-1]
                if not (isinstance(actual, str) and actual.startswith(prefix)):
                    return False
        return all(row.get(key) in allowed for key, allowed in self.in_filters)

    def execute(self):
        """Apply the pending operation and return the matched rows.

        For a delete, the ids of the matched rows are logged on ``db.ops`` and
        those rows are then dropped from ``db.rows``.
        """
        hits = [row for row in self.db.rows if self._matches(row)]
        if self.op == "delete":
            deleted_ids = [row["id"] for row in hits]
            self.db.ops.append(("delete", deleted_ids))
            survivors = [row for row in self.db.rows if not self._matches(row)]
            self.db.rows = survivors
        return _Result(hits)


class _FakeDb:
    """Fake database holding ``files`` rows plus a log of delete operations."""

    def __init__(self, rows):
        self.rows = list(rows)
        self.ops = []

    def table(self, name):
        """Return a fresh query; only the ``files`` table is supported."""
        assert name == "files"
        return _FilesQuery(self)


class _FakeStorageBucket:
    """Bucket handle that logs ``remove`` calls on its parent ``_FakeStorage``."""

    def __init__(self, storage, bucket):
        self.storage = storage
        self.bucket = bucket

    def remove(self, paths):
        """Log the removal, then raise, return an error payload, or succeed."""
        parent = self.storage
        # The call is logged before any failure is simulated.
        parent.ops.append(("remove", self.bucket, list(paths)))
        if parent.fail:
            raise RuntimeError("storage unavailable")
        if parent.result_error:
            return {"error": parent.result_error}
        return []


class _FakeStorageRoot:
    """Object exposing ``from_(bucket)`` and handing out bucket handles."""

    def __init__(self, storage):
        self.storage = storage

    def from_(self, bucket):
        """Return a bucket handle bound to the shared ``_FakeStorage``."""
        return _FakeStorageBucket(self.storage, bucket)


class _FakeStorage:
    """Fake storage service reachable through ``client.supabase.storage``.

    ``fail`` makes ``remove`` raise; ``result_error`` makes it return an
    ``{"error": ...}`` payload instead of an empty list.
    """

    def __init__(self, fail=False, result_error=None):
        self.fail = fail
        self.result_error = result_error
        self.ops = []
        # Build throwaway classes so attribute access resolves as
        # ``self.client.supabase.storage``.
        sb_cls = type("SB", (), {"storage": _FakeStorageRoot(self)})
        client_cls = type("Client", (), {"supabase": sb_cls()})
        self.client = client_cls()


class _FakeClient:
    """Client wrapper whose ``supabase`` attribute is the fake database."""

    def __init__(self, db):
        self.supabase = db


def test_delete_user_subset_storage_before_files_rows_for_scoped_exams():
    """Scoped to exam ``A``: only the seeded exam-marker row for A is removed."""
    fake_db = _FakeDb(
        [
            {
                "id": "f1",
                "bucket": "cc.users",
                "path": "exam-marker/i/c/f1/A.pdf",
                "name": "A.pdf",
                "source": "exam-corpus-seed",
            },
            {
                "id": "f2",
                "bucket": "cc.users",
                "path": "exam-marker/i/c/f2/B.pdf",
                "name": "B.pdf",
                "source": "exam-corpus-seed",
            },
            {
                "id": "f3",
                "bucket": "cc.users",
                "path": "exam-marker/i/c/f3/A.pdf",
                "name": "A.pdf",
                "source": "manual",
            },
            {
                "id": "f4",
                "bucket": "cc.users",
                "path": "other/f4/A.pdf",
                "name": "A.pdf",
                "source": "exam-corpus-seed",
            },
        ]
    )
    fake_storage = _FakeStorage()
    report = LoadReport()

    _delete_user_subset_files(
        _FakeClient(fake_db), fake_storage, exam_codes=["A"], rep=report
    )

    assert fake_storage.ops == [
        ("remove", "cc.users", ["exam-marker/i/c/f1/A.pdf"])
    ]
    assert fake_db.ops == [("delete", ["f1"])]
    assert [row["id"] for row in fake_db.rows] == ["f2", "f3", "f4"]
    assert report.unseed_objects == 1
    assert report.unseed_user_files == 1
    assert report.errors == []


def test_delete_user_subset_keeps_files_rows_when_storage_remove_fails():
    """A raising storage ``remove`` leaves the files rows untouched."""
    fake_db = _FakeDb(
        [
            {
                "id": "f1",
                "bucket": "cc.users",
                "path": "exam-marker/i/c/f1/A.pdf",
                "name": "A.pdf",
                "source": "exam-corpus-seed",
            },
        ]
    )
    fake_storage = _FakeStorage(fail=True)
    report = LoadReport()

    _delete_user_subset_files(
        _FakeClient(fake_db), fake_storage, exam_codes=["A"], rep=report
    )

    assert fake_storage.ops == [
        ("remove", "cc.users", ["exam-marker/i/c/f1/A.pdf"])
    ]
    assert fake_db.ops == []
    assert [row["id"] for row in fake_db.rows] == ["f1"]
    assert report.unseed_objects == 0
    assert report.unseed_user_files == 0
    assert report.errors


def test_delete_user_subset_keeps_files_rows_when_storage_remove_returns_error():
    """An error payload from storage ``remove`` leaves the files rows untouched."""
    fake_db = _FakeDb(
        [
            {
                "id": "f1",
                "bucket": "cc.users",
                "path": "exam-marker/i/c/f1/A.pdf",
                "name": "A.pdf",
                "source": "exam-corpus-seed",
            },
        ]
    )
    fake_storage = _FakeStorage(result_error="permission denied")
    report = LoadReport()

    _delete_user_subset_files(
        _FakeClient(fake_db), fake_storage, exam_codes=["A"], rep=report
    )

    assert fake_storage.ops == [
        ("remove", "cc.users", ["exam-marker/i/c/f1/A.pdf"])
    ]
    assert fake_db.ops == []
    assert [row["id"] for row in fake_db.rows] == ["f1"]
    assert report.unseed_objects == 0
    assert report.unseed_user_files == 0
    assert report.errors


def test_delete_user_subset_unscoped_cleans_all_seeded_exam_marker_rows():
    """With ``exam_codes=None`` every seeded exam-marker row is removed."""
    fake_db = _FakeDb(
        [
            {
                "id": "f1",
                "bucket": "cc.users",
                "path": "exam-marker/i/c/f1/A.pdf",
                "name": "A.pdf",
                "source": "exam-corpus-seed",
            },
            {
                "id": "f2",
                "bucket": "cc.users",
                "path": "exam-marker/i/c/f2/B.pdf",
                "name": "B.pdf",
                "source": "exam-corpus-seed",
            },
        ]
    )
    fake_storage = _FakeStorage()
    report = LoadReport()

    _delete_user_subset_files(
        _FakeClient(fake_db), fake_storage, exam_codes=None, rep=report
    )

    assert fake_storage.ops == [
        (
            "remove",
            "cc.users",
            ["exam-marker/i/c/f1/A.pdf", "exam-marker/i/c/f2/B.pdf"],
        )
    ]
    assert fake_db.ops == [("delete", ["f1", "f2"])]
    assert fake_db.rows == []
    assert report.unseed_objects == 2
    assert report.unseed_user_files == 2