# Listing metadata: 461 lines, 16 KiB, Python.
import os
|
|
import argparse
|
|
import sys
|
|
import subprocess
|
|
import signal
|
|
import atexit
|
|
import time
|
|
from modules.logger_tool import initialise_logger
|
|
logger = initialise_logger(__name__, os.getenv("LOG_LEVEL"), os.getenv("LOG_PATH"), 'default', True)
|
|
from fastapi import FastAPI, HTTPException
|
|
import uvicorn
|
|
import requests
|
|
from urllib.parse import urlparse
|
|
from typing import Dict, Any, Optional
|
|
from modules.database.tools.neo4j_driver_tools import get_driver
|
|
|
|
from run.setup import setup_cors
|
|
from run.routers import register_routes
|
|
from modules.task_processors import get_processor
|
|
from modules.queue_system import ServiceType
|
|
|
|
# FastAPI App Setup
|
|
# Single application instance for this module; the /health route, the routers
# registered below and the startup/shutdown hooks all attach to it.
app = FastAPI()
|
|
# Pass the app to run.setup.setup_cors before any route is declared in this
# module (the CORS policy itself is defined in run.setup, not here).
setup_cors(app)
|
|
|
|
def _truthy_env(name: str) -> bool:
    """Return True when environment variable *name* holds an affirmative flag.

    Accepted values are ``1``, ``true``, ``yes`` and ``on``; matching is
    case-insensitive and ignores surrounding whitespace. An unset or empty
    variable is False.

    Args:
        name: Name of the environment variable to read.
    """
    # strip() tolerates values such as "true\r" or "true " that env files
    # with CRLF line endings or trailing spaces can produce.
    return os.getenv(name, "").strip().lower() in {"1", "true", "yes", "on"}
|
|
|
|
|
|
def _runtime_environment() -> str:
    """Classify this API process as ``"dev"`` or ``"prod"``.

    The role is ``"dev"`` when ``START_MODE`` (case-insensitive, defaulting
    to ``"prod"``) equals ``dev`` or when ``BACKEND_DEV_MODE`` is truthy.
    Every other combination yields ``"prod"``. The result is used to pick
    the dev or prod backing services.
    """
    dev_requested = (
        os.getenv("START_MODE", "prod").lower() == "dev"
        or _truthy_env("BACKEND_DEV_MODE")
    )
    return "dev" if dev_requested else "prod"
|
|
|
|
|
|
def _url_host(url: Optional[str]) -> Optional[str]:
    """Return only the hostname component of *url*.

    Args:
        url: A URL string, or ``None``/empty when the setting is absent.

    Returns:
        The hostname as reported by ``urlparse`` (lower-cased, without
        scheme, credentials, port, path or query). ``None`` when *url* is
        empty, carries no network location, or cannot be parsed.
    """
    if not url:
        return None
    try:
        return urlparse(url).hostname
    except ValueError:
        # urlparse rejects some malformed inputs (for example an unbalanced
        # "[" in the network location). A diagnostic helper should report
        # "no host" rather than raise.
        return None
|
|
|
|
|
|
def _runtime_identity() -> Dict[str, Any]:
    """Describe the running process without revealing secrets.

    Intended for agents and smoke tests. Only modes, labels and URL hosts are
    reported. API keys, passwords, bearer tokens and full URLs (which may
    embed credentials) must never be added here.
    """
    env = os.getenv
    # Each compose label has a primary variable and a CC_-prefixed fallback.
    compose_project = env("COMPOSE_PROJECT_NAME") or env("CC_COMPOSE_PROJECT")
    compose_service = env("COMPOSE_SERVICE") or env("CC_COMPOSE_SERVICE")

    return dict(
        api_runtime_role=_runtime_environment(),
        start_mode=env("START_MODE", "prod"),
        app_environment=env("APP_ENV"),
        environment=env("ENVIRONMENT"),
        backend_dev_mode=_truthy_env("BACKEND_DEV_MODE"),
        compose_project=compose_project,
        compose_service=compose_service,
        # Host only: the full SUPABASE_URL is deliberately not exposed.
        supabase_url_host=_url_host(env("SUPABASE_URL")),
    )
