"""ClassroomCopilot API entry point.

Defines the FastAPI application (health check, registered routes and the
in-process queue-worker lifecycle) and a command-line launcher that either
runs a one-shot setup mode (infra / seed / seed-test / gais-data /
exam-corpus) or serves the API with uvicorn (dev / prod).
"""
import os
import argparse
import sys
import subprocess
import signal
import atexit
import time
import json

from modules.logger_tool import initialise_logger

# The logger is created before the remaining imports, as in the original
# layout; presumably so modules imported below log through an already
# configured logger — TODO confirm before reordering these imports.
logger = initialise_logger(__name__, os.getenv("LOG_LEVEL"), os.getenv("LOG_PATH"), 'default', True)

from fastapi import FastAPI, HTTPException  # noqa: E402
from fastapi.concurrency import run_in_threadpool  # noqa: E402
import uvicorn  # noqa: E402
import requests  # noqa: E402
from urllib.parse import urlparse  # noqa: E402
from typing import Dict, Any, Optional  # noqa: E402
from modules.database.tools.neo4j_driver_tools import get_driver  # noqa: E402
from run.setup import setup_cors  # noqa: E402
from run.routers import register_routes  # noqa: E402
from modules.task_processors import get_processor  # noqa: E402
from modules.queue_system import ServiceType  # noqa: E402

# FastAPI App Setup
app = FastAPI()
setup_cors(app)

# Default comma-separated queue services, shared by the in-process worker
# startup hook and the subprocess launcher so the two cannot drift apart.
_DEFAULT_QUEUE_SERVICES = 'tika,docling,split_map,document_analysis,page_images'


def _truthy_env(name: str) -> bool:
    """Return True when env var *name* is one of 1/true/yes/on (case-insensitive)."""
    return os.getenv(name, "").lower() in {"1", "true", "yes", "on"}


def _auto_start_workers_enabled() -> bool:
    """Return True unless AUTO_START_QUEUE_WORKERS is set to something other than 'true'.

    Deliberately stricter than ``_truthy_env``: only the literal 'true'
    (any case) enables auto-start, matching the historical behaviour.
    """
    return os.getenv('AUTO_START_QUEUE_WORKERS', 'true').lower() == 'true'


def _runtime_environment() -> str:
    """Return the API runtime role used for dev/prod backing-service selection."""
    start_mode = os.getenv("START_MODE", "prod").lower()
    if start_mode == "dev" or _truthy_env("BACKEND_DEV_MODE"):
        return "dev"
    return "prod"


def _url_host(url: Optional[str]) -> Optional[str]:
    """Return only the hostname of *url* (no scheme, credentials or path), or None."""
    if not url:
        return None
    parsed = urlparse(url)
    return parsed.hostname


def _runtime_identity() -> Dict[str, Any]:
    """Non-secret runtime identity for agents and smoke tests.

    This intentionally exposes only modes, labels, and URL hosts. It must not
    include API keys, passwords, bearer tokens, or full URLs that may embed
    credentials.
    """
    return {
        "api_runtime_role": _runtime_environment(),
        "start_mode": os.getenv("START_MODE", "prod"),
        "app_environment": os.getenv("APP_ENV"),
        "environment": os.getenv("ENVIRONMENT"),
        "backend_dev_mode": _truthy_env("BACKEND_DEV_MODE"),
        "compose_project": os.getenv("COMPOSE_PROJECT_NAME") or os.getenv("CC_COMPOSE_PROJECT"),
        "compose_service": os.getenv("COMPOSE_SERVICE") or os.getenv("CC_COMPOSE_SERVICE"),
        "supabase_url_host": _url_host(os.getenv("SUPABASE_URL")),
    }


