204 lines
7.3 KiB
Python
204 lines
7.3 KiB
Python
import os
|
|
from typing import Dict, List, Optional
|
|
from supabase import create_client
|
|
from modules.logger_tool import initialise_logger
|
|
from modules.database.services.provisioning_service import ProvisioningService
|
|
from pydantic import BaseModel
|
|
|
|
|
|
class AdminProfileBase(BaseModel):
|
|
email: str
|
|
display_name: Optional[str] = None
|
|
admin_role: Optional[str] = "admin"
|
|
is_super_admin: Optional[bool] = False
|
|
metadata: Optional[dict] = {}
|
|
|
|
|
|
class AdminService:
|
|
def __init__(self):
|
|
self.logger = initialise_logger(
|
|
__name__, os.getenv("LOG_LEVEL"), os.getenv("LOG_PATH"), "default", True
|
|
)
|
|
|
|
# Initialize Supabase client with service role key
|
|
supabase_url = os.getenv("SUPABASE_URL")
|
|
service_role_key = os.getenv("SERVICE_ROLE_KEY")
|
|
|
|
self.supabase = create_client(supabase_url, service_role_key)
|
|
|
|
# Set headers for admin operations
|
|
self.supabase.headers = {
|
|
"apiKey": service_role_key,
|
|
"Authorization": f"Bearer {service_role_key}",
|
|
"Content-Type": "application/json",
|
|
}
|
|
self.provisioner = ProvisioningService()
|
|
|
|
def get_admin_profile(self, admin_id: str) -> Optional[Dict]:
|
|
"""Get admin profile by ID"""
|
|
try:
|
|
self.logger.info(f"Getting admin profile for ID: {admin_id}")
|
|
result = (
|
|
self.supabase.table("admin_profiles")
|
|
.select("*")
|
|
.eq("id", admin_id)
|
|
.single()
|
|
.execute()
|
|
)
|
|
return result.data if result else None
|
|
except Exception as e:
|
|
self.logger.error(f"Error getting admin profile: {str(e)}")
|
|
raise
|
|
|
|
def list_admins(self) -> List[Dict]:
|
|
"""List all admin profiles"""
|
|
try:
|
|
self.logger.info("Listing all admin profiles")
|
|
result = self.supabase.table("admin_profiles").select("*").execute()
|
|
return result.data if result else []
|
|
except Exception as e:
|
|
self.logger.error(f"Error listing admins: {str(e)}")
|
|
raise
|
|
|
|
def create_admin(self, admin_data: AdminProfileBase, current_admin: Dict) -> Dict:
|
|
"""Create a new admin profile"""
|
|
try:
|
|
# Verify super admin status
|
|
if not current_admin.get("is_super_admin"):
|
|
raise Exception("Only super admins can create new admins")
|
|
|
|
self.logger.info(
|
|
f"Creating new admin profile for email: {admin_data.email}"
|
|
)
|
|
|
|
# Create auth user first
|
|
auth_user = self.supabase.auth.admin.create_user(
|
|
{
|
|
"email": admin_data.email,
|
|
"email_confirm": True,
|
|
"user_metadata": {"is_admin": True},
|
|
}
|
|
)
|
|
|
|
if not auth_user:
|
|
raise Exception("Failed to create auth user")
|
|
|
|
# Create admin profile
|
|
profile_data = admin_data.dict()
|
|
profile_data["id"] = auth_user.id
|
|
|
|
result = (
|
|
self.supabase.table("admin_profiles").insert(profile_data).execute()
|
|
)
|
|
try:
|
|
self.provisioner.ensure_user(profile_data["id"])
|
|
except Exception as exc:
|
|
self.logger.warning(f"Provisioning admin user {profile_data['id']} failed: {exc}")
|
|
return result.data[0] if result else None
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Error creating admin: {str(e)}")
|
|
raise
|
|
|
|
def update_admin(
|
|
self, admin_id: str, admin_data: AdminProfileBase, current_admin: Dict
|
|
) -> Dict:
|
|
"""Update an admin profile"""
|
|
try:
|
|
# Verify super admin status for certain operations
|
|
if admin_data.is_super_admin and not current_admin.get("is_super_admin"):
|
|
raise Exception("Only super admins can modify super admin status")
|
|
|
|
self.logger.info(f"Updating admin profile for ID: {admin_id}")
|
|
result = (
|
|
self.supabase.table("admin_profiles")
|
|
.update(admin_data.dict())
|
|
.eq("id", admin_id)
|
|
.execute()
|
|
)
|
|
return result.data[0] if result else None
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Error updating admin: {str(e)}")
|
|
raise
|
|
|
|
def delete_admin(self, admin_id: str, current_admin: Dict) -> None:
|
|
"""Delete an admin profile"""
|
|
try:
|
|
# Verify super admin status
|
|
if not current_admin.get("is_super_admin"):
|
|
raise Exception("Only super admins can delete admins")
|
|
|
|
# Get admin profile to check if it's a super admin
|
|
admin_profile = self.get_admin_profile(admin_id)
|
|
if admin_profile and admin_profile.get("is_super_admin"):
|
|
raise Exception("Cannot delete super admin accounts")
|
|
|
|
self.logger.info(f"Deleting admin profile for ID: {admin_id}")
|
|
|
|
# Delete auth user
|
|
self.supabase.auth.admin.delete_user(admin_id)
|
|
|
|
# Delete admin profile
|
|
self.supabase.table("admin_profiles").delete().eq("id", admin_id).execute()
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Error deleting admin: {str(e)}")
|
|
raise
|
|
|
|
def setup_super_admin(self, admin_data: dict) -> Dict:
|
|
"""Set up the initial super admin account"""
|
|
try:
|
|
self.logger.info(f"Setting up super admin for email: {admin_data['email']}")
|
|
|
|
# Check if any super admin exists
|
|
existing_super_admin = (
|
|
self.supabase.table("admin_profiles")
|
|
.select("*")
|
|
.eq("is_super_admin", True)
|
|
.execute()
|
|
)
|
|
if existing_super_admin.data:
|
|
raise Exception("Super admin already exists")
|
|
|
|
# Create the auth user first
|
|
auth_user = self.supabase.auth.admin.create_user(
|
|
{
|
|
"email": admin_data["email"],
|
|
"password": admin_data["password"],
|
|
"email_confirm": True,
|
|
"user_metadata": {"is_admin": True, "is_super_admin": True},
|
|
}
|
|
)
|
|
|
|
if not auth_user:
|
|
raise Exception("Failed to create auth user")
|
|
|
|
# Update user metadata
|
|
self.supabase.auth.admin.update_user_by_id(
|
|
auth_user.user.id,
|
|
{"user_metadata": {"is_admin": True, "is_super_admin": True}},
|
|
)
|
|
|
|
# Create super admin profile
|
|
profile_data = {
|
|
"id": auth_user.user.id,
|
|
"email": admin_data["email"],
|
|
"display_name": admin_data.get("display_name", "Super Admin"),
|
|
"admin_role": "super_admin",
|
|
"is_super_admin": True,
|
|
}
|
|
|
|
result = (
|
|
self.supabase.table("admin_profiles").insert(profile_data).execute()
|
|
)
|
|
try:
|
|
self.provisioner.ensure_user(profile_data["id"])
|
|
except Exception as exc:
|
|
self.logger.warning(f"Provisioning super admin {profile_data['id']} failed: {exc}")
|
|
return result.data[0] if result else None
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Error setting up super admin: {str(e)}")
|
|
raise
|