|
|
|
|
|
|
# Health check endpoint
|
|
@app.get("/health")
async def health_check() -> Dict[str, Any]:
    """Report the health of the API and of its backing services.

    Three probes run in sequence and each failure is recorded independently:

    * Neo4j: ``get_driver()`` must return a truthy driver.
    * Supabase: ``GET {SUPABASE_URL}/auth/v1/health`` must answer 200.
    * Redis: ``health_check()`` of the Redis manager selected by the runtime
      role must report ``status == "healthy"``.

    Returns:
        The health payload (overall status, non-secret runtime identity and
        one entry per service) when every service is healthy.

    Raises:
        HTTPException: 503 with the same payload as ``detail`` when at least
            one service is unhealthy.
    """
    # Function-scope imports, matching how this module imports other
    # optional helpers.
    import asyncio
    import functools

    runtime_identity = _runtime_identity()
    supabase_host = runtime_identity["supabase_url_host"]
    health_status: Dict[str, Any] = {
        "status": "healthy",
        "runtime": runtime_identity,
        "services": {
            "neo4j": {"status": "healthy", "message": "Connected"},
            "supabase": {
                "status": "healthy",
                "message": "Connected",
                "url_host": supabase_host,
            },
            "redis": {"status": "healthy", "message": "Connected"},
        },
    }

    def mark_unhealthy(service: str, message: str, **extra: Any) -> None:
        """Record a failed probe and downgrade the overall status."""
        health_status["services"][service] = {
            "status": "unhealthy",
            "message": message,
            **extra,
        }
        health_status["status"] = "unhealthy"

    # --- Neo4j -----------------------------------------------------------
    try:
        if not get_driver():
            mark_unhealthy("neo4j", "Failed to connect to Neo4j")
    except Exception as e:
        logger.warning(f"Neo4j health check failed: {e}")
        mark_unhealthy("neo4j", "Error checking Neo4j")

    # --- Supabase --------------------------------------------------------
    # Every failure entry keeps "url_host" so the payload shape is the same
    # whether or not the probe succeeds.
    try:
        supabase_url = os.getenv("SUPABASE_URL")
        if not supabase_url:
            # Without this guard the request would target "None/auth/v1/health".
            logger.warning("Supabase health check failed: SUPABASE_URL is not set")
            mark_unhealthy(
                "supabase",
                "SUPABASE_URL is not configured",
                url_host=supabase_host,
            )
        else:
            probe = functools.partial(
                requests.get,
                # rstrip avoids a double slash when the base URL ends in "/".
                f"{supabase_url.rstrip('/')}/auth/v1/health",
                headers={"apikey": os.getenv("SERVICE_ROLE_KEY")},
                timeout=5,
            )
            # requests is synchronous; run it in the default executor so a
            # slow Supabase cannot stall the event loop for the full timeout.
            response = await asyncio.get_running_loop().run_in_executor(None, probe)
            if response.status_code != 200:
                mark_unhealthy(
                    "supabase",
                    f"Supabase Auth API returned status {response.status_code}",
                    url_host=supabase_host,
                )
    except Exception as e:
        logger.warning(f"Supabase health check failed: {e}")
        mark_unhealthy(
            "supabase",
            "Error checking Supabase Auth API",
            url_host=supabase_host,
        )

    # --- Redis -----------------------------------------------------------
    try:
        from modules.redis_manager import get_redis_manager

        # The Redis environment follows the explicit runtime role.
        environment = runtime_identity["api_runtime_role"]
        redis_health = get_redis_manager(environment).health_check()

        health_status["services"]["redis"] = {
            "status": redis_health['status'],
            # Prefer the manager's own error text; otherwise describe the
            # connection that was verified.
            "message": redis_health.get(
                'error',
                f"Connected to {environment} environment (db={redis_health['database']})",
            ),
            "environment": redis_health['environment'],
            "database": redis_health['database'],
            "queue_stats": redis_health.get('queue_stats', {}),
        }

        if redis_health['status'] != 'healthy':
            health_status["status"] = "unhealthy"
    except Exception as e:
        logger.warning(f"Redis health check failed: {e}")
        mark_unhealthy("redis", "Error checking Redis")

    if health_status["status"] == "unhealthy":
        raise HTTPException(status_code=503, detail=health_status)

    return health_status
