import os
import argparse
import sys
import subprocess
import signal
import atexit
import time
import asyncio

from modules.logger_tool import initialise_logger

# NOTE(review): the logger is created before the remaining third-party/project
# imports, presumably so logging is configured before those modules load —
# confirm before reordering these imports.
logger = initialise_logger(__name__, os.getenv("LOG_LEVEL"), os.getenv("LOG_PATH"), 'default', True)

from fastapi import FastAPI, HTTPException
import uvicorn
import requests
from urllib.parse import urlparse
from typing import Dict, Any, Optional

from modules.database.tools.neo4j_driver_tools import get_driver
from run.setup import setup_cors
from run.routers import register_routes
from modules.task_processors import get_processor
from modules.queue_system import ServiceType

# Default comma-separated queue services handled by the workers when
# QUEUE_SERVICES is not set.
_DEFAULT_QUEUE_SERVICES = 'tika,docling,split_map,document_analysis,page_images'

# FastAPI App Setup
app = FastAPI()
setup_cors(app)


def _truthy_env(name: str) -> bool:
    """Return True when env var *name* is set to 1/true/yes/on (case-insensitive)."""
    return os.getenv(name, "").lower() in {"1", "true", "yes", "on"}


def _runtime_environment() -> str:
    """Return the API runtime role used for dev/prod backing-service selection.

    "dev" when START_MODE is "dev" or BACKEND_DEV_MODE is truthy, else "prod".
    """
    start_mode = os.getenv("START_MODE", "prod").lower()
    if start_mode == "dev" or _truthy_env("BACKEND_DEV_MODE"):
        return "dev"
    return "prod"


def _url_host(url: Optional[str]) -> Optional[str]:
    """Return only the hostname part of *url* (None for an empty/missing URL)."""
    if not url:
        return None
    return urlparse(url).hostname


def _runtime_identity() -> Dict[str, Any]:
    """Non-secret runtime identity for agents and smoke tests.

    This intentionally exposes only modes, labels, and URL hosts. It must not
    include API keys, passwords, bearer tokens, or full URLs that may embed
    credentials.
    """
    return {
        "api_runtime_role": _runtime_environment(),
        "start_mode": os.getenv("START_MODE", "prod"),
        "app_environment": os.getenv("APP_ENV"),
        "environment": os.getenv("ENVIRONMENT"),
        "backend_dev_mode": _truthy_env("BACKEND_DEV_MODE"),
        "compose_project": os.getenv("COMPOSE_PROJECT_NAME") or os.getenv("CC_COMPOSE_PROJECT"),
        "compose_service": os.getenv("COMPOSE_SERVICE") or os.getenv("CC_COMPOSE_SERVICE"),
        "supabase_url_host": _url_host(os.getenv("SUPABASE_URL")),
    }


def _check_neo4j() -> Dict[str, Any]:
    """Return the Neo4j entry of the health report (driver availability only)."""
    try:
        driver = get_driver()
        if not driver:
            return {"status": "unhealthy", "message": "Failed to connect to Neo4j"}
    except Exception as e:
        logger.warning(f"Neo4j health check failed: {e}")
        return {"status": "unhealthy", "message": "Error checking Neo4j"}
    return {"status": "healthy", "message": "Connected"}


def _check_supabase(url_host: Optional[str]) -> Dict[str, Any]:
    """Return the Supabase entry of the health report by pinging the Auth health endpoint.

    Args:
        url_host: Non-secret host of SUPABASE_URL, echoed back on success.
    """
    supabase_url = os.getenv("SUPABASE_URL")
    if not supabase_url:
        # Without this guard the request would go to "None/auth/v1/health".
        return {"status": "unhealthy", "message": "SUPABASE_URL is not configured"}
    try:
        response = requests.get(
            f"{supabase_url.rstrip('/')}/auth/v1/health",
            headers={"apikey": os.getenv("SERVICE_ROLE_KEY")},
            timeout=5,
        )
    except Exception as e:
        logger.warning(f"Supabase health check failed: {e}")
        return {"status": "unhealthy", "message": "Error checking Supabase Auth API"}
    if response.status_code != 200:
        return {
            "status": "unhealthy",
            "message": f"Supabase Auth API returned status {response.status_code}",
        }
    return {"status": "healthy", "message": "Connected", "url_host": url_host}


