# 299 lines
# 13 KiB
# Python

import os
from typing import Dict, List, Optional, Any, TypedDict
from .client import SupabaseServiceRoleClient, SupabaseAnonClient
from modules.logger_tool import initialise_logger
class CreateBucketOptions(TypedDict, total=False):
    """Options for bucket creation, matching Supabase API requirements.

    Declared with ``total=False``, so every key is optional and an empty
    dict is a valid value.
    """

    # Visibility flag for the bucket.
    public: bool
    # Upper bound on the size of a single uploaded file.
    # NOTE(review): callers in this module pass 50 * 1024 * 1024, so the unit
    # is presumably bytes -- confirm against the Supabase API.
    file_size_limit: int
    # MIME types (wildcards such as "image/*" are used by callers in this
    # module) that uploads to the bucket are restricted to.
    allowed_mime_types: List[str]
class StorageError(Exception):
    """Custom exception for storage-related errors.

    The storage classes in this module raise it in place of whatever
    exception the underlying client raised, carrying ``str(original)`` as
    its message.
    """
class StorageManager:
"""Base storage manager class with common functionality"""
def __init__(self, client: SupabaseServiceRoleClient | SupabaseAnonClient):
self.client = client
self.logger = initialise_logger(__name__)
def check_bucket_exists(self, bucket_id: str) -> bool:
"""Check if a storage bucket exists"""
try:
self.logger.info(f"Checking if bucket {{bucket_id}} exists")
buckets = self.client.supabase.storage.list_buckets()
return any(bucket.name == bucket_id for bucket in buckets)
except Exception as e:
self.logger.error(f"Error checking bucket {{bucket_id}}: {{str(e)}}")
return False
def list_bucket_contents(self, bucket_id: str, path: str = "") -> Dict:
"""List contents of a bucket at specified path"""
try:
self.logger.info(f"Listing contents of bucket {{bucket_id}} at path {{path}}")
contents = self.client.supabase.storage.from_(bucket_id).list(path)
return {{
"folders": [item for item in contents if item.get("id", "").endswith("/")],
"files": [item for item in contents if not item.get("id", "").endswith("/")]
}}
except Exception as e:
self.logger.error(f"Error listing bucket contents: {{str(e)}}")
raise StorageError(str(e))
def upload_file(self, bucket_id: str, file_path: str, file_data: bytes, content_type: str, upsert: bool = True) -> Any:
"""Upload a file to a storage bucket"""
try:
self.logger.info(f"Uploading file to {{bucket_id}} at path {{file_path}}")
return self.client.supabase.storage.from_(bucket_id).upload(
path=file_path,
file=file_data,
file_options={{
"content-type": content_type,
"x-upsert": "true" if upsert else "false"
}}
)
except Exception as e:
self.logger.error(f"Error uploading file: {{str(e)}}")
raise StorageError(str(e))
def download_file(self, bucket_id: str, file_path: str) -> bytes:
"""Download a file from a storage bucket"""
try:
self.logger.info(f"Downloading file from {{bucket_id}} at path {{file_path}}")
return self.client.supabase.storage.from_(bucket_id).download(file_path)
except Exception as e:
self.logger.error(f"Error downloading file: {{str(e)}}")
raise StorageError(str(e))
def delete_file(self, bucket_id: str, file_path: str) -> None:
"""Delete a file from a storage bucket"""
try:
self.logger.info(f"Deleting file from {{bucket_id}} at path {{file_path}}")
self.client.supabase.storage.from_(bucket_id).remove([file_path])
except Exception as e:
self.logger.error(f"Error deleting file: {{str(e)}}")
raise StorageError(str(e))
def get_public_url(self, bucket_id: str, file_path: str) -> str:
"""Get public URL for a file"""
try:
self.logger.info(f"Getting public URL for file in {{bucket_id}} at path {{file_path}}")
return self.client.supabase.storage.from_(bucket_id).get_public_url(file_path)
except Exception as e:
self.logger.error(f"Error getting public URL: {{str(e)}}")
raise StorageError(str(e))
def create_signed_url(self, bucket_id: str, file_path: str, expires_in: int = 3600) -> Any:
"""Create a signed URL for temporary file access"""
try:
self.logger.info(f"Creating signed URL for file in {{bucket_id}} at path {{file_path}}")
return self.client.supabase.storage.from_(bucket_id).create_signed_url(file_path, expires_in)
except Exception as e:
self.logger.error(f"Error creating signed URL: {{str(e)}}")
raise StorageError(str(e))
class StorageAdmin(StorageManager):
    """Storage admin class for managing storage buckets with service role access."""

    # Defaults shared by every bucket this class provisions. Kept in one
    # place so the core, user and school buckets cannot drift apart.
    _DEFAULT_FILE_SIZE_LIMIT = 50 * 1024 * 1024
    _DEFAULT_ALLOWED_MIME_TYPES = (
        'image/*', 'video/*', 'application/pdf',
        'application/msword', 'application/vnd.openxmlformats-officedocument.*',
        'text/plain', 'text/csv', 'application/json',
    )

    def __init__(self, admin_user_id: Optional[str] = None):
        """Initialize StorageAdmin with service role client.

        Args:
            admin_user_id: Fallback owner used by ``initialize_core_buckets``
                and ``create_school_buckets`` when they are called without
                an explicit ``admin_user_id``.
        """
        super().__init__(SupabaseServiceRoleClient())
        self.admin_user_id = admin_user_id

    def create_bucket(
        self,
        id: str,
        name: Optional[str] = None,
        public: bool = False,
        file_size_limit: Optional[int] = None,
        allowed_mime_types: Optional[List[str]] = None,
        owner: Optional[str] = None,
        owner_id: Optional[str] = None
    ) -> Dict[str, Any]:
        """Create a new storage bucket with supported parameters.

        Only ``public`` (when truthy), ``file_size_limit`` and
        ``allowed_mime_types`` (when not ``None``) are forwarded to the
        storage API as options. ``name`` is only logged, and ``owner`` /
        ``owner_id`` are accepted for call-site compatibility but are not
        sent anywhere by this method.

        Returns:
            Whatever the client's ``create_bucket`` call returns.

        Raises:
            StorageError: if the client call fails.
        """
        try:
            self.logger.info(f"Creating bucket {id} with name {name}")
            options: CreateBucketOptions = {}
            if public:
                options["public"] = public
            if file_size_limit is not None:
                options["file_size_limit"] = file_size_limit
            if allowed_mime_types is not None:
                options["allowed_mime_types"] = allowed_mime_types
            # An empty options dict is sent as None, as in the original code.
            return self.client.supabase.storage.create_bucket(
                str(id),
                options=options or None
            )
        except Exception as e:
            self.logger.error(f"Error creating bucket {id}: {str(e)}")
            raise StorageError(str(e)) from e

    def _bucket_spec(self, bucket_id: str, name: str, public: bool, owner: str, owner_id: str) -> Dict[str, Any]:
        """Build ``create_bucket`` keyword arguments using the shared defaults."""
        return {
            "id": bucket_id,
            "name": name,
            "public": public,
            "owner": owner,
            "owner_id": owner_id,
            "file_size_limit": self._DEFAULT_FILE_SIZE_LIMIT,
            # Fresh list per spec so no caller can mutate the shared tuple's data.
            "allowed_mime_types": list(self._DEFAULT_ALLOWED_MIME_TYPES),
        }

    def _create_from_spec(self, spec: Dict[str, Any], label: str) -> Dict[str, Any]:
        """Create one bucket from a spec and return its outcome without raising.

        ``label`` only affects the wording of the error log line.
        """
        try:
            result = self.create_bucket(**spec)
        except Exception as e:
            # Best effort: one failing bucket must not abort the whole batch.
            self.logger.error(f"Error creating {label} {spec['id']}: {str(e)}")
            return {"status": "error", "error": str(e)}
        return {"status": "success", "result": result}

    def initialize_core_buckets(self, admin_user_id: Optional[str] = None) -> List[Dict[str, Any]]:
        """Initialize core storage buckets for the application.

        Attempts to create the ``cc.users`` and ``cc.institutes`` buckets.
        A failure on one bucket is recorded in its result entry and does
        not stop the other from being attempted.

        Returns:
            One dict per bucket with keys ``"bucket"`` and ``"status"``
            (``"success"`` or ``"error"``) plus ``"result"`` or ``"error"``.

        Raises:
            StorageError: if no admin user ID is available (neither the
                argument nor ``self.admin_user_id``), or on any other
                unexpected failure.
        """
        try:
            owner_id = admin_user_id or self.admin_user_id
            if not owner_id:
                raise ValueError("Admin user ID is required for bucket initialization")
            core_buckets = [
                self._bucket_spec("cc.users", "CC Users", False, owner_id, "superadmin"),
                self._bucket_spec("cc.institutes", "CC Institutes", False, owner_id, "superadmin"),
            ]
            return [
                {"bucket": spec["id"], **self._create_from_spec(spec, "bucket")}
                for spec in core_buckets
            ]
        except Exception as e:
            self.logger.error(f"Error initializing core buckets: {str(e)}")
            raise StorageError(str(e)) from e

    def create_user_bucket(self, user_id: str, username: str) -> Dict[str, Any]:
        """Create a storage bucket for a specific user.

        The bucket id is ``cc.users.admin.<username>`` and the bucket is
        private, with the shared size limit and MIME type defaults.

        Raises:
            StorageError: if bucket creation fails.
        """
        try:
            spec = self._bucket_spec(
                f"cc.users.admin.{username}",
                f"User Files - {username}",
                False,
                user_id,
                username,
            )
            return self.create_bucket(**spec)
        except Exception as e:
            self.logger.error(f"Error creating user bucket for {username}: {str(e)}")
            raise StorageError(str(e)) from e

    def create_school_buckets(self, school_id: str, school_name: str, admin_user_id: Optional[str] = None) -> Dict[str, Any]:
        """Create storage buckets for a school.

        Attempts to create ``cc.institutes.<school_id>.public`` (public)
        and ``cc.institutes.<school_id>.private`` (private). A failure on
        one bucket is recorded in its entry and does not stop the other.

        Returns:
            A dict keyed by bucket id; each value has ``"status"``
            (``"success"`` or ``"error"``) plus ``"result"`` or ``"error"``.

        Raises:
            StorageError: if no admin user ID is available (neither the
                argument nor ``self.admin_user_id``), or on any other
                unexpected failure.
        """
        try:
            owner_id = admin_user_id or self.admin_user_id
            if not owner_id:
                raise ValueError("Admin user ID is required for school bucket creation")
            school_buckets = [
                self._bucket_spec(
                    f"cc.institutes.{school_id}.public",
                    f"{school_name} - Public Files",
                    True,
                    owner_id,
                    school_id,
                ),
                self._bucket_spec(
                    f"cc.institutes.{school_id}.private",
                    f"{school_name} - Private Files",
                    False,
                    owner_id,
                    school_id,
                ),
            ]
            results = {}
            for spec in school_buckets:
                results[spec["id"]] = self._create_from_spec(spec, "school bucket")
            return results
        except Exception as e:
            self.logger.error(f"Error creating school buckets: {str(e)}")
            raise StorageError(str(e)) from e
class StorageUser(StorageManager):
    """Storage user class for managing storage with per-user RLS enforcement.

    Requires a user access token to enforce Row Level Security policies.
    Without a token, requests use the anon key which does NOT enforce per-user RLS.
    """

    def __init__(self, user_id: Optional[str] = None, access_token: Optional[str] = None):
        """Initialize StorageUser with user role client.

        Args:
            user_id: The user's ID (for logging/context)
            access_token: User's JWT access token for per-user RLS enforcement
        """
        self.user_id = user_id
        # Pass access_token to enable per-user RLS via auth.uid() in JWT;
        # a missing or empty token falls back to the plain anon client.
        if access_token:
            client = SupabaseAnonClient.for_user(access_token)
        else:
            client = SupabaseAnonClient()
        super().__init__(client)