|
|
|
|
# Register routes
|
|
# Hand the app to run.routers.register_routes (presumably mounts the project's
# API routers; see run.routers). /health above is declared in this module.
register_routes(app)
|
|
|
|
# Start workers in the application process to avoid uvicorn reload issues
|
|
@app.on_event("startup")
async def _start_workers_event():
    """Start the in-process queue workers when the application starts.

    Environment variables read:
        AUTO_START_QUEUE_WORKERS: any value other than ``true``
            (case-insensitive) disables the workers. Defaults to ``true``.
        QUEUE_WORKERS: number of workers to start. Defaults to ``3``.
        QUEUE_SERVICES: comma-separated service names. Names that
            ``ServiceType`` does not accept are skipped with a warning. If
            none is valid, every ``ServiceType`` member is used.

    Any failure is logged and swallowed so the API itself still starts.
    """
    try:
        if os.getenv('AUTO_START_QUEUE_WORKERS', 'true').lower() != 'true':
            logger.info("AUTO_START_QUEUE_WORKERS=false, not starting in-process workers")
            return

        workers = int(os.getenv('QUEUE_WORKERS', '3'))
        services_csv = os.getenv('QUEUE_SERVICES', 'tika,docling,split_map,document_analysis,page_images')
        service_names = [s.strip().lower() for s in services_csv.split(',') if s.strip()]

        service_enums = []
        rejected = []
        for name in service_names:
            try:
                service_enums.append(ServiceType(name))
            except Exception:
                # Still best-effort, but a typo in QUEUE_SERVICES is now
                # visible in the logs instead of being dropped silently.
                rejected.append(name)
        if rejected:
            logger.warning(f"Ignoring unrecognised QUEUE_SERVICES entries: {rejected}")

        if not service_enums:
            # Nothing usable was configured: serve every known service.
            service_enums = list(ServiceType)

        processor = get_processor()
        started = []
        for i in range(workers):
            wid = processor.start_worker(worker_id=f"app-worker-{i+1}", services=service_enums)
            started.append(wid)
        logger.info(f"In-process queue workers started: {started} for services {[s.value for s in service_enums]}")
    except Exception as e:
        logger.error(f"Failed to start in-process workers: {e}")
|
|
|
|
@app.on_event("shutdown")
async def _shutdown_workers_event():
    """Shut the task processor down when the application stops.

    Calls ``shutdown(timeout=30)`` on the processor returned by
    ``get_processor()``. Errors are logged as warnings and never raised.
    """
    try:
        get_processor().shutdown(timeout=30)
    except Exception as e:
        logger.warning(f"Error during workers shutdown: {e}")
