"""ClassroomCopilot API server entry point.

Runs the FastAPI application plus (optionally) in-process queue workers.
Several one-shot startup modes are also supported (infrastructure setup,
demo data creation, GAIS import) — see ``parse_arguments`` for details.
"""
import os
import argparse
import sys
import subprocess
import signal
import atexit
import time

from modules.logger_tool import initialise_logger

# The logger must exist before the remaining imports so any import-time
# logging they perform is captured with the configured level/path.
logger = initialise_logger(__name__, os.getenv("LOG_LEVEL"), os.getenv("LOG_PATH"), 'default', True)

from fastapi import FastAPI, HTTPException
import uvicorn
import requests
from typing import Dict, Any, Optional
from modules.database.tools.neo4j_driver_tools import get_driver
from run.setup import setup_cors
from run.routers import register_routes
from modules.task_processors import get_processor
from modules.queue_system import ServiceType

# FastAPI App Setup
app = FastAPI()
setup_cors(app)


# Health check endpoint
@app.get("/health")
async def health_check() -> Dict[str, Any]:
    """Health check endpoint that verifies all service dependencies.

    Checks Neo4j, Supabase Auth and Redis. Returns a per-service status
    report; raises HTTP 503 (with the report as detail) when any
    dependency is unhealthy.
    """
    health_status = {
        "status": "healthy",
        "services": {
            "neo4j": {"status": "healthy", "message": "Connected"},
            "supabase": {"status": "healthy", "message": "Connected"},
            "redis": {"status": "healthy", "message": "Connected"}
        }
    }

    # --- Neo4j ---
    try:
        driver = get_driver()
        if not driver:
            health_status["services"]["neo4j"] = {
                "status": "unhealthy",
                "message": "Failed to connect to Neo4j"
            }
            health_status["status"] = "unhealthy"
    except Exception as e:
        health_status["services"]["neo4j"] = {
            "status": "unhealthy",
            "message": f"Error checking Neo4j: {str(e)}"
        }
        health_status["status"] = "unhealthy"

    # --- Supabase ---
    try:
        # Minimal check to confirm Supabase is responsive (pinging the auth
        # health endpoint).
        supabase_url = os.getenv("SUPABASE_URL")
        service_role_key = os.getenv("SERVICE_ROLE_KEY")
        if not supabase_url or not service_role_key:
            # Fix: previously an unset env var silently produced a request
            # to "None/auth/v1/health"; surface the misconfiguration instead.
            raise RuntimeError("SUPABASE_URL or SERVICE_ROLE_KEY is not configured")
        response = requests.get(
            f"{supabase_url}/auth/v1/health",
            headers={"apikey": service_role_key},
            timeout=5
        )
        if response.status_code != 200:
            health_status["services"]["supabase"] = {
                "status": "unhealthy",
                "message": f"Supabase Auth API returned status {response.status_code}"
            }
            health_status["status"] = "unhealthy"
    except Exception as e:
        health_status["services"]["supabase"] = {
            "status": "unhealthy",
            "message": f"Error checking Supabase Auth API: {str(e)}"
        }
        health_status["status"] = "unhealthy"

    # --- Redis ---
    try:
        # Check Redis using the Redis manager (imported lazily so a broken
        # Redis module only degrades this check, not app import).
        from modules.redis_manager import get_redis_manager

        # Determine environment
        environment = 'dev' if os.getenv('BACKEND_DEV_MODE', 'true').lower() == 'true' else 'prod'
        redis_manager = get_redis_manager(environment)

        # Get comprehensive health check
        redis_health = redis_manager.health_check()
        health_status["services"]["redis"] = {
            "status": redis_health['status'],
            "message": redis_health.get('error', f"Connected to {environment} environment (db={redis_health['database']})"),
            "environment": redis_health['environment'],
            "database": redis_health['database'],
            "queue_stats": redis_health.get('queue_stats', {})
        }
        if redis_health['status'] != 'healthy':
            health_status["status"] = "unhealthy"
    except Exception as e:
        health_status["services"]["redis"] = {
            "status": "unhealthy",
            "message": f"Error checking Redis: {str(e)}"
        }
        health_status["status"] = "unhealthy"

    if health_status["status"] == "unhealthy":
        raise HTTPException(status_code=503, detail=health_status)
    return health_status