# Health check endpoint
@app.get("/health")
async def health_check() -> Dict[str, Any]:
    """Health check endpoint that verifies all service dependencies.

    Checks Neo4j (driver obtainable), Supabase (Auth ``/auth/v1/health``
    returns 200) and Redis (the runtime-role Redis manager's own health
    check). Each failing dependency is marked unhealthy in the returned
    document.

    Returns:
        The status document when every dependency is healthy.

    Raises:
        HTTPException: 503 whose ``detail`` is the full status document when
            any dependency is unhealthy.
    """
    runtime_identity = _runtime_identity()
    health_status = {
        "status": "healthy",
        "runtime": runtime_identity,
        "services": {
            "neo4j": {"status": "healthy", "message": "Connected"},
            "supabase": {
                "status": "healthy",
                "message": "Connected",
                "url_host": runtime_identity["supabase_url_host"],
            },
            "redis": {"status": "healthy", "message": "Connected"},
        },
    }

    try:
        # Check Neo4j
        driver = get_driver()
        if not driver:
            health_status["services"]["neo4j"] = {
                "status": "unhealthy",
                "message": "Failed to connect to Neo4j",
            }
            health_status["status"] = "unhealthy"
    except Exception as e:
        logger.warning(f"Neo4j health check failed: {e}")
        health_status["services"]["neo4j"] = {
            "status": "unhealthy",
            "message": "Error checking Neo4j",
        }
        health_status["status"] = "unhealthy"

    try:
        # Minimal check to confirm Supabase is responsive (e.g., pinging auth or storage endpoint)
        supabase_url = os.getenv("SUPABASE_URL")
        service_role_key = os.getenv("SERVICE_ROLE_KEY")
        # requests is blocking; run it in the threadpool so a slow Supabase
        # (up to the 5s timeout) does not stall the event loop for all requests.
        response = await run_in_threadpool(
            requests.get,
            f"{supabase_url}/auth/v1/health",
            headers={"apikey": service_role_key},
            timeout=5,
        )
        if response.status_code != 200:
            health_status["services"]["supabase"] = {
                "status": "unhealthy",
                "message": f"Supabase Auth API returned status {response.status_code}",
            }
            health_status["status"] = "unhealthy"
    except Exception as e:
        logger.warning(f"Supabase health check failed: {e}")
        health_status["services"]["supabase"] = {
            "status": "unhealthy",
            "message": "Error checking Supabase Auth API",
        }
        health_status["status"] = "unhealthy"

    try:
        # Check Redis using new Redis manager
        from modules.redis_manager import get_redis_manager

        # Determine environment from explicit startup/runtime identity.
        environment = runtime_identity["api_runtime_role"]
        # Local name differs from the module-level ``redis_manager`` global on
        # purpose: this is a read-only probe, not the lifecycle-owned manager.
        env_redis_manager = get_redis_manager(environment)

        # Get comprehensive health check
        redis_health = env_redis_manager.health_check()

        health_status["services"]["redis"] = {
            "status": redis_health['status'],
            "message": redis_health.get(
                'error',
                f"Connected to {environment} environment (db={redis_health['database']})",
            ),
            "environment": redis_health['environment'],
            "database": redis_health['database'],
            "queue_stats": redis_health.get('queue_stats', {}),
        }

        if redis_health['status'] != 'healthy':
            health_status["status"] = "unhealthy"

    except Exception as e:
        logger.warning(f"Redis health check failed: {e}")
        health_status["services"]["redis"] = {
            "status": "unhealthy",
            "message": "Error checking Redis",
        }
        health_status["status"] = "unhealthy"

    if health_status["status"] == "unhealthy":
        raise HTTPException(status_code=503, detail=health_status)

    return health_status


# Register routes
register_routes(app)


# Start workers in the application process to avoid uvicorn reload issues
@app.on_event("startup")
async def _start_workers_event():
    """Start in-process queue workers when the FastAPI app starts.

    Controlled by AUTO_START_QUEUE_WORKERS (only 'true' enables, the default),
    QUEUE_WORKERS (worker count, default 3) and QUEUE_SERVICES (comma list of
    ServiceType values). Unknown service names are logged and skipped; if no
    valid names remain, every ServiceType is served. Failures are logged and
    never raised, so the API still starts.
    """
    try:
        if not _auto_start_workers_enabled():
            logger.info("AUTO_START_QUEUE_WORKERS=false, not starting in-process workers")
            return

        workers = int(os.getenv('QUEUE_WORKERS', '3'))
        services_csv = os.getenv('QUEUE_SERVICES', _DEFAULT_QUEUE_SERVICES)
        service_names = [s.strip().lower() for s in services_csv.split(',') if s.strip()]

        service_enums = []
        for name in service_names:
            try:
                service_enums.append(ServiceType(name))
            except ValueError:
                # Previously swallowed silently; surface typos in QUEUE_SERVICES.
                logger.warning(f"Ignoring unknown queue service {name!r} in QUEUE_SERVICES")
        if not service_enums:
            service_enums = list(ServiceType)

        processor = get_processor()
        started = []
        for i in range(workers):
            wid = processor.start_worker(worker_id=f"app-worker-{i+1}", services=service_enums)
            started.append(wid)
        logger.info(f"In-process queue workers started: {started} for services {[s.value for s in service_enums]}")
    except Exception as e:
        logger.error(f"Failed to start in-process workers: {e}")