|
|
|
|
# Global subprocess handles (only for workers now)
|
|
# Handle of the subprocess launched by start_queue_workers(); reset to None by
# stop_queue_workers(). None means no worker subprocess is being tracked.
workers_process: Optional[subprocess.Popen] = None
|
|
|
|
# Global Redis manager for cleanup
|
|
# Assigned by run_development_mode()/run_production_mode(); the signal handler
# and the run_* cleanup paths call .shutdown() on it when it is set.
redis_manager = None
|
|
|
|
def start_queue_workers():
    """Start queue workers as a subprocess (tied to API lifecycle).

    Launches ``start_queue_workers.py`` with the current interpreter and
    stores the handle in the module-level ``workers_process``. Does nothing
    when ``AUTO_START_QUEUE_WORKERS`` is not ``true`` (case-insensitive) or
    when a previously started subprocess is still running.

    Environment variables read:
        QUEUE_SERVICES: comma-separated services passed to ``--services``.
        QUEUE_WORKERS: worker count passed to ``--workers`` (default ``3``).
        QUEUE_CHECK_INTERVAL: value for ``--check-interval`` (default ``15``).
        QUEUE_WORKERS_LOG: file that receives the subprocess's stdout and
            stderr, opened in append mode (default ``./queue_workers.log``).

    Failures to open the log file or spawn the process are logged, not
    raised.
    """
    global workers_process

    if os.getenv('AUTO_START_QUEUE_WORKERS', 'true').lower() != 'true':
        logger.info("AUTO_START_QUEUE_WORKERS=false, not starting workers")
        return

    # If already started, skip
    if workers_process is not None and workers_process.poll() is None:
        logger.info("Queue workers already running")
        return

    services = os.getenv(
        'QUEUE_SERVICES',
        'tika,docling,split_map,document_analysis,page_images'
    )
    workers = int(os.getenv('QUEUE_WORKERS', '3'))
    check_interval = os.getenv('QUEUE_CHECK_INTERVAL', '15')

    cmd = [
        sys.executable,
        'start_queue_workers.py',
        '--workers', str(workers),
        '--services', services,
        '--check-interval', check_interval,
    ]
    # Workers will auto-detect environment and use appropriate Redis database

    log_path = os.getenv('QUEUE_WORKERS_LOG', './queue_workers.log')
    try:
        # The child inherits its own copies of the descriptors, so the parent
        # can close the file once Popen returns. The with-block also closes
        # it when Popen itself fails.
        with open(log_path, 'a') as log_file:
            logger.info(f"Starting queue workers ({workers}) for services [{services}] → {log_path}")
            workers_process = subprocess.Popen(
                cmd,
                stdout=log_file,
                stderr=log_file,
                # Same effect as preexec_fn=os.setsid (the child leads its own
                # session and process group, which stop_queue_workers relies
                # on) without preexec_fn's thread-safety caveat.
                start_new_session=(os.name != 'nt'),
            )
    except Exception as e:
        logger.error(f"Failed to start queue workers: {e}")
|
|
|
|
def stop_queue_workers():
    """Stop the tracked queue-worker subprocess, if there is one.

    A polite stop is requested first: SIGTERM to the subprocess's process
    group on non-Windows platforms, ``terminate()`` on Windows. If the
    process has not exited after ``wait(timeout=10)``, it is force-stopped
    with SIGKILL or ``kill()`` and waited for. Errors are logged, and the
    module-level handle is always reset to ``None``.
    """
    global workers_process

    if workers_process is None:
        return

    signal_whole_group = os.name != 'nt'
    try:
        logger.info("Stopping queue workers...")
        if signal_whole_group:
            os.killpg(os.getpgid(workers_process.pid), signal.SIGTERM)
        else:
            workers_process.terminate()

        try:
            workers_process.wait(timeout=10)
        except subprocess.TimeoutExpired:
            logger.warning("Queue workers did not stop gracefully, forcing shutdown...")
            if signal_whole_group:
                os.killpg(os.getpgid(workers_process.pid), signal.SIGKILL)
            else:
                workers_process.kill()
            workers_process.wait()
            logger.info("Queue workers force stopped")
        else:
            logger.info("Queue workers stopped gracefully")
    except Exception as e:
        logger.error(f"Error stopping queue workers: {e}")
    finally:
        workers_process = None
|
|
|
|
def _install_signal_handlers():
    """Register one shutdown handler for both SIGINT and SIGTERM.

    The handler logs the signal, stops the worker subprocess, shuts the
    module-level Redis manager down when one is set, and exits with status 0.
    """
    def _shutdown_on_signal(signum, frame):
        global redis_manager
        logger.info(f"Received signal {signum}, shutting down...")
        stop_queue_workers()
        # Gracefully shutdown Redis manager if it exists
        if redis_manager:
            redis_manager.shutdown()
        sys.exit(0)

    for handled in (signal.SIGINT, signal.SIGTERM):
        signal.signal(handled, _shutdown_on_signal)