# Register routes
register_routes(app)


# Start workers in the application process to avoid uvicorn reload issues
@app.on_event("startup")
async def _start_workers_event():
    """Start in-process queue workers when AUTO_START_QUEUE_WORKERS is true.

    Worker count and serviced queues come from QUEUE_WORKERS and
    QUEUE_SERVICES; unknown service names are skipped (logged), and an
    empty result falls back to all ServiceType members.
    """
    try:
        if os.getenv('AUTO_START_QUEUE_WORKERS', 'true').lower() != 'true':
            logger.info("AUTO_START_QUEUE_WORKERS=false, not starting in-process workers")
            return
        workers = int(os.getenv('QUEUE_WORKERS', '3'))
        services_csv = os.getenv('QUEUE_SERVICES', 'tika,docling,split_map,document_analysis,page_images')
        service_names = [s.strip().lower() for s in services_csv.split(',') if s.strip()]
        service_enums = []
        for name in service_names:
            try:
                service_enums.append(ServiceType(name))
            except Exception:
                # Fix: was a silent pass — a typo in QUEUE_SERVICES would
                # drop the queue with no trace. Still best-effort, now logged.
                logger.warning(f"Ignoring unknown queue service name: {name}")
        if not service_enums:
            service_enums = list(ServiceType)
        processor = get_processor()
        started = []
        for i in range(workers):
            wid = processor.start_worker(worker_id=f"app-worker-{i+1}", services=service_enums)
            started.append(wid)
        logger.info(f"In-process queue workers started: {started} for services {[s.value for s in service_enums]}")
    except Exception as e:
        logger.error(f"Failed to start in-process workers: {e}")


@app.on_event("shutdown")
async def _shutdown_workers_event():
    """Shut down in-process queue workers when the app stops."""
    try:
        processor = get_processor()
        processor.shutdown(timeout=30)
    except Exception as e:
        logger.warning(f"Error during workers shutdown: {e}")


# Global subprocess handles (only for workers now)
workers_process: Optional[subprocess.Popen] = None

# Global Redis manager for cleanup
redis_manager = None


def start_queue_workers():
    """Start queue workers as a subprocess (tied to API lifecycle).

    Respects AUTO_START_QUEUE_WORKERS and is a no-op if a worker
    subprocess is already running. Worker stdout/stderr is appended to
    QUEUE_WORKERS_LOG (default ./queue_workers.log).
    """
    global workers_process
    if os.getenv('AUTO_START_QUEUE_WORKERS', 'true').lower() != 'true':
        logger.info("AUTO_START_QUEUE_WORKERS=false, not starting workers")
        return
    # If already started, skip
    if workers_process is not None and workers_process.poll() is None:
        logger.info("Queue workers already running")
        return
    services = os.getenv(
        'QUEUE_SERVICES',
        'tika,docling,split_map,document_analysis,page_images'
    )
    workers = int(os.getenv('QUEUE_WORKERS', '3'))
    check_interval = os.getenv('QUEUE_CHECK_INTERVAL', '15')
    cmd = [
        sys.executable, 'start_queue_workers.py',
        '--workers', str(workers),
        '--services', services,
        '--check-interval', check_interval,
    ]
    # Workers will auto-detect environment and use appropriate Redis database
    log_path = os.getenv('QUEUE_WORKERS_LOG', './queue_workers.log')
    try:
        # Fix: the log file handle was previously leaked in the parent.
        # The child receives its own duplicated descriptors from Popen, so
        # closing our copy (via the with-block) is safe.
        with open(log_path, 'a') as log_file:
            logger.info(f"Starting queue workers ({workers}) for services [{services}] → {log_path}")
            workers_process = subprocess.Popen(
                cmd,
                stdout=log_file,
                stderr=log_file,
                # New process group on POSIX so the whole worker tree can be
                # signalled together in stop_queue_workers().
                preexec_fn=os.setsid if os.name != 'nt' else None
            )
    except Exception as e:
        logger.error(f"Failed to start queue workers: {e}")


