import os from typing import Dict, List, Optional, Any, TypedDict from .client import SupabaseServiceRoleClient, SupabaseAnonClient from modules.logger_tool import initialise_logger class CreateBucketOptions(TypedDict, total=False): """Options for bucket creation, matching Supabase API requirements""" public: bool file_size_limit: int allowed_mime_types: List[str] class StorageError(Exception): """Custom exception for storage-related errors""" pass class StorageManager: """Base storage manager class with common functionality""" def __init__(self, client: SupabaseServiceRoleClient | SupabaseAnonClient): self.client = client self.logger = initialise_logger(__name__) def check_bucket_exists(self, bucket_id: str) -> bool: """Check if a storage bucket exists""" try: self.logger.info(f"Checking if bucket {bucket_id} exists") buckets = self.client.supabase.storage.list_buckets() return any(bucket.name == bucket_id for bucket in buckets) except Exception as e: self.logger.error(f"Error checking bucket {bucket_id}: {str(e)}") return False def list_bucket_contents(self, bucket_id: str, path: str = "") -> Dict: """List contents of a bucket at specified path""" try: self.logger.info(f"Listing contents of bucket {bucket_id} at path {path}") contents = self.client.supabase.storage.from_(bucket_id).list(path) return { "folders": [item for item in contents if item.get("id", "").endswith("/")], "files": [item for item in contents if not item.get("id", "").endswith("/")] } except Exception as e: self.logger.error(f"Error listing bucket contents: {str(e)}") raise StorageError(str(e)) def upload_file(self, bucket_id: str, file_path: str, file_data: bytes, content_type: str, upsert: bool = True) -> Any: """Upload a file to a storage bucket""" try: self.logger.info(f"Uploading file to {bucket_id} at path {file_path}") return self.client.supabase.storage.from_(bucket_id).upload( path=file_path, file=file_data, file_options={ "content-type": content_type, "x-upsert": "true" if upsert else "false" } ) except Exception as e: self.logger.error(f"Error uploading file: {str(e)}") raise StorageError(str(e)) def download_file(self, bucket_id: str, file_path: str) -> bytes: """Download a file from a storage bucket""" try: self.logger.info(f"Downloading file from {bucket_id} at path {file_path}") return self.client.supabase.storage.from_(bucket_id).download(file_path) except Exception as e: self.logger.error(f"Error downloading file: {str(e)}") raise StorageError(str(e)) def delete_file(self, bucket_id: str, file_path: str) -> None: """Delete a file from a storage bucket""" try: self.logger.info(f"Deleting file from {bucket_id} at path {file_path}") self.client.supabase.storage.from_(bucket_id).remove([file_path]) except Exception as e: self.logger.error(f"Error deleting file: {str(e)}") raise StorageError(str(e)) def get_public_url(self, bucket_id: str, file_path: str) -> str: """Get public URL for a file""" try: self.logger.info(f"Getting public URL for file in {bucket_id} at path {file_path}") return self.client.supabase.storage.from_(bucket_id).get_public_url(file_path) except Exception as e: self.logger.error(f"Error getting public URL: {str(e)}") raise StorageError(str(e)) def create_signed_url(self, bucket_id: str, file_path: str, expires_in: int = 3600) -> Any: """Create a signed URL for temporary file access""" try: self.logger.info(f"Creating signed URL for file in {bucket_id} at path {file_path}") return self.client.supabase.storage.from_(bucket_id).create_signed_url(file_path, expires_in) except Exception as e: self.logger.error(f"Error creating signed URL: {str(e)}") raise StorageError(str(e)) class StorageAdmin(StorageManager): """Storage admin class for managing storage buckets with service role access.""" def __init__(self, admin_user_id: Optional[str] = None): """Initialize StorageAdmin with service role client.""" super().__init__(SupabaseServiceRoleClient()) self.admin_user_id = admin_user_id def create_bucket( self, id: str, name: Optional[str] = None, public: bool = False, file_size_limit: Optional[int] = None, allowed_mime_types: Optional[List[str]] = None, owner: Optional[str] = None, # Kept for backwards compatibility but not used owner_id: Optional[str] = None # Kept for backwards compatibility but not used ) -> Dict[str, Any]: """Create a new storage bucket with supported parameters.""" try: self.logger.info(f"Creating bucket {id} with name {name}") # Prepare bucket options with only supported parameters options: Optional[CreateBucketOptions] = {} if public: options["public"] = public if file_size_limit is not None: options["file_size_limit"] = file_size_limit if allowed_mime_types is not None: options["allowed_mime_types"] = allowed_mime_types # Create bucket with supported parameters only bucket = self.client.supabase.storage.create_bucket( str(id), options=options if options else None ) return bucket except Exception as e: self.logger.error(f"Error creating bucket {id}: {str(e)}") raise StorageError(str(e)) def initialize_core_buckets(self, admin_user_id: Optional[str] = None) -> List[Dict[str, Any]]: """Initialize core storage buckets for the application.""" try: owner_id = admin_user_id or self.admin_user_id if not owner_id: raise ValueError("Admin user ID is required for bucket initialization") core_buckets = [ { "id": "cc.users", "name": "CC Users", "public": False, "owner": owner_id, "owner_id": "superadmin", "file_size_limit": 50 * 1024 * 1024, # 50MB "allowed_mime_types": [ 'image/*', 'video/*', 'application/pdf', 'application/msword', 'application/vnd.openxmlformats-officedocument.*', 'text/plain', 'text/csv', 'application/json' ] }, { "id": "cc.institutes", "name": "CC Institutes", "public": False, "owner": owner_id, "owner_id": "superadmin", "file_size_limit": 50 * 1024 * 1024, # 50MB "allowed_mime_types": [ 'image/*', 'video/*', 'application/pdf', 'application/msword', 'application/vnd.openxmlformats-officedocument.*', 'text/plain', 'text/csv', 'application/json' ] } ] results = [] for bucket in core_buckets: try: bucket_name = bucket.pop("name") # Remove name from options result = self.create_bucket(name=bucket_name, **bucket) results.append({ "bucket": bucket["id"], "status": "success", "result": result }) except Exception as e: self.logger.error(f"Error creating bucket {bucket['id']}: {str(e)}") results.append({ "bucket": bucket["id"], "status": "error", "error": str(e) }) return results except Exception as e: self.logger.error(f"Error initializing core buckets: {str(e)}") raise StorageError(str(e)) def create_user_bucket(self, user_id: str, username: str) -> Dict[str, Any]: """Create a storage bucket for a specific user.""" try: bucket_id = f"cc.users.admin.{username}" bucket_name = f"User Files - {username}" return self.create_bucket( id=bucket_id, name=bucket_name, public=False, owner=user_id, owner_id=username, file_size_limit=50 * 1024 * 1024, # 50MB allowed_mime_types=[ 'image/*', 'video/*', 'application/pdf', 'application/msword', 'application/vnd.openxmlformats-officedocument.*', 'text/plain', 'text/csv', 'application/json' ] ) except Exception as e: self.logger.error(f"Error creating user bucket for {username}: {str(e)}") raise StorageError(str(e)) def create_school_buckets(self, school_id: str, school_name: str, admin_user_id: Optional[str] = None) -> Dict[str, Any]: """Create storage buckets for a school.""" try: owner_id = admin_user_id or self.admin_user_id if not owner_id: raise ValueError("Admin user ID is required for school bucket creation") school_buckets = [ { "id": f"cc.institutes.{school_id}.public", "name": f"{school_name} - Public Files", "public": True, "owner": owner_id, "owner_id": school_id, "file_size_limit": 50 * 1024 * 1024, # 50MB "allowed_mime_types": [ 'image/*', 'video/*', 'application/pdf', 'application/msword', 'application/vnd.openxmlformats-officedocument.*', 'text/plain', 'text/csv', 'application/json' ] }, { "id": f"cc.institutes.{school_id}.private", "name": f"{school_name} - Private Files", "public": False, "owner": owner_id, "owner_id": school_id, "file_size_limit": 50 * 1024 * 1024, # 50MB "allowed_mime_types": [ 'image/*', 'video/*', 'application/pdf', 'application/msword', 'application/vnd.openxmlformats-officedocument.*', 'text/plain', 'text/csv', 'application/json' ] } ] results = {} for bucket in school_buckets: try: bucket_name = bucket.pop("name") # Remove name from options result = self.create_bucket(name=bucket_name, **bucket) results[bucket["id"]] = { "status": "success", "result": result } except Exception as e: self.logger.error(f"Error creating school bucket {bucket['id']}: {str(e)}") results[bucket["id"]] = { "status": "error", "error": str(e) } return results except Exception as e: self.logger.error(f"Error creating school buckets: {str(e)}") raise StorageError(str(e)) class StorageUser(StorageManager): """Storage user class for managing storage buckets with user role access.""" def __init__(self, user_id: Optional[str] = None): """Initialize StorageUser with user role client.""" super().__init__(SupabaseAnonClient()) self.user_id = user_id