import os from typing import Dict, List, Optional, Any, TypedDict from .client import SupabaseServiceRoleClient, SupabaseAnonClient from modules.logger_tool import initialise_logger class CreateBucketOptions(TypedDict, total=False): """Options for bucket creation, matching Supabase API requirements""" public: bool file_size_limit: int allowed_mime_types: List[str] class StorageError(Exception): """Custom exception for storage-related errors""" pass class StorageManager: """Base storage manager class with common functionality""" def __init__(self, client: SupabaseServiceRoleClient | SupabaseAnonClient): self.client = client self.logger = initialise_logger(__name__) def check_bucket_exists(self, bucket_id: str) -> bool: """Check if a storage bucket exists""" try: self.logger.info(f"Checking if bucket {{bucket_id}} exists") buckets = self.client.supabase.storage.list_buckets() return any(bucket.name == bucket_id for bucket in buckets) except Exception as e: self.logger.error(f"Error checking bucket {{bucket_id}}: {{str(e)}}") return False def list_bucket_contents(self, bucket_id: str, path: str = "") -> Dict: """List contents of a bucket at specified path""" try: self.logger.info(f"Listing contents of bucket {{bucket_id}} at path {{path}}") contents = self.client.supabase.storage.from_(bucket_id).list(path) return {{ "folders": [item for item in contents if item.get("id", "").endswith("/")], "files": [item for item in contents if not item.get("id", "").endswith("/")] }} except Exception as e: self.logger.error(f"Error listing bucket contents: {{str(e)}}") raise StorageError(str(e)) def upload_file(self, bucket_id: str, file_path: str, file_data: bytes, content_type: str, upsert: bool = True) -> Any: """Upload a file to a storage bucket""" try: self.logger.info(f"Uploading file to {{bucket_id}} at path {{file_path}}") return self.client.supabase.storage.from_(bucket_id).upload( path=file_path, file=file_data, file_options={{ "content-type": content_type, "x-upsert": "true" if upsert else "false" }} ) except Exception as e: self.logger.error(f"Error uploading file: {{str(e)}}") raise StorageError(str(e)) def download_file(self, bucket_id: str, file_path: str) -> bytes: """Download a file from a storage bucket""" try: self.logger.info(f"Downloading file from {{bucket_id}} at path {{file_path}}") return self.client.supabase.storage.from_(bucket_id).download(file_path) except Exception as e: self.logger.error(f"Error downloading file: {{str(e)}}") raise StorageError(str(e)) def delete_file(self, bucket_id: str, file_path: str) -> None: """Delete a file from a storage bucket""" try: self.logger.info(f"Deleting file from {{bucket_id}} at path {{file_path}}") self.client.supabase.storage.from_(bucket_id).remove([file_path]) except Exception as e: self.logger.error(f"Error deleting file: {{str(e)}}") raise StorageError(str(e)) def get_public_url(self, bucket_id: str, file_path: str) -> str: """Get public URL for a file""" try: self.logger.info(f"Getting public URL for file in {{bucket_id}} at path {{file_path}}") return self.client.supabase.storage.from_(bucket_id).get_public_url(file_path) except Exception as e: self.logger.error(f"Error getting public URL: {{str(e)}}") raise StorageError(str(e)) def create_signed_url(self, bucket_id: str, file_path: str, expires_in: int = 3600) -> Any: """Create a signed URL for temporary file access""" try: self.logger.info(f"Creating signed URL for file in {{bucket_id}} at path {{file_path}}") return self.client.supabase.storage.from_(bucket_id).create_signed_url(file_path, expires_in) except Exception as e: self.logger.error(f"Error creating signed URL: {{str(e)}}") raise StorageError(str(e)) class StorageAdmin(StorageManager): """Storage admin class for managing storage buckets with service role access.""" def __init__(self, admin_user_id: Optional[str] = None): """Initialize StorageAdmin with service role client.""" super().__init__(SupabaseServiceRoleClient()) self.admin_user_id = admin_user_id def create_bucket( self, id: str, name: Optional[str] = None, public: bool = False, file_size_limit: Optional[int] = None, allowed_mime_types: Optional[List[str]] = None, owner: Optional[str] = None, owner_id: Optional[str] = None ) -> Dict[str, Any]: """Create a new storage bucket with supported parameters.""" try: self.logger.info(f"Creating bucket {{id}} with name {{name}}") options: Optional[CreateBucketOptions] = {{}} if public: options["public"] = public if file_size_limit is not None: options["file_size_limit"] = file_size_limit if allowed_mime_types is not None: options["allowed_mime_types"] = allowed_mime_types bucket = self.client.supabase.storage.create_bucket( str(id), options=options if options else None ) return bucket except Exception as e: self.logger.error(f"Error creating bucket {{id}}: {{str(e)}}") raise StorageError(str(e)) def initialize_core_buckets(self, admin_user_id: Optional[str] = None) -> List[Dict[str, Any]]: """Initialize core storage buckets for the application.""" try: owner_id = admin_user_id or self.admin_user_id if not owner_id: raise ValueError("Admin user ID is required for bucket initialization") core_buckets = [ {{ "id": "cc.users", "name": "CC Users", "public": False, "owner": owner_id, "owner_id": "superadmin", "file_size_limit": 50 * 1024 * 1024, "allowed_mime_types": [ 'image/*', 'video/*', 'application/pdf', 'application/msword', 'application/vnd.openxmlformats-officedocument.*', 'text/plain', 'text/csv', 'application/json' ] }}, {{ "id": "cc.institutes", "name": "CC Institutes", "public": False, "owner": owner_id, "owner_id": "superadmin", "file_size_limit": 50 * 1024 * 1024, "allowed_mime_types": [ 'image/*', 'video/*', 'application/pdf', 'application/msword', 'application/vnd.openxmlformats-officedocument.*', 'text/plain', 'text/csv', 'application/json' ] }} ] results = [] for bucket in core_buckets: try: bucket_name = bucket.pop("name") result = self.create_bucket(name=bucket_name, **bucket) results.append({{ "bucket": bucket["id"], "status": "success", "result": result }}) except Exception as e: self.logger.error(f"Error creating bucket {{bucket['id']}}: {{str(e)}}") results.append({{ "bucket": bucket["id"], "status": "error", "error": str(e) }}) return results except Exception as e: self.logger.error(f"Error initializing core buckets: {{str(e)}}") raise StorageError(str(e)) def create_user_bucket(self, user_id: str, username: str) -> Dict[str, Any]: """Create a storage bucket for a specific user.""" try: bucket_id = f"cc.users.admin.{{username}}" bucket_name = f"User Files - {{username}}" return self.create_bucket( id=bucket_id, name=bucket_name, public=False, owner=user_id, owner_id=username, file_size_limit=50 * 1024 * 1024, allowed_mime_types=[ 'image/*', 'video/*', 'application/pdf', 'application/msword', 'application/vnd.openxmlformats-officedocument.*', 'text/plain', 'text/csv', 'application/json' ] ) except Exception as e: self.logger.error(f"Error creating user bucket for {{username}}: {{str(e)}}") raise StorageError(str(e)) def create_school_buckets(self, school_id: str, school_name: str, admin_user_id: Optional[str] = None) -> Dict[str, Any]: """Create storage buckets for a school.""" try: owner_id = admin_user_id or self.admin_user_id if not owner_id: raise ValueError("Admin user ID is required for school bucket creation") school_buckets = [ {{ "id": f"cc.institutes.{{school_id}}.public", "name": f"{{school_name}} - Public Files", "public": True, "owner": owner_id, "owner_id": school_id, "file_size_limit": 50 * 1024 * 1024, "allowed_mime_types": [ 'image/*', 'video/*', 'application/pdf', 'application/msword', 'application/vnd.openxmlformats-officedocument.*', 'text/plain', 'text/csv', 'application/json' ] }}, {{ "id": f"cc.institutes.{{school_id}}.private", "name": f"{{school_name}} - Private Files", "public": False, "owner": owner_id, "owner_id": school_id, "file_size_limit": 50 * 1024 * 1024, "allowed_mime_types": [ 'image/*', 'video/*', 'application/pdf', 'application/msword', 'application/vnd.openxmlformats-officedocument.*', 'text/plain', 'text/csv', 'application/json' ] }} ] results = {{}} for bucket in school_buckets: try: bucket_name = bucket.pop("name") result = self.create_bucket(name=bucket_name, **bucket) results[bucket["id"]] = {{ "status": "success", "result": result }} except Exception as e: self.logger.error(f"Error creating school bucket {{bucket['id']}}: {{str(e)}}") results[bucket["id"]] = {{ "status": "error", "error": str(e) }} return results except Exception as e: self.logger.error(f"Error creating school buckets: {{str(e)}}") raise StorageError(str(e)) class StorageUser(StorageManager): """Storage user class for managing storage with per-user RLS enforcement. Requires a user access token to enforce Row Level Security policies. Without a token, requests use the anon key which does NOT enforce per-user RLS. """ def __init__(self, user_id: Optional[str] = None, access_token: Optional[str] = None): """Initialize StorageUser with user role client. Args: user_id: The user's ID (for logging/context) access_token: User's JWT access token for per-user RLS enforcement """ self.user_id = user_id # Pass access_token to enable per-user RLS via auth.uid() in JWT client = SupabaseAnonClient.for_user(access_token) if access_token else SupabaseAnonClient() super().__init__(client)