@app.on_event("shutdown")
async def _shutdown_workers_event():
    """Ask the task processor to shut down its workers (timeout=30); errors are logged."""
    try:
        processor = get_processor()
        processor.shutdown(timeout=30)
    except Exception as e:
        logger.warning(f"Error during workers shutdown: {e}")


# Global subprocess handles (only for workers now)
workers_process: Optional[subprocess.Popen] = None

# Global Redis manager for cleanup
redis_manager = None


def _shutdown_redis_manager() -> None:
    """Shut down the global Redis manager at most once and clear the reference.

    Both the signal handler and the server ``finally`` blocks call this; on
    the signal path, SystemExit unwinds through the ``finally`` as well, so
    clearing the global prevents a second ``shutdown()`` call.
    """
    global redis_manager
    manager, redis_manager = redis_manager, None
    if manager:
        manager.shutdown()


def start_queue_workers():
    """Start queue workers as a subprocess (tied to API lifecycle).

    Runs ``start_queue_workers.py`` with the current interpreter, appending
    its stdout/stderr to QUEUE_WORKERS_LOG (default ``./queue_workers.log``).
    On POSIX the child starts in a new session so ``stop_queue_workers`` can
    signal its whole process group. Does nothing if AUTO_START_QUEUE_WORKERS
    is not 'true' or a previously started subprocess is still running.
    Launch failures are logged, not raised.
    """
    global workers_process

    if not _auto_start_workers_enabled():
        logger.info("AUTO_START_QUEUE_WORKERS=false, not starting workers")
        return

    # If already started, skip
    if workers_process is not None and workers_process.poll() is None:
        logger.info("Queue workers already running")
        return

    services = os.getenv('QUEUE_SERVICES', _DEFAULT_QUEUE_SERVICES)
    workers = int(os.getenv('QUEUE_WORKERS', '3'))
    check_interval = os.getenv('QUEUE_CHECK_INTERVAL', '15')

    cmd = [
        sys.executable, 'start_queue_workers.py',
        '--workers', str(workers),
        '--services', services,
        '--check-interval', check_interval,
    ]

    # Workers will auto-detect environment and use appropriate Redis database

    log_path = os.getenv('QUEUE_WORKERS_LOG', './queue_workers.log')
    try:
        # The child gets its own duplicate of the log descriptor, so the
        # parent's handle is closed once Popen returns (or fails).
        with open(log_path, 'a') as log_file:
            logger.info(f"Starting queue workers ({workers}) for services [{services}] → {log_path}")
            workers_process = subprocess.Popen(
                cmd,
                stdout=log_file,
                stderr=log_file,
                # setsid() in the child, like the old preexec_fn=os.setsid,
                # but without preexec_fn's thread-safety caveats.
                start_new_session=(os.name != 'nt'),
            )
    except Exception as e:
        logger.error(f"Failed to start queue workers: {e}")


def stop_queue_workers():
    """Stop queue workers subprocess.

    Sends SIGTERM to the worker process group (POSIX) or terminates the
    process (Windows), waits up to 10s, then escalates to SIGKILL / kill().
    Errors are logged; the global handle is always reset to None.
    """
    global workers_process

    if workers_process is not None:
        try:
            logger.info("Stopping queue workers...")
            if os.name != 'nt':
                os.killpg(os.getpgid(workers_process.pid), signal.SIGTERM)
            else:
                workers_process.terminate()

            try:
                workers_process.wait(timeout=10)
                logger.info("Queue workers stopped gracefully")
            except subprocess.TimeoutExpired:
                logger.warning("Queue workers did not stop gracefully, forcing shutdown...")
                if os.name != 'nt':
                    os.killpg(os.getpgid(workers_process.pid), signal.SIGKILL)
                else:
                    workers_process.kill()
                workers_process.wait()
                logger.info("Queue workers force stopped")
        except Exception as e:
            logger.error(f"Error stopping queue workers: {e}")
        finally:
            workers_process = None


