99 lines
4.5 KiB
Python
99 lines
4.5 KiB
Python
import os
|
|
from typing import Dict, Optional, Any, TypedDict, List
|
|
from supabase import create_client, Client
|
|
from supabase.lib.client_options import SyncClientOptions
|
|
from supabase_auth import SyncMemoryStorage
|
|
from modules.logger_tool import initialise_logger
|
|
|
|
logger = initialise_logger(__name__, os.getenv("LOG_LEVEL"), os.getenv("LOG_PATH"), 'default', True)
|
|
|
|
class CreateBucketOptions(TypedDict, total=False):
|
|
"""Options for bucket creation, matching Supabase API requirements"""
|
|
public: bool
|
|
file_size_limit: int
|
|
allowed_mime_types: List[str]
|
|
name: str
|
|
|
|
def _create_base_client(url: str, key: str, access_token: Optional[str] = None, options: Optional[Dict[str, Any]] = None) -> Client:
|
|
"""Create a base Supabase client with given configuration.
|
|
|
|
If access_token is provided, it is used as the Authorization header (for per-user RLS).
|
|
Otherwise, the API key is used (service role bypasses RLS, anon key does not).
|
|
"""
|
|
# If an access token is provided, use it for Authorization (enables per-user RLS)
|
|
# Otherwise fall back to the API key
|
|
auth_header = f"Bearer {access_token}" if access_token else f"Bearer {key}"
|
|
|
|
client_options = SyncClientOptions(
|
|
schema="public",
|
|
storage=SyncMemoryStorage(),
|
|
headers={
|
|
"Authorization": auth_header
|
|
}
|
|
)
|
|
return create_client(url, key, options=client_options)
|
|
|
|
class SupabaseServiceRoleClient:
|
|
"""Supabase client for making authenticated requests using the service role key.
|
|
|
|
NOTE: Service role bypasses all RLS policies. Use only for admin operations
|
|
where you need to read/write any row regardless of ownership.
|
|
"""
|
|
|
|
def __init__(self, url: Optional[str] = None, service_role_key: Optional[str] = None):
|
|
"""Initialize the Supabase client with URL and service role key"""
|
|
self.url = url or os.environ.get("SUPABASE_URL", "http://localhost:8000")
|
|
self.service_role_key = service_role_key or os.environ.get("SERVICE_ROLE_KEY")
|
|
|
|
if not self.url or not self.service_role_key:
|
|
raise ValueError("SUPABASE_URL and SERVICE_ROLE_KEY must be provided")
|
|
|
|
# Initialize Supabase client with service role key (bypasses RLS)
|
|
self.supabase = _create_base_client(self.url, self.service_role_key)
|
|
|
|
def create_bucket(self, id: str, options: Optional[CreateBucketOptions] = None) -> Dict[str, Any]:
|
|
"""Create a storage bucket with the given ID and options"""
|
|
if options is None:
|
|
options = CreateBucketOptions()
|
|
if 'name' not in options:
|
|
options['name'] = id # Use ID as default name if not provided
|
|
return self.supabase.storage.create_bucket(id, options=options)
|
|
|
|
class SupabaseAnonClient:
|
|
"""Supabase client for making authenticated requests using the anon key.
|
|
|
|
When initialized with an access_token, per-user RLS policies are enforced
|
|
via auth.uid() in the JWT. Without an access_token, requests use the anon key
|
|
which does NOT enforce per-user RLS (only bucket-level storage rules apply).
|
|
"""
|
|
|
|
def __init__(self, url: Optional[str] = None, anon_key: Optional[str] = None, access_token: Optional[str] = None):
|
|
"""Initialize the Supabase client with URL and anon key.
|
|
|
|
Args:
|
|
url: Supabase URL
|
|
anon_key: Anon API key (fallback if no access_token)
|
|
access_token: User's JWT access token for per-user RLS enforcement
|
|
"""
|
|
self.url = url or os.environ.get("SUPABASE_URL", "http://localhost:8000")
|
|
self.anon_key = anon_key or os.environ.get("ANON_KEY")
|
|
self.access_token = access_token
|
|
|
|
if not self.url or not self.anon_key:
|
|
raise ValueError("SUPABASE_URL and ANON_KEY must be provided")
|
|
|
|
# Initialize Supabase client with anon key and optional access token for RLS
|
|
self.supabase = _create_base_client(self.url, self.anon_key, access_token=access_token)
|
|
|
|
def create_bucket(self, id: str, options: Optional[CreateBucketOptions] = None) -> Dict[str, Any]:
|
|
"""Create a storage bucket with the given ID and options"""
|
|
return self.supabase.storage.create_bucket(id, options=options)
|
|
|
|
@classmethod
|
|
def for_user(cls, access_token: str) -> 'SupabaseAnonClient':
|
|
"""Create a client instance for a specific user using their access token.
|
|
|
|
This enables per-user RLS enforcement via auth.uid() in the JWT.
|
|
"""
|
|
return cls(access_token=access_token)
|