def stop_queue_workers():
    """Stop the queue workers subprocess, escalating SIGTERM → SIGKILL."""
    global workers_process
    if workers_process is not None:
        try:
            logger.info("Stopping queue workers...")
            if os.name != 'nt':
                # Signal the whole process group created via os.setsid
                os.killpg(os.getpgid(workers_process.pid), signal.SIGTERM)
            else:
                workers_process.terminate()
            try:
                workers_process.wait(timeout=10)
                logger.info("Queue workers stopped gracefully")
            except subprocess.TimeoutExpired:
                logger.warning("Queue workers did not stop gracefully, forcing shutdown...")
                if os.name != 'nt':
                    os.killpg(os.getpgid(workers_process.pid), signal.SIGKILL)
                else:
                    workers_process.kill()
                workers_process.wait()
                logger.info("Queue workers force stopped")
        except Exception as e:
            logger.error(f"Error stopping queue workers: {e}")
        finally:
            workers_process = None


def _install_signal_handlers():
    """Install SIGINT/SIGTERM handlers that stop workers and Redis cleanly."""
    def signal_handler(signum, frame):
        logger.info(f"Received signal {signum}, shutting down...")
        stop_queue_workers()
        # Gracefully shutdown Redis manager if it exists
        global redis_manager
        if redis_manager:
            redis_manager.shutdown()
        sys.exit(0)
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)


def run_infrastructure_mode():
    """Run infrastructure setup: Neo4j schema, calendar, and Supabase buckets.

    Returns True on success, False on failure.
    """
    logger.info("Running in infrastructure mode")
    logger.info("Starting infrastructure setup...")
    try:
        from run.initialization import initialize_infrastructure_mode
        initialize_infrastructure_mode()
        logger.info("Infrastructure setup completed successfully")
        return True
    except Exception as e:
        logger.error(f"Infrastructure setup failed: {str(e)}")
        return False


def run_demo_school_mode():
    """Run demo school creation. Returns True on success, False on failure."""
    logger.info("Running in demo school mode")
    logger.info("Starting demo school creation...")
    try:
        from run.initialization import initialize_demo_school_mode
        initialize_demo_school_mode()
        logger.info("Demo school creation completed successfully")
        return True
    except Exception as e:
        logger.error(f"Demo school creation failed: {str(e)}")
        return False


def run_demo_users_mode():
    """Run demo users creation. Returns True on success, False on failure."""
    logger.info("Running in demo users mode")
    logger.info("Starting demo users creation...")
    try:
        from run.initialization import initialize_demo_users_mode
        initialize_demo_users_mode()
        logger.info("Demo users creation completed successfully")
        return True
    except Exception as e:
        logger.error(f"Demo users creation failed: {str(e)}")
        return False


def run_gais_data_mode():
    """Run GAIS data import. Returns True on success, False on failure."""
    logger.info("Running in GAIS data import mode")
    logger.info("Starting GAIS data import...")
    try:
        from run.initialization import initialize_gais_data_mode
        initialize_gais_data_mode()
        logger.info("GAIS data import completed successfully")
        return True
    except Exception as e:
        logger.error(f"GAIS data import failed: {str(e)}")
        return False


# Old clear_dev_redis_queue function removed - now handled by Redis Manager


def _uvicorn_log_level() -> str:
    """Uvicorn log level from LOG_LEVEL, normalized to lowercase.

    Fix: uvicorn rejects uppercase names like "INFO" (the usual LOG_LEVEL
    convention, and what the logger above is configured with); an empty
    value also falls back to 'info'.
    """
    return (os.getenv('LOG_LEVEL') or 'info').lower()