def _install_signal_handlers():
    """Install SIGINT/SIGTERM handlers that stop workers, shut down Redis and exit(0).

    NOTE(review): uvicorn installs its own signal handling while serving, so
    these presumably only take effect outside that window — confirm for the
    pinned uvicorn version.
    """
    def signal_handler(signum, frame):
        logger.info(f"Received signal {signum}, shutting down...")
        stop_queue_workers()
        # Gracefully shutdown Redis manager if it exists
        _shutdown_redis_manager()
        sys.exit(0)

    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)


def run_infrastructure_mode():
    """Run infrastructure setup: Neo4j schema, calendar, and Supabase buckets.

    Returns True on success, False if setup raised (the error is logged).
    """
    logger.info("Running in infrastructure mode")
    logger.info("Starting infrastructure setup...")

    try:
        from run.initialization import initialize_infrastructure_mode
        initialize_infrastructure_mode()
        logger.info("Infrastructure setup completed successfully")
        return True
    except Exception as e:
        logger.error(f"Infrastructure setup failed: {str(e)}")
        return False


def run_seed_mode(test: bool = False):
    """Run canonical environment seed.

    Prints the seed result as indented JSON to stdout and returns
    ``bool(result['success'])``; returns False if seeding raised.
    """
    mode = "test" if test else "full"
    logger.info(f"Running canonical seed mode ({mode})")
    try:
        from run.initialization.seed_environment import seed
        result = seed(test=test)
        print(json.dumps(result, indent=2, default=str))
        return bool(result.get('success'))
    except Exception as e:
        logger.error(f"Seed mode failed: {str(e)}")
        return False


def run_gais_data_mode():
    """Run GAIS data import.

    Returns True on success, False if the import raised (the error is logged).
    """
    logger.info("Running in GAIS data import mode")
    logger.info("Starting GAIS data import...")

    try:
        from run.initialization import initialize_gais_data_mode
        initialize_gais_data_mode()
        logger.info("GAIS data import completed successfully")
        return True
    except Exception as e:
        logger.error(f"GAIS data import failed: {str(e)}")
        return False


# Old clear_dev_redis_queue function removed - now handled by Redis Manager


def run_exam_corpus_mode():
    """Seed the public exam-paper corpus from a manifest (optional, gated).

    Env controls:
      EXAM_CORPUS_MANIFEST      - path to the corpus manifest (required to do anything)
      EXAM_CORPUS_DRY_RUN       - 'true' to validate + report only
      EXAM_CORPUS_FORCE         - 'true' to re-upload/overwrite existing objects
      EXAM_CORPUS_BOARD/_SPEC   - filter to one exam_board_code / spec_code
      EXAM_CORPUS_USER_SUBSET   - 'true' to also seed a user-side test subset
      EXAM_CORPUS_FIRST_SWEEP   - 'true' to run the docling/auto-map first pass

    Skips gracefully (success) when no manifest is configured/present, so it is
    safe in a comma-mode list (e.g. INIT_MODE=infra,seed,exam-corpus) before
    papers exist. Buckets are NOT created here — infra mode (buckets.py) owns
    provisioning.
    """
    logger.info("Running in exam-corpus seed mode")
    manifest = os.getenv("EXAM_CORPUS_MANIFEST")
    if not manifest or not os.path.exists(manifest):
        logger.warning(
            f"exam-corpus: no manifest at EXAM_CORPUS_MANIFEST={manifest!r}; skipping (nothing to seed yet)"
        )
        return True
    try:
        from run.initialization.seed_exam_corpus import load
        rep = load(
            manifest,
            dry_run=_truthy_env("EXAM_CORPUS_DRY_RUN"),
            force=_truthy_env("EXAM_CORPUS_FORCE"),
            board_filter=os.getenv("EXAM_CORPUS_BOARD") or None,
            spec_filter=os.getenv("EXAM_CORPUS_SPEC") or None,
            user_subset=_truthy_env("EXAM_CORPUS_USER_SUBSET"),
            do_first_sweep=_truthy_env("EXAM_CORPUS_FIRST_SWEEP"),
        )
        if rep.errors:
            logger.error(f"exam-corpus seed completed with {len(rep.errors)} error(s)")
            return False
        logger.info(
            f"exam-corpus seed ok: specs={rep.specs_upserted} papers={rep.papers_upserted} "
            f"uploaded={rep.files_uploaded}"
        )
        return True
    except Exception as e:
        logger.error(f"exam-corpus seed failed: {e}")
        return False