def _check_redis(environment: str) -> Dict[str, Any]:
    """Return the Redis entry of the health report for the given runtime role ("dev"/"prod")."""
    try:
        from modules.redis_manager import get_redis_manager

        redis_health = get_redis_manager(environment).health_check()
        return {
            "status": redis_health['status'],
            "message": redis_health.get(
                'error',
                f"Connected to {environment} environment (db={redis_health['database']})",
            ),
            "environment": redis_health['environment'],
            "database": redis_health['database'],
            "queue_stats": redis_health.get('queue_stats', {}),
        }
    except Exception as e:
        logger.warning(f"Redis health check failed: {e}")
        return {"status": "unhealthy", "message": "Error checking Redis"}


def _collect_health_status() -> Dict[str, Any]:
    """Run all dependency checks synchronously and build the health report.

    The overall status is "unhealthy" as soon as any service reports a status
    other than "healthy".
    """
    runtime_identity = _runtime_identity()
    services = {
        "neo4j": _check_neo4j(),
        "supabase": _check_supabase(runtime_identity["supabase_url_host"]),
        "redis": _check_redis(runtime_identity["api_runtime_role"]),
    }
    all_healthy = all(entry["status"] == "healthy" for entry in services.values())
    return {
        "status": "healthy" if all_healthy else "unhealthy",
        "runtime": runtime_identity,
        "services": services,
    }


# Health check endpoint
@app.get("/health")
async def health_check() -> Dict[str, Any]:
    """Health check endpoint that verifies all service dependencies.

    The checks use blocking clients (requests, DB drivers), so they run in the
    default thread-pool executor instead of stalling the event loop.

    Raises:
        HTTPException: 503 with the full report as ``detail`` when any
            dependency is unhealthy.
    """
    loop = asyncio.get_running_loop()
    health_status = await loop.run_in_executor(None, _collect_health_status)
    if health_status["status"] == "unhealthy":
        raise HTTPException(status_code=503, detail=health_status)
    return health_status


# Register routes
register_routes(app)


def _auto_start_queue_workers() -> bool:
    """Return True unless AUTO_START_QUEUE_WORKERS is set to something other than 'true'."""
    return os.getenv('AUTO_START_QUEUE_WORKERS', 'true').lower() == 'true'


# Start workers in the application process to avoid uvicorn reload issues.
# NOTE(review): with UVICORN_WORKERS > 1 each uvicorn worker process runs this
# startup hook, so QUEUE_WORKERS workers are started per process — confirm intended.
@app.on_event("startup")
async def _start_workers_event():
    """Start QUEUE_WORKERS in-process queue workers for the QUEUE_SERVICES services.

    Unknown service names are skipped with a warning; if none are valid, all
    ServiceType members are used. Failures are logged, never raised, so the API
    still starts.
    """
    try:
        if not _auto_start_queue_workers():
            logger.info("AUTO_START_QUEUE_WORKERS=false, not starting in-process workers")
            return
        workers = int(os.getenv('QUEUE_WORKERS', '3'))
        services_csv = os.getenv('QUEUE_SERVICES', _DEFAULT_QUEUE_SERVICES)
        service_enums = []
        for name in (part.strip().lower() for part in services_csv.split(',')):
            if not name:
                continue
            try:
                service_enums.append(ServiceType(name))
            except ValueError:
                logger.warning(f"Ignoring unknown queue service '{name}' in QUEUE_SERVICES")
        if not service_enums:
            service_enums = list(ServiceType)
        processor = get_processor()
        started = []
        for number in range(1, workers + 1):
            wid = processor.start_worker(worker_id=f"app-worker-{number}", services=service_enums)
            started.append(wid)
        logger.info(f"In-process queue workers started: {started} for services {[s.value for s in service_enums]}")
    except Exception as e:
        logger.error(f"Failed to start in-process workers: {e}")


@app.on_event("shutdown")
async def _shutdown_workers_event():
    """Ask the task processor to shut its workers down (30 s timeout); errors are logged."""
    try:
        processor = get_processor()
        processor.shutdown(timeout=30)
    except Exception as e:
        logger.warning(f"Error during workers shutdown: {e}")


# Global subprocess handles (only for workers now)
workers_process: Optional[subprocess.Popen] = None

# Global Redis manager for cleanup
redis_manager = None