def run_development_mode():
    """Run the server in development mode with auto-reload.

    Returns False if Redis initialization fails; otherwise blocks in
    uvicorn until shutdown.
    """
    logger.info("Running in development mode")

    # Initialize Redis manager for development (auto-clears data)
    global redis_manager
    from modules.redis_manager import get_redis_manager
    redis_manager = get_redis_manager('dev')
    if not redis_manager.initialize_environment():
        logger.error("Failed to initialize Redis for development")
        return False

    # Workers are started in app startup event
    logger.info("Starting uvicorn server with auto-reload...")

    # Install signal handlers for graceful shutdown
    _install_signal_handlers()
    try:
        uvicorn.run(
            "main:app",
            host="0.0.0.0",
            port=int(os.getenv('UVICORN_PORT', 8000)),
            log_level=_uvicorn_log_level(),
            proxy_headers=True,
            timeout_keep_alive=10,
            reload=True
        )
    finally:
        stop_queue_workers()
        if redis_manager:
            redis_manager.shutdown()


def run_production_mode():
    """Run the server in production mode.

    Returns False if Redis initialization fails; otherwise blocks in
    uvicorn until shutdown.
    """
    logger.info("Running in production mode")

    # Initialize Redis manager for production (preserves data, recovers tasks)
    global redis_manager
    from modules.redis_manager import get_redis_manager
    redis_manager = get_redis_manager('prod')
    if not redis_manager.initialize_environment():
        logger.error("Failed to initialize Redis for production")
        return False

    # Workers are started in app startup event
    logger.info("Starting uvicorn server in production mode...")

    # Install signal handlers for graceful shutdown
    _install_signal_handlers()
    try:
        uvicorn.run(
            "main:app",
            host="0.0.0.0",
            port=int(os.getenv('UVICORN_PORT', 8000)),
            log_level=_uvicorn_log_level(),
            proxy_headers=True,
            timeout_keep_alive=10,
            workers=int(os.getenv('UVICORN_WORKERS', '1'))
        )
    finally:
        stop_queue_workers()
        if redis_manager:
            redis_manager.shutdown()


def parse_arguments():
    """Parse command line arguments."""
    parser = argparse.ArgumentParser(
        description="ClassroomCopilot API Server",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Startup modes:
  infra       - Setup infrastructure (Neo4j schema, calendar, Supabase buckets)
  demo-school - Create demo school (KevlarAI)
  demo-users  - Create demo users
  gais-data   - Import GAIS data (Edubase, etc.)
  dev         - Run development server with auto-reload
  prod        - Run production server (for Docker/containerized deployment)
"""
    )
    parser.add_argument(
        '--mode', '-m',
        choices=['infra', 'demo-school', 'demo-users', 'gais-data', 'dev', 'prod'],
        default='dev',
        help='Startup mode (default: dev)'
    )
    return parser.parse_args()


if __name__ == "__main__":
    args = parse_arguments()

    # Set environment variable for backward compatibility
    if args.mode == 'dev':
        os.environ['BACKEND_DEV_MODE'] = 'true'
    else:
        os.environ['BACKEND_DEV_MODE'] = 'false'

    logger.info(f"Starting ClassroomCopilot API in {args.mode} mode")

    if args.mode == 'infra':
        # Run infrastructure setup
        success = run_infrastructure_mode()
        sys.exit(0 if success else 1)
    elif args.mode == 'demo-school':
        # Run demo school creation
        success = run_demo_school_mode()
        sys.exit(0 if success else 1)
    elif args.mode == 'demo-users':
        # Run demo users creation
        success = run_demo_users_mode()
        sys.exit(0 if success else 1)
    elif args.mode == 'gais-data':
        # Run GAIS data import
        success = run_gais_data_mode()
        sys.exit(0 if success else 1)
    elif args.mode == 'dev':
        # Run development server.
        # Fix: a False return (Redis init failure) previously exited 0.
        if run_development_mode() is False:
            sys.exit(1)
    elif args.mode == 'prod':
        # Run production server.
        # Fix: a False return (Redis init failure) previously exited 0.
        if run_production_mode() is False:
            sys.exit(1)
    else:
        logger.error(f"Invalid mode: {args.mode}")
        sys.exit(1)