|
|
|
|
def run_infrastructure_mode():
    """Run infrastructure setup: Neo4j schema, calendar, and Supabase buckets.

    Delegates to ``run.initialization.initialize_infrastructure_mode``.

    Returns:
        True when the setup finished without raising, False otherwise (the
        error is logged).
    """
    for banner in ("Running in infrastructure mode", "Starting infrastructure setup..."):
        logger.info(banner)

    completed = False
    try:
        # Imported lazily so the other modes do not load the initialisers.
        from run.initialization import initialize_infrastructure_mode

        initialize_infrastructure_mode()
        logger.info("Infrastructure setup completed successfully")
        completed = True
    except Exception as e:
        logger.error(f"Infrastructure setup failed: {str(e)}")
    return completed
|
|
|
|
def run_seed_mode(test: bool = False):
    """Run the canonical environment seed.

    Args:
        test: Forwarded to ``seed``; selects the "test" seed instead of the
            "full" one.

    Returns:
        The truthiness of the ``success`` entry in the seed result, or False
        when seeding raised (the error is logged). The full result is printed
        to stdout as indented JSON.
    """
    mode = "full"
    if test:
        mode = "test"
    logger.info(f"Running canonical seed mode ({mode})")

    try:
        import json
        from run.initialization.seed_environment import seed

        result = seed(test=test)
        # default=str keeps non-JSON-native values in the result printable.
        print(json.dumps(result, indent=2, default=str))
        return bool(result.get('success'))
    except Exception as e:
        logger.error(f"Seed mode failed: {str(e)}")
        return False
|
|
|
|
|
|
def run_gais_data_mode():
    """Run the GAIS data import.

    Delegates to ``run.initialization.initialize_gais_data_mode``.

    Returns:
        True when the import finished without raising, False otherwise (the
        error is logged).
    """
    for banner in ("Running in GAIS data import mode", "Starting GAIS data import..."):
        logger.info(banner)

    imported = False
    try:
        # Imported lazily so the other modes do not load the importer.
        from run.initialization import initialize_gais_data_mode

        initialize_gais_data_mode()
        logger.info("GAIS data import completed successfully")
        imported = True
    except Exception as e:
        logger.error(f"GAIS data import failed: {str(e)}")
    return imported
|
|
|
|
# Old clear_dev_redis_queue function removed - now handled by Redis Manager
|
|
|
|
def run_development_mode():
    """Run the server in development mode with auto-reload.

    Initialises the ``dev`` Redis manager, installs the SIGINT/SIGTERM
    handlers and then blocks inside ``uvicorn.run`` with ``reload=True``.
    Queue workers are not started here; the app's startup event does that.

    Returns:
        False when Redis initialisation fails (the server is never started);
        True once the server has exited.
    """
    global redis_manager

    logger.info("Running in development mode")

    # Initialize Redis manager for development (auto-clears data)
    from modules.redis_manager import get_redis_manager
    redis_manager = get_redis_manager('dev')

    if not redis_manager.initialize_environment():
        logger.error("Failed to initialize Redis for development")
        return False

    # Workers are started in app startup event

    logger.info("Starting uvicorn server with auto-reload...")

    # Install signal handlers for graceful shutdown
    _install_signal_handlers()

    # uvicorn only accepts lowercase level names, while LOG_LEVEL is shared
    # with the application logger and may be written as "INFO". An empty
    # value falls back to "info".
    uvicorn_log_level = (os.getenv('LOG_LEVEL') or 'info').lower()

    try:
        uvicorn.run(
            "main:app",
            host="0.0.0.0",
            port=int(os.getenv('UVICORN_PORT', 8000)),
            log_level=uvicorn_log_level,
            proxy_headers=True,
            timeout_keep_alive=10,
            reload=True
        )
    finally:
        # Runs on a clean exit and on errors raised by uvicorn.run.
        stop_queue_workers()
        if redis_manager:
            redis_manager.shutdown()

    return True
