diff --git a/routers/database/files/files.py b/routers/database/files/files.py index 07edf9b..6d81b46 100644 --- a/routers/database/files/files.py +++ b/routers/database/files/files.py @@ -36,6 +36,24 @@ DOCLING_NOOCR_TIMEOUT = int(os.getenv('DOCLING_NOOCR_TIMEOUT', '3600')) # 1 hou logger = initialise_logger(__name__, os.getenv("LOG_LEVEL"), os.getenv("LOG_PATH"), 'default', True) +def _user_id_from_payload(payload: Dict[str, Any]) -> str: + user_id = payload.get('sub') or payload.get('user_id') + if not user_id: + raise HTTPException(status_code=401, detail="Invalid token payload") + return user_id + +def _cabinet_visible_to_user(client: SupabaseServiceRoleClient, cabinet_id: str, user_id: str) -> bool: + """Require cabinet ownership before service-role reads file metadata.""" + owned = ( + client.supabase.table('file_cabinets') + .select('id') + .eq('id', cabinet_id) + .eq('user_id', user_id) + .limit(1) + .execute() + ) + return bool(owned.data) + def _safe_filename(name: str) -> str: base = os.path.basename(name or 'file') return re.sub(r"[^A-Za-z0-9._-]+", "_", base) @@ -117,7 +135,10 @@ async def upload_file( @router.get("/files") def list_files(cabinet_id: str, payload: Dict[str, Any] = Depends(auth)): + user_id = _user_id_from_payload(payload) client = SupabaseServiceRoleClient() + if not _cabinet_visible_to_user(client, cabinet_id, user_id): + return [] res = client.supabase.table('files').select('*').eq('cabinet_id', cabinet_id).execute() return res.data diff --git a/routers/database/files/files_simplified.py b/routers/database/files/files_simplified.py index 32b40c9..2dd5397 100644 --- a/routers/database/files/files_simplified.py +++ b/routers/database/files/files_simplified.py @@ -26,6 +26,24 @@ auth = SupabaseBearer() logger = initialise_logger(__name__, os.getenv("LOG_LEVEL"), os.getenv("LOG_PATH"), 'default', True) +def _user_id_from_payload(payload: Dict[str, Any]) -> str: + user_id = payload.get('sub') or payload.get('user_id') + if not user_id: + raise HTTPException(status_code=401, detail="Invalid token payload") + return user_id + +def _cabinet_visible_to_user(client: SupabaseServiceRoleClient, cabinet_id: str, user_id: str) -> bool: + """Require cabinet ownership before service-role reads file metadata.""" + owned = ( + client.supabase.table('file_cabinets') + .select('id') + .eq('id', cabinet_id) + .eq('user_id', user_id) + .limit(1) + .execute() + ) + return bool(owned.data) + def _choose_bucket(scope: str, user_id: str, school_id: Optional[str]) -> str: """Choose appropriate bucket based on scope - matches old system logic.""" scope = (scope or 'teacher').lower() @@ -134,7 +152,10 @@ async def upload_file( @router.get("/files") def list_files(cabinet_id: str, payload: Dict[str, Any] = Depends(auth)): """List files in a cabinet.""" + user_id = _user_id_from_payload(payload) client = SupabaseServiceRoleClient() + if not _cabinet_visible_to_user(client, cabinet_id, user_id): + return [] res = client.supabase.table('files').select('*').eq('cabinet_id', cabinet_id).execute() return res.data diff --git a/tests/test_files_idor.py b/tests/test_files_idor.py new file mode 100644 index 0000000..2ec0b6e --- /dev/null +++ b/tests/test_files_idor.py @@ -0,0 +1,103 @@ +from types import SimpleNamespace + +import pytest + +import routers.database.files.files as files_router +import routers.database.files.files_simplified as files_simplified_router + + +ROUTERS = [files_router, files_simplified_router] + +USER_A = "00000000-0000-0000-0000-000000000001" +USER_B = "00000000-0000-0000-0000-000000000002" +CAB_A = "10000000-0000-0000-0000-000000000001" +CAB_B = "10000000-0000-0000-0000-000000000002" + + +class FakeQuery: + def __init__(self, rows): + self.rows = list(rows) + + def select(self, *_args, **_kwargs): + return self + + def eq(self, key, value): + self.rows = [row for row in self.rows if row.get(key) == value] + return self + + def limit(self, _n): + return self + + def execute(self): + return SimpleNamespace(data=self.rows) + + +class FakeSupabase: + def __init__(self, store): + self.store = store + + def table(self, name): + return FakeQuery(self.store.get(name, [])) + + +class FakeServiceRoleClient: + def __init__(self, store): + self.supabase = FakeSupabase(store) + + +@pytest.mark.parametrize("router_module", ROUTERS) +def test_list_files_hides_unowned_unshared_cabinet(monkeypatch, router_module): + store = { + "file_cabinets": [ + {"id": CAB_A, "user_id": USER_A}, + {"id": CAB_B, "user_id": USER_B}, + ], + "cabinet_memberships": [], + "files": [ + {"id": "file-a", "cabinet_id": CAB_A, "uploaded_by": USER_A}, + {"id": "file-b", "cabinet_id": CAB_B, "uploaded_by": USER_B}, + ], + } + monkeypatch.setattr( + router_module, + "SupabaseServiceRoleClient", + lambda: FakeServiceRoleClient(store), + ) + + assert router_module.list_files(CAB_B, {"sub": USER_A}) == [] + + +@pytest.mark.parametrize("router_module", ROUTERS) +def test_list_files_allows_own_cabinet(monkeypatch, router_module): + store = { + "file_cabinets": [{"id": CAB_A, "user_id": USER_A}], + "cabinet_memberships": [], + "files": [{"id": "file-a", "cabinet_id": CAB_A, "uploaded_by": USER_A}], + } + monkeypatch.setattr( + router_module, + "SupabaseServiceRoleClient", + lambda: FakeServiceRoleClient(store), + ) + + assert router_module.list_files(CAB_A, {"sub": USER_A}) == [ + {"id": "file-a", "cabinet_id": CAB_A, "uploaded_by": USER_A} + ] + + +@pytest.mark.parametrize("router_module", ROUTERS) +def test_list_files_denies_non_owner_even_with_cabinet_membership(monkeypatch, router_module): + store = { + "file_cabinets": [{"id": CAB_B, "user_id": USER_B}], + "cabinet_memberships": [ + {"cabinet_id": CAB_B, "profile_id": USER_A, "role": "viewer"} + ], + "files": [{"id": "file-b", "cabinet_id": CAB_B, "uploaded_by": USER_B}], + } + monkeypatch.setattr( + router_module, + "SupabaseServiceRoleClient", + lambda: FakeServiceRoleClient(store), + ) + + assert router_module.list_files(CAB_B, {"sub": USER_A}) == []