def start_queue_workers():
    """Start queue workers as a subprocess (tied to API lifecycle).

    Runs ``start_queue_workers.py`` with the current interpreter, appending its
    stdout/stderr to QUEUE_WORKERS_LOG. On POSIX the child gets its own session
    so stop_queue_workers() can signal the whole process group.
    """
    global workers_process
    if not _auto_start_queue_workers():
        logger.info("AUTO_START_QUEUE_WORKERS=false, not starting workers")
        return

    # If already started, skip
    if workers_process is not None and workers_process.poll() is None:
        logger.info("Queue workers already running")
        return

    services = os.getenv('QUEUE_SERVICES', _DEFAULT_QUEUE_SERVICES)
    workers = int(os.getenv('QUEUE_WORKERS', '3'))
    check_interval = os.getenv('QUEUE_CHECK_INTERVAL', '15')

    cmd = [
        sys.executable, 'start_queue_workers.py',
        '--workers', str(workers),
        '--services', services,
        '--check-interval', check_interval,
    ]
    # Workers will auto-detect environment and use appropriate Redis database

    log_path = os.getenv('QUEUE_WORKERS_LOG', './queue_workers.log')
    try:
        # The child inherits its own copy of the descriptor, so the parent's
        # handle can (and should) be closed once Popen returns.
        with open(log_path, 'a') as log_file:
            logger.info(f"Starting queue workers ({workers}) for services [{services}] → {log_path}")
            workers_process = subprocess.Popen(
                cmd,
                stdout=log_file,
                stderr=log_file,
                # Thread-safe replacement for preexec_fn=os.setsid.
                start_new_session=(os.name != 'nt'),
            )
    except Exception as e:
        logger.error(f"Failed to start queue workers: {e}")


def stop_queue_workers():
    """Stop queue workers subprocess.

    Sends SIGTERM to the worker process group (terminate() on Windows), waits
    up to 10 s, then escalates to SIGKILL/kill(). Always clears the handle.
    """
    global workers_process
    if workers_process is None:
        return
    proc = workers_process
    try:
        logger.info("Stopping queue workers...")
        try:
            if os.name != 'nt':
                # start_new_session makes the child a session/group leader, so
                # its group id equals its pid even after the leader is reaped.
                os.killpg(proc.pid, signal.SIGTERM)
            else:
                proc.terminate()
        except ProcessLookupError:
            # Nothing left in the group; wait() below just reaps the leader.
            pass
        try:
            proc.wait(timeout=10)
            logger.info("Queue workers stopped gracefully")
        except subprocess.TimeoutExpired:
            logger.warning("Queue workers did not stop gracefully, forcing shutdown...")
            if os.name != 'nt':
                os.killpg(proc.pid, signal.SIGKILL)
            else:
                proc.kill()
            proc.wait()
            logger.info("Queue workers force stopped")
    except Exception as e:
        logger.error(f"Error stopping queue workers: {e}")
    finally:
        workers_process = None


def _install_signal_handlers():
    """Install SIGINT/SIGTERM handlers that stop workers, shut Redis down and exit 0."""
    def signal_handler(signum, frame):
        logger.info(f"Received signal {signum}, shutting down...")
        stop_queue_workers()
        # Gracefully shutdown Redis manager if it exists
        if redis_manager:
            redis_manager.shutdown()
        sys.exit(0)

    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)


def run_infrastructure_mode():
    """Run infrastructure setup: Neo4j schema, calendar, and Supabase buckets.

    Returns:
        True on success, False if the initializer raised.
    """
    logger.info("Running in infrastructure mode")
    logger.info("Starting infrastructure setup...")
    try:
        from run.initialization import initialize_infrastructure_mode
        initialize_infrastructure_mode()
        logger.info("Infrastructure setup completed successfully")
        return True
    except Exception as e:
        logger.error(f"Infrastructure setup failed: {str(e)}")
        return False


def run_demo_school_mode():
    """Run demo school creation; returns True on success, False on failure."""
    logger.info("Running in demo school mode")
    logger.info("Starting demo school creation...")
    try:
        from run.initialization import initialize_demo_school_mode
        initialize_demo_school_mode()
        logger.info("Demo school creation completed successfully")
        return True
    except Exception as e:
        logger.error(f"Demo school creation failed: {str(e)}")
        return False


def run_demo_users_mode():
    """Run demo users creation; returns True on success, False on failure."""
    logger.info("Running in demo users mode")
    logger.info("Starting demo users creation...")
    try:
        from run.initialization import initialize_demo_users_mode
        initialize_demo_users_mode()
        logger.info("Demo users creation completed successfully")
        return True
    except Exception as e:
        logger.error(f"Demo users creation failed: {str(e)}")
        return False