|
|
|
|
def run_production_mode():
    """Run the server in production mode.

    Initialises the ``prod`` Redis manager, installs the SIGINT/SIGTERM
    handlers and then blocks inside ``uvicorn.run``. The number of uvicorn
    worker processes comes from ``UVICORN_WORKERS`` (default ``1``). Queue
    workers are not started here; the app's startup event does that.

    Returns:
        False when Redis initialisation fails (the server is never started);
        True once the server has exited.
    """
    global redis_manager

    logger.info("Running in production mode")

    # Initialize Redis manager for production (preserves data, recovers tasks)
    from modules.redis_manager import get_redis_manager
    redis_manager = get_redis_manager('prod')

    if not redis_manager.initialize_environment():
        logger.error("Failed to initialize Redis for production")
        return False

    # Workers are started in app startup event

    logger.info("Starting uvicorn server in production mode...")

    # Install signal handlers for graceful shutdown
    _install_signal_handlers()

    # uvicorn only accepts lowercase level names, while LOG_LEVEL is shared
    # with the application logger and may be written as "INFO". An empty
    # value falls back to "info".
    uvicorn_log_level = (os.getenv('LOG_LEVEL') or 'info').lower()

    try:
        uvicorn.run(
            "main:app",
            host="0.0.0.0",
            port=int(os.getenv('UVICORN_PORT', 8000)),
            log_level=uvicorn_log_level,
            proxy_headers=True,
            timeout_keep_alive=10,
            workers=int(os.getenv('UVICORN_WORKERS', '1'))
        )
    finally:
        # Runs on a clean exit and on errors raised by uvicorn.run.
        stop_queue_workers()
        if redis_manager:
            redis_manager.shutdown()

    return True
|
|
|
|
def parse_arguments():
    """Parse the command line and return the resulting namespace.

    The only option is ``--mode`` / ``-m``, restricted to the startup modes
    listed in the help epilog and defaulting to ``dev``.
    """
    startup_modes = ['infra', 'seed', 'seed-test', 'gais-data', 'dev', 'prod']
    # Shown verbatim after the option list (RawDescriptionHelpFormatter).
    mode_overview = """
Startup modes:
infra - Setup infrastructure (Neo4j schema, calendar, Supabase buckets)
seed - Seed canonical full environment (20 school users)
seed-test - Seed lightweight test environment (9 school users)
gais-data - Import GAIS data (Edubase, etc.)
dev - Run development server with auto-reload
prod - Run production server (for Docker/containerized deployment)
"""

    cli = argparse.ArgumentParser(
        description="ClassroomCopilot API Server",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=mode_overview,
    )
    cli.add_argument(
        '--mode', '-m',
        choices=startup_modes,
        default='dev',
        help='Startup mode (default: dev)',
    )
    return cli.parse_args()
|
|
|
|
if __name__ == "__main__":
    # Script entry point: pick the startup mode from the command line, run
    # it, and report failure through the process exit code.
    args = parse_arguments()

    # Set environment variable for backward compatibility
    os.environ['BACKEND_DEV_MODE'] = 'true' if args.mode == 'dev' else 'false'

    logger.info(f"Starting ClassroomCopilot API in {args.mode} mode")

    # One-shot modes run to completion and return a success flag.
    one_shot_modes = {
        'infra': run_infrastructure_mode,
        'seed': lambda: run_seed_mode(test=False),
        'seed-test': lambda: run_seed_mode(test=True),
        'gais-data': run_gais_data_mode,
    }
    # Server modes block until the server exits.
    server_modes = {
        'dev': run_development_mode,
        'prod': run_production_mode,
    }

    if args.mode in one_shot_modes:
        success = one_shot_modes[args.mode]()
        sys.exit(0 if success else 1)
    elif args.mode in server_modes:
        # The server runners return False when startup is aborted before
        # uvicorn is launched (Redis initialisation failed). Surface that as
        # a non-zero exit code instead of exiting 0.
        if server_modes[args.mode]() is False:
            sys.exit(1)
    else:
        # argparse restricts --mode to the known choices, so this is only a
        # safeguard against the tables above falling out of sync with them.
        logger.error(f"Invalid mode: {args.mode}")
        sys.exit(1)
|