def _initialise_redis(environment: str, label: str) -> bool:
    """Create the global Redis manager for *environment* and initialise it.

    *label* is only used in the failure log message. Returns False (after
    logging) when ``initialize_environment()`` reports failure.
    """
    global redis_manager
    from modules.redis_manager import get_redis_manager
    redis_manager = get_redis_manager(environment)
    if not redis_manager.initialize_environment():
        logger.error(f"Failed to initialize Redis for {label}")
        return False
    return True


def _uvicorn_log_level() -> str:
    """Return LOG_LEVEL lower-cased for uvicorn, defaulting to 'info' when unset/empty.

    uvicorn resolves string log levels through a lower-case lookup table, so
    an upper-case value such as 'INFO' would otherwise fail at startup.
    """
    return (os.getenv('LOG_LEVEL') or 'info').lower()


def _serve_uvicorn(**extra_options: Any) -> None:
    """Install signal handlers and run uvicorn on ``main:app``.

    Options common to dev and prod are set here; *extra_options* carries the
    mode-specific ones (``reload`` / ``workers``). Queue workers and the
    Redis manager are cleaned up however uvicorn exits.
    """
    # Install signal handlers for graceful shutdown
    _install_signal_handlers()
    try:
        uvicorn.run(
            "main:app",
            host="0.0.0.0",
            port=int(os.getenv('UVICORN_PORT', 8000)),
            log_level=_uvicorn_log_level(),
            proxy_headers=True,
            timeout_keep_alive=10,
            **extra_options,
        )
    finally:
        stop_queue_workers()
        _shutdown_redis_manager()


def run_development_mode():
    """Run the server in development mode with auto-reload.

    Returns False if Redis initialisation fails, otherwise True once uvicorn exits.
    """
    logger.info("Running in development mode")

    # Initialize Redis manager for development (auto-clears data)
    if not _initialise_redis('dev', 'development'):
        return False

    # Workers are started in app startup event
    logger.info("Starting uvicorn server with auto-reload...")
    _serve_uvicorn(reload=True)
    return True


def run_production_mode():
    """Run the server in production mode.

    Uses UVICORN_WORKERS (default 1) uvicorn worker processes. Returns False
    if Redis initialisation fails, otherwise True once uvicorn exits.
    """
    logger.info("Running in production mode")

    # Initialize Redis manager for production (preserves data, recovers tasks)
    if not _initialise_redis('prod', 'production'):
        return False

    # Workers are started in app startup event
    logger.info("Starting uvicorn server in production mode...")
    _serve_uvicorn(workers=int(os.getenv('UVICORN_WORKERS', '1')))
    return True


def parse_arguments():
    """Parse command line arguments (``--mode/-m``, default 'dev')."""
    parser = argparse.ArgumentParser(
        description="ClassroomCopilot API Server",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Startup modes:
  infra        - Setup infrastructure (Neo4j schema, calendar, Supabase buckets)
  seed         - Seed canonical full environment (20 school users)
  seed-test    - Seed lightweight test environment (9 school users)
  gais-data    - Import GAIS data (Edubase, etc.)
  exam-corpus  - Seed the public exam-paper corpus from EXAM_CORPUS_MANIFEST (skips if unset)
  dev          - Run development server with auto-reload
  prod         - Run production server (for Docker/containerized deployment)
        """
    )

    parser.add_argument(
        '--mode', '-m',
        choices=['infra', 'seed', 'seed-test', 'gais-data', 'exam-corpus', 'dev', 'prod'],
        default='dev',
        help='Startup mode (default: dev)'
    )

    return parser.parse_args()


if __name__ == "__main__":
    args = parse_arguments()

    # Set environment variable for backward compatibility
    os.environ['BACKEND_DEV_MODE'] = 'true' if args.mode == 'dev' else 'false'

    logger.info(f"Starting ClassroomCopilot API in {args.mode} mode")

    # Every runner returns a bool; the exit status mirrors it. The server
    # modes previously exited 0 even when Redis initialisation failed.
    mode_runners = {
        'infra': run_infrastructure_mode,
        'seed': lambda: run_seed_mode(test=False),
        'seed-test': lambda: run_seed_mode(test=True),
        'gais-data': run_gais_data_mode,
        # Seed the public exam-paper corpus from a manifest (gated; skips if none configured)
        'exam-corpus': run_exam_corpus_mode,
        'dev': run_development_mode,
        'prod': run_production_mode,
    }
    runner = mode_runners.get(args.mode)
    if runner is None:
        logger.error(f"Invalid mode: {args.mode}")
        sys.exit(1)

    success = runner()
    sys.exit(0 if success else 1)