def run_gais_data_mode():
    """Run GAIS data import; returns True on success, False on failure."""
    logger.info("Running in GAIS data import mode")
    logger.info("Starting GAIS data import...")
    try:
        from run.initialization import initialize_gais_data_mode
        initialize_gais_data_mode()
        logger.info("GAIS data import completed successfully")
        return True
    except Exception as e:
        logger.error(f"GAIS data import failed: {str(e)}")
        return False


def _run_server(environment: str, label: str, start_message: str, **uvicorn_options: Any) -> bool:
    """Initialise Redis for *environment* and run uvicorn until it exits.

    Args:
        environment: Redis manager environment ("dev" or "prod").
        label: Human-readable mode name used in the failure log message.
        start_message: Log line emitted just before uvicorn starts.
        **uvicorn_options: Extra keyword arguments for ``uvicorn.run``.

    Returns:
        False if Redis initialisation failed (server not started), else True.
    """
    global redis_manager
    from modules.redis_manager import get_redis_manager
    redis_manager = get_redis_manager(environment)
    if not redis_manager.initialize_environment():
        logger.error(f"Failed to initialize Redis for {label}")
        return False

    # Workers are started in app startup event
    logger.info(start_message)

    # Install signal handlers for graceful shutdown
    _install_signal_handlers()

    try:
        uvicorn.run(
            "main:app",
            host="0.0.0.0",
            port=int(os.getenv('UVICORN_PORT', 8000)),
            # uvicorn only accepts lowercase level names (e.g. "info", not "INFO").
            log_level=(os.getenv('LOG_LEVEL') or 'info').lower(),
            proxy_headers=True,
            timeout_keep_alive=10,
            **uvicorn_options,
        )
    finally:
        stop_queue_workers()
        if redis_manager:
            redis_manager.shutdown()
    return True


def run_development_mode():
    """Run the server in development mode with auto-reload.

    The dev Redis manager is initialised first (expected to clear dev data).
    Returns False if Redis initialisation failed, True after the server exits.
    """
    logger.info("Running in development mode")
    return _run_server('dev', 'development', "Starting uvicorn server with auto-reload...", reload=True)


def run_production_mode():
    """Run the server in production mode.

    The prod Redis manager is initialised first (expected to preserve data and
    recover tasks). Returns False if Redis initialisation failed, True after the
    server exits.
    """
    logger.info("Running in production mode")
    return _run_server(
        'prod',
        'production',
        "Starting uvicorn server in production mode...",
        workers=int(os.getenv('UVICORN_WORKERS', '1')),
    )


def _run_seed_test_mode() -> bool:
    """Seed the full test environment, print the JSON result and report success."""
    from run.initialization.seed_test_environment import seed_test_environment
    import json
    result = seed_test_environment()
    print(json.dumps(result, indent=2))
    return bool(result.get('success'))


def parse_arguments():
    """Parse command line arguments"""
    parser = argparse.ArgumentParser(
        description="ClassroomCopilot API Server",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Startup modes:
  infra        - Setup infrastructure (Neo4j schema, calendar, Supabase buckets)
  demo-school  - Create demo school (KevlarAI)
  demo-users   - Create demo users
  seed-test    - Seed full test environment (2 schools, all test users)
  gais-data    - Import GAIS data (Edubase, etc.)
  dev          - Run development server with auto-reload
  prod         - Run production server (for Docker/containerized deployment)
        """
    )
    parser.add_argument(
        '--mode', '-m',
        choices=['infra', 'demo-school', 'demo-users', 'seed-test', 'gais-data', 'dev', 'prod'],
        default='dev',
        help='Startup mode (default: dev)'
    )
    return parser.parse_args()


if __name__ == "__main__":
    args = parse_arguments()

    # Set environment variable for backward compatibility
    os.environ['BACKEND_DEV_MODE'] = 'true' if args.mode == 'dev' else 'false'

    logger.info(f"Starting ClassroomCopilot API in {args.mode} mode")

    # Every runner returns a truthy value on success; that decides the exit code
    # (including dev/prod, which previously exited 0 even when Redis init failed).
    mode_runners = {
        'infra': run_infrastructure_mode,
        'demo-school': run_demo_school_mode,
        'demo-users': run_demo_users_mode,
        'seed-test': _run_seed_test_mode,
        'gais-data': run_gais_data_mode,
        'dev': run_development_mode,
        'prod': run_production_mode,
    }
    runner = mode_runners.get(args.mode)
    if runner is None:
        logger.error(f"Invalid mode: {args.mode}")
        sys.exit(1)
    sys.exit(0 if runner() else 1)