api/main.py
CC Worker d8cf3bbc62 feat(seed): wire exam-corpus mode into the init entrypoint (gated)
Add 'exam-corpus' INIT_MODE: docker-entrypoint.sh case -> main.py --mode
exam-corpus -> run_exam_corpus_mode() -> seed_exam_corpus.load(). Driven by
EXAM_CORPUS_MANIFEST (+ DRY_RUN/FORCE/BOARD/SPEC/USER_SUBSET/FIRST_SWEEP env).
Skips gracefully (success) when no manifest is configured, so it is safe in a
comma list like INIT_MODE=infra,seed,exam-corpus before papers are gathered.
Bucket provisioning stays in infra mode.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-07 22:26:58 +00:00

512 lines
18 KiB
Python

import os
import argparse
import sys
import subprocess
import signal
import atexit
import time
from modules.logger_tool import initialise_logger
logger = initialise_logger(__name__, os.getenv("LOG_LEVEL"), os.getenv("LOG_PATH"), 'default', True)
from fastapi import FastAPI, HTTPException
import uvicorn
import requests
from urllib.parse import urlparse
from typing import Dict, Any, Optional
from modules.database.tools.neo4j_driver_tools import get_driver
from run.setup import setup_cors
from run.routers import register_routes
from modules.task_processors import get_processor
from modules.queue_system import ServiceType
# FastAPI App Setup
# Module-level application object. run_development_mode()/run_production_mode()
# hand it to uvicorn via the "main:app" import string, and the route/event
# decorators further down in this file attach to it.
app = FastAPI()
# Apply the project's CORS configuration (run.setup.setup_cors) at import time.
setup_cors(app)
def _truthy_env(name: str) -> bool:
    """Return True when environment variable *name* holds a truthy flag.

    Accepted values (case-insensitive): "1", "true", "yes", "on". An unset or
    empty variable, or any other value, gives False. Whitespace is not
    stripped, so " true" is False.
    """
    value = os.getenv(name, "").lower()
    return value in ("1", "true", "yes", "on")
def _runtime_environment() -> str:
    """Classify this process as the "dev" or "prod" runtime.

    Returns "dev" when START_MODE (default "prod", compared case-insensitively)
    is "dev", or when BACKEND_DEV_MODE is truthy (see _truthy_env). Any other
    combination gives "prod". /health uses this role to pick the Redis manager
    it inspects.
    """
    dev_requested = os.getenv("START_MODE", "prod").lower() == "dev"
    # Short-circuits like the original: BACKEND_DEV_MODE is only read when
    # START_MODE did not already select dev.
    return "dev" if dev_requested or _truthy_env("BACKEND_DEV_MODE") else "prod"
def _url_host(url: Optional[str]) -> Optional[str]:
    """Return only the hostname part of *url*, or None.

    Userinfo (credentials), port, path and query are dropped, so the result is
    safe to expose in health/identity output. Falsy input (None or "") and
    strings without a network location both give None. urllib.parse reports
    the hostname in lower case.
    """
    return urlparse(url).hostname if url else None
def _runtime_identity() -> Dict[str, Any]:
    """Describe this process's runtime without leaking secrets.

    Meant for agents, smoke tests and the /health payload. Only mode flags,
    deployment labels and the Supabase URL *host* are included. API keys,
    passwords, bearer tokens and full URLs (which may embed credentials)
    must never be added here.
    """
    getenv = os.getenv
    # Keyword arguments are evaluated left to right, so key order (and hence
    # JSON output order) matches the documented field list.
    return dict(
        api_runtime_role=_runtime_environment(),
        start_mode=getenv("START_MODE", "prod"),
        app_environment=getenv("APP_ENV"),
        environment=getenv("ENVIRONMENT"),
        backend_dev_mode=_truthy_env("BACKEND_DEV_MODE"),
        compose_project=getenv("COMPOSE_PROJECT_NAME") or getenv("CC_COMPOSE_PROJECT"),
        compose_service=getenv("COMPOSE_SERVICE") or getenv("CC_COMPOSE_SERVICE"),
        supabase_url_host=_url_host(getenv("SUPABASE_URL")),
    )
# Health check endpoint
def _check_neo4j() -> Dict[str, Any]:
    """Return the /health entry for Neo4j. Blocking: call it off the event loop.

    Reports unhealthy when get_driver() returns a falsy value or raises. A
    truthy driver is reported as "Connected"; whether get_driver() itself
    verifies connectivity is not visible from this module.
    """
    try:
        if not get_driver():
            return {"status": "unhealthy", "message": "Failed to connect to Neo4j"}
    except Exception as e:
        logger.warning(f"Neo4j health check failed: {e}")
        return {"status": "unhealthy", "message": "Error checking Neo4j"}
    return {"status": "healthy", "message": "Connected"}


def _check_supabase(url_host: Optional[str]) -> Dict[str, Any]:
    """Return the /health entry for Supabase via GET {SUPABASE_URL}/auth/v1/health.

    Blocking (requests, 5 s timeout): call it off the event loop. A non-200
    response or any request error is unhealthy. Exception text is only logged,
    never returned, and only the URL host is echoed, so no credentials leak.
    """
    try:
        supabase_url = os.getenv("SUPABASE_URL")
        service_role_key = os.getenv("SERVICE_ROLE_KEY")
        response = requests.get(
            f"{supabase_url}/auth/v1/health",
            headers={"apikey": service_role_key},
            timeout=5,
        )
        if response.status_code != 200:
            return {
                "status": "unhealthy",
                "message": f"Supabase Auth API returned status {response.status_code}",
            }
    except Exception as e:
        logger.warning(f"Supabase health check failed: {e}")
        return {"status": "unhealthy", "message": "Error checking Supabase Auth API"}
    return {"status": "healthy", "message": "Connected", "url_host": url_host}


def _check_redis(environment: str) -> Dict[str, Any]:
    """Return the /health entry for Redis from the *environment* Redis manager.

    Blocking: call it off the event loop. The entry mirrors the manager's own
    health_check() report (status, error/message, environment, database,
    queue_stats). Any exception is logged and reported as unhealthy.
    """
    try:
        from modules.redis_manager import get_redis_manager
        redis_health = get_redis_manager(environment).health_check()
        # Build the fallback message lazily. dict.get(key, default) evaluates the
        # default eagerly, so redis_health['database'] used to be read even when
        # an 'error' was present; a KeyError there masked the manager's error.
        # NOTE(review): error reports may omit 'environment'/'database', so they
        # are read with .get(). Confirm the health_check() schema.
        if 'error' in redis_health:
            message = redis_health['error']
        else:
            message = f"Connected to {environment} environment (db={redis_health.get('database')})"
        return {
            "status": redis_health['status'],
            "message": message,
            "environment": redis_health.get('environment'),
            "database": redis_health.get('database'),
            "queue_stats": redis_health.get('queue_stats', {}),
        }
    except Exception as e:
        logger.warning(f"Redis health check failed: {e}")
        return {"status": "unhealthy", "message": "Error checking Redis"}


def _collect_health_status(runtime_identity: Dict[str, Any]) -> Dict[str, Any]:
    """Run every dependency check (blocking) and assemble the /health payload."""
    services = {
        "neo4j": _check_neo4j(),
        "supabase": _check_supabase(runtime_identity["supabase_url_host"]),
        # Redis environment follows the explicit startup/runtime identity.
        "redis": _check_redis(runtime_identity["api_runtime_role"]),
    }
    all_healthy = all(entry["status"] == "healthy" for entry in services.values())
    return {
        "status": "healthy" if all_healthy else "unhealthy",
        "runtime": runtime_identity,
        "services": services,
    }


@app.get("/health")
async def health_check() -> Dict[str, Any]:
    """Health check endpoint that verifies all service dependencies.

    Returns the payload (HTTP 200) when Neo4j, Supabase and Redis all report
    healthy. Otherwise raises HTTPException(503) with the same payload as detail.

    The checks use blocking clients (requests with a 5 s timeout, get_driver(),
    the Redis manager). They therefore run in the loop's default executor.
    Running them directly in this coroutine would stall every other request
    served by this worker until they finished.
    """
    import asyncio

    runtime_identity = _runtime_identity()
    loop = asyncio.get_running_loop()
    health_status = await loop.run_in_executor(None, _collect_health_status, runtime_identity)
    if health_status["status"] == "unhealthy":
        raise HTTPException(status_code=503, detail=health_status)
    return health_status
# Register routes
# Attach the project's API routers (run.routers.register_routes) to the app at
# import time; the /health route above is declared directly in this module.
register_routes(app)
# Start workers in the application process to avoid uvicorn reload issues
@app.on_event("startup")
async def _start_workers_event():
    """Start in-process queue workers when the FastAPI app starts.

    Environment:
      AUTO_START_QUEUE_WORKERS  only 'true' (case-insensitive; the default) enables workers
      QUEUE_WORKERS             number of workers to start (default 3)
      QUEUE_SERVICES            comma-separated ServiceType values
                                (default tika,docling,split_map,document_analysis,page_images)

    Unknown service names are skipped with a warning. If no valid name remains,
    workers are started for every ServiceType. Any failure is logged and
    swallowed so app startup is not aborted.
    """
    try:
        if os.getenv('AUTO_START_QUEUE_WORKERS', 'true').lower() != 'true':
            logger.info("AUTO_START_QUEUE_WORKERS=false, not starting in-process workers")
            return
        workers = int(os.getenv('QUEUE_WORKERS', '3'))
        services_csv = os.getenv('QUEUE_SERVICES', 'tika,docling,split_map,document_analysis,page_images')
        service_names = [s.strip().lower() for s in services_csv.split(',') if s.strip()]
        service_enums = []
        unknown_names = []
        for name in service_names:
            try:
                service_enums.append(ServiceType(name))
            except ValueError:
                # An unrecognised name (e.g. a typo) must not silently change
                # which queues get workers.
                unknown_names.append(name)
        if unknown_names:
            logger.warning(f"Ignoring unknown QUEUE_SERVICES entries: {unknown_names}")
        if not service_enums:
            if service_names:
                logger.warning("No valid QUEUE_SERVICES entries; starting workers for all services")
            service_enums = list(ServiceType)
        processor = get_processor()
        started = [
            processor.start_worker(worker_id=f"app-worker-{i + 1}", services=service_enums)
            for i in range(workers)
        ]
        logger.info(f"In-process queue workers started: {started} for services {[s.value for s in service_enums]}")
    except Exception as e:
        logger.error(f"Failed to start in-process workers: {e}")
@app.on_event("shutdown")
async def _shutdown_workers_event():
    """Stop the in-process queue workers when the FastAPI app shuts down.

    Calls the processor's shutdown with timeout=30 (presumably seconds; see
    modules.task_processors). Errors are logged as warnings, not raised, so
    the rest of app shutdown continues.
    """
    try:
        get_processor().shutdown(timeout=30)
    except Exception as exc:
        logger.warning(f"Error during workers shutdown: {exc}")
# Global subprocess handles (only for workers now)
# Handle for the optional queue-worker subprocess spawned by start_queue_workers();
# None until one is started, and reset to None by stop_queue_workers().
workers_process: Optional[subprocess.Popen] = None
# Global Redis manager for cleanup
# Assigned by run_development_mode()/run_production_mode(); shut down in their
# `finally` blocks and by the signal handler from _install_signal_handlers().
redis_manager = None
def start_queue_workers():
    """Start queue workers as a subprocess (tied to API lifecycle).

    Launches ``start_queue_workers.py`` with the current interpreter. Settings
    come from the environment:
      AUTO_START_QUEUE_WORKERS  anything other than 'true' (case-insensitive) disables this
      QUEUE_SERVICES            services CSV (default tika,docling,split_map,document_analysis,page_images)
      QUEUE_WORKERS             worker count (default 3; a non-integer raises ValueError)
      QUEUE_CHECK_INTERVAL      passed through as --check-interval (default 15)
      QUEUE_WORKERS_LOG         file that receives the child's stdout/stderr
                                (default ./queue_workers.log)

    Does nothing if a previously started subprocess is still running. Failures
    to open the log or spawn the process are logged, not raised.
    """
    global workers_process
    if os.getenv('AUTO_START_QUEUE_WORKERS', 'true').lower() != 'true':
        logger.info("AUTO_START_QUEUE_WORKERS=false, not starting workers")
        return
    # If already started, skip
    if workers_process is not None and workers_process.poll() is None:
        logger.info("Queue workers already running")
        return
    services = os.getenv(
        'QUEUE_SERVICES',
        'tika,docling,split_map,document_analysis,page_images'
    )
    workers = int(os.getenv('QUEUE_WORKERS', '3'))
    check_interval = os.getenv('QUEUE_CHECK_INTERVAL', '15')
    # NOTE(review): the script path is relative, so this assumes the current
    # working directory contains start_queue_workers.py. Confirm for all launchers.
    cmd = [
        sys.executable,
        'start_queue_workers.py',
        '--workers', str(workers),
        '--services', services,
        '--check-interval', check_interval,
    ]
    # Workers will auto-detect environment and use appropriate Redis database
    log_path = os.getenv('QUEUE_WORKERS_LOG', './queue_workers.log')
    try:
        # Popen duplicates the log descriptor into the child, so the parent's
        # handle can be closed as soon as the process is spawned (it used to leak).
        with open(log_path, 'a') as log_file:
            logger.info(f"Starting queue workers ({workers}) for services [{services}] → {log_path}")
            workers_process = subprocess.Popen(
                cmd,
                stdout=log_file,
                stderr=log_file,
                # Put the child in its own session (and so its own process group)
                # so stop_queue_workers() can signal the whole group with killpg.
                # start_new_session replaces preexec_fn=os.setsid, which the
                # subprocess docs flag as unsafe when the parent has threads.
                start_new_session=os.name != 'nt',
            )
    except Exception as e:
        logger.error(f"Failed to start queue workers: {e}")
def stop_queue_workers():
    """Stop the queue-worker subprocess, escalating to a hard kill if needed.

    On POSIX, SIGTERM is sent to the child's process group; on Windows,
    terminate() is used. The call then waits up to 10 s. If the child is still
    alive, SIGKILL (or kill() on Windows) follows and the call waits for exit.
    Errors are logged, not raised, and the global handle is always cleared.
    A no-op when no subprocess was started.
    """
    global workers_process
    if workers_process is None:
        return
    on_posix = os.name != 'nt'
    try:
        logger.info("Stopping queue workers...")
        if on_posix:
            os.killpg(os.getpgid(workers_process.pid), signal.SIGTERM)
        else:
            workers_process.terminate()
        try:
            workers_process.wait(timeout=10)
        except subprocess.TimeoutExpired:
            logger.warning("Queue workers did not stop gracefully, forcing shutdown...")
            if on_posix:
                os.killpg(os.getpgid(workers_process.pid), signal.SIGKILL)
            else:
                workers_process.kill()
            workers_process.wait()
            logger.info("Queue workers force stopped")
        else:
            logger.info("Queue workers stopped gracefully")
    except Exception as e:
        logger.error(f"Error stopping queue workers: {e}")
    finally:
        workers_process = None
def _install_signal_handlers():
    """Register one cleanup-and-exit handler for both SIGINT and SIGTERM.

    The handler logs the signal and stops the queue-worker subprocess. It then
    shuts down the module-level redis_manager if one was created and calls
    sys.exit(0).

    NOTE(review): uvicorn.run() typically installs its own SIGINT/SIGTERM
    handlers once the server starts, which would replace these. Cleanup then
    relies on the `finally` blocks of the run_*_mode functions. Confirm for
    the uvicorn version in use.
    """
    def _on_shutdown_signal(signum, frame):
        logger.info(f"Received signal {signum}, shutting down...")
        stop_queue_workers()
        # redis_manager resolves to the module-level global (None until a
        # run_*_mode function initialises it).
        if redis_manager:
            redis_manager.shutdown()
        sys.exit(0)

    for sig in (signal.SIGINT, signal.SIGTERM):
        signal.signal(sig, _on_shutdown_signal)
def run_infrastructure_mode():
    """Run infrastructure setup: Neo4j schema, calendar, and Supabase buckets.

    Delegates to run.initialization.initialize_infrastructure_mode(). Returns
    True when it completes and False if it raises; the error is logged, not
    re-raised.
    """
    logger.info("Running in infrastructure mode")
    logger.info("Starting infrastructure setup...")
    try:
        from run.initialization import initialize_infrastructure_mode
        initialize_infrastructure_mode()
    except Exception as e:
        logger.error(f"Infrastructure setup failed: {e!s}")
        return False
    logger.info("Infrastructure setup completed successfully")
    return True
def run_seed_mode(test: bool = False):
    """Run the canonical environment seed and print its result as JSON.

    Args:
        test: True seeds the lightweight test environment, False (default) the
            full canonical one.

    Returns:
        True if the result's 'success' entry is truthy. False otherwise, or if
        anything raises (the error is logged, not re-raised).
    """
    label = "full"
    if test:
        label = "test"
    logger.info(f"Running canonical seed mode ({label})")
    try:
        from run.initialization.seed_environment import seed
        import json
        result = seed(test=test)
        # default=str stringifies any value json cannot encode natively.
        report = json.dumps(result, indent=2, default=str)
        print(report)
        return bool(result.get('success'))
    except Exception as e:
        logger.error(f"Seed mode failed: {e!s}")
        return False
def run_gais_data_mode():
    """Run the GAIS data import (Edubase, etc.).

    Delegates to run.initialization.initialize_gais_data_mode(). Returns True
    when it completes and False if it raises; the error is logged, not
    re-raised.
    """
    logger.info("Running in GAIS data import mode")
    logger.info("Starting GAIS data import...")
    try:
        from run.initialization import initialize_gais_data_mode
        initialize_gais_data_mode()
    except Exception as e:
        logger.error(f"GAIS data import failed: {e!s}")
        return False
    logger.info("GAIS data import completed successfully")
    return True
# Old clear_dev_redis_queue function removed - now handled by Redis Manager
def run_exam_corpus_mode():
    """Seed the public exam-paper corpus from a manifest (optional, gated).

    Environment:
        EXAM_CORPUS_MANIFEST     path to the corpus manifest (required to do anything)
        EXAM_CORPUS_DRY_RUN      truthy -> validate + report only
        EXAM_CORPUS_FORCE        truthy -> re-upload/overwrite existing objects
        EXAM_CORPUS_BOARD        filter to one exam_board_code
        EXAM_CORPUS_SPEC         filter to one spec_code
        EXAM_CORPUS_USER_SUBSET  truthy -> also seed a user-side test subset
        EXAM_CORPUS_FIRST_SWEEP  truthy -> run the docling/auto-map first pass
    "Truthy" means 1/true/yes/on (case-insensitive), via _truthy_env. Empty
    BOARD/SPEC values are passed to load() as None.

    Returns True, after logging a skip, when the manifest is unset or its path
    does not exist. This keeps the mode safe in a comma list such as
    INIT_MODE=infra,seed,exam-corpus before papers exist. Also returns True
    when load() reports no errors. Returns False when the report has errors or
    anything raises. Buckets are NOT created here; infra mode (buckets.py)
    owns provisioning.
    """
    logger.info("Running in exam-corpus seed mode")
    manifest = os.getenv("EXAM_CORPUS_MANIFEST")
    have_manifest = bool(manifest) and os.path.exists(manifest)
    if not have_manifest:
        logger.warning(
            f"exam-corpus: no manifest at EXAM_CORPUS_MANIFEST={manifest!r}; skipping (nothing to seed yet)"
        )
        return True
    try:
        from run.initialization.seed_exam_corpus import load
        report = load(
            manifest,
            dry_run=_truthy_env("EXAM_CORPUS_DRY_RUN"),
            force=_truthy_env("EXAM_CORPUS_FORCE"),
            board_filter=os.getenv("EXAM_CORPUS_BOARD") or None,
            spec_filter=os.getenv("EXAM_CORPUS_SPEC") or None,
            user_subset=_truthy_env("EXAM_CORPUS_USER_SUBSET"),
            do_first_sweep=_truthy_env("EXAM_CORPUS_FIRST_SWEEP"),
        )
        failures = report.errors
        if failures:
            logger.error(f"exam-corpus seed completed with {len(failures)} error(s)")
            return False
        logger.info(
            f"exam-corpus seed ok: specs={report.specs_upserted} papers={report.papers_upserted} "
            f"uploaded={report.files_uploaded}"
        )
        return True
    except Exception as e:
        logger.error(f"exam-corpus seed failed: {e}")
        return False
def run_development_mode():
    """Serve the API under uvicorn with auto-reload, backed by the 'dev' Redis environment.

    Initialises the dev Redis manager (its initialize_environment() is intended
    to reset dev data; the implementation lives in modules.redis_manager).
    Then installs SIGINT/SIGTERM handlers and blocks in uvicorn.run() on
    0.0.0.0:$UVICORN_PORT (default 8000). Queue workers are started by the
    app's startup event, not here.

    Returns False if Redis initialisation fails; otherwise None once uvicorn
    exits. Worker and Redis cleanup run whether uvicorn returns or raises.
    """
    global redis_manager
    logger.info("Running in development mode")
    from modules.redis_manager import get_redis_manager

    redis_manager = get_redis_manager('dev')
    ready = redis_manager.initialize_environment()
    if not ready:
        logger.error("Failed to initialize Redis for development")
        return False

    logger.info("Starting uvicorn server with auto-reload...")
    _install_signal_handlers()
    try:
        # Parsed inside the try so a bad UVICORN_PORT still triggers cleanup.
        port = int(os.getenv('UVICORN_PORT', 8000))
        uvicorn.run(
            "main:app",
            host="0.0.0.0",
            port=port,
            log_level=os.getenv('LOG_LEVEL', 'info'),
            proxy_headers=True,
            timeout_keep_alive=10,
            reload=True,
        )
    finally:
        stop_queue_workers()
        if redis_manager:
            redis_manager.shutdown()
def run_production_mode():
    """Serve the API under uvicorn, backed by the 'prod' Redis environment.

    Initialises the prod Redis manager (intended to preserve data and recover
    tasks; the implementation lives in modules.redis_manager). Then installs
    SIGINT/SIGTERM handlers and blocks in uvicorn.run() on
    0.0.0.0:$UVICORN_PORT (default 8000) with $UVICORN_WORKERS processes
    (default 1).

    NOTE(review): each uvicorn worker process runs the app's startup event,
    so in-process queue workers are likely started once per UVICORN_WORKERS.
    Confirm that this is intended.

    Returns False if Redis initialisation fails; otherwise None once uvicorn
    exits. Worker and Redis cleanup run whether uvicorn returns or raises.
    """
    global redis_manager
    logger.info("Running in production mode")
    from modules.redis_manager import get_redis_manager

    redis_manager = get_redis_manager('prod')
    ready = redis_manager.initialize_environment()
    if not ready:
        logger.error("Failed to initialize Redis for production")
        return False

    logger.info("Starting uvicorn server in production mode...")
    _install_signal_handlers()
    try:
        # Parsed inside the try so bad port/worker values still trigger cleanup.
        port = int(os.getenv('UVICORN_PORT', 8000))
        uvicorn.run(
            "main:app",
            host="0.0.0.0",
            port=port,
            log_level=os.getenv('LOG_LEVEL', 'info'),
            proxy_headers=True,
            timeout_keep_alive=10,
            workers=int(os.getenv('UVICORN_WORKERS', '1')),
        )
    finally:
        stop_queue_workers()
        if redis_manager:
            redis_manager.shutdown()
def parse_arguments(argv=None):
    """Parse command line arguments.

    Args:
        argv: argument list to parse. None (the default) parses sys.argv[1:],
            exactly as before; passing a list makes the parser testable.

    Returns:
        argparse.Namespace whose ``mode`` is one of the startup modes
        (default 'dev'). argparse exits with status 2 on an unknown mode and
        with status 0 after printing --help.
    """
    parser = argparse.ArgumentParser(
        description="ClassroomCopilot API Server",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        # Keep this list in sync with the --mode choices below.
        epilog="""
Startup modes:
infra - Setup infrastructure (Neo4j schema, calendar, Supabase buckets)
seed - Seed canonical full environment (20 school users)
seed-test - Seed lightweight test environment (9 school users)
gais-data - Import GAIS data (Edubase, etc.)
exam-corpus - Seed public exam-paper corpus from EXAM_CORPUS_MANIFEST (skips if unset or missing)
dev - Run development server with auto-reload
prod - Run production server (for Docker/containerized deployment)
"""
    )
    parser.add_argument(
        '--mode', '-m',
        choices=['infra', 'seed', 'seed-test', 'gais-data', 'exam-corpus', 'dev', 'prod'],
        default='dev',
        help='Startup mode (default: dev)'
    )
    return parser.parse_args(argv)
if __name__ == "__main__":
    args = parse_arguments()
    # Set environment variable for backward compatibility
    # (_runtime_environment() and _runtime_identity() read BACKEND_DEV_MODE).
    os.environ['BACKEND_DEV_MODE'] = 'true' if args.mode == 'dev' else 'false'
    logger.info(f"Starting ClassroomCopilot API in {args.mode} mode")

    # One-shot init modes return True/False; that becomes the exit status.
    one_shot_modes = {
        'infra': run_infrastructure_mode,
        'seed': lambda: run_seed_mode(test=False),
        'seed-test': lambda: run_seed_mode(test=True),
        'gais-data': run_gais_data_mode,
        # Gated: succeeds without doing anything when no manifest is configured.
        'exam-corpus': run_exam_corpus_mode,
    }
    # Server modes block inside uvicorn.run(). They return False only when
    # startup fails (Redis initialisation). That result used to be ignored,
    # which made the process exit 0; surface it as a non-zero exit so
    # supervisors and restart policies see the failure.
    server_modes = {
        'dev': run_development_mode,
        'prod': run_production_mode,
    }

    if args.mode in one_shot_modes:
        success = one_shot_modes[args.mode]()
        sys.exit(0 if success else 1)
    elif args.mode in server_modes:
        if server_modes[args.mode]() is False:
            sys.exit(1)
    else:
        # Unreachable via argparse choices; kept as a defensive guard.
        logger.error(f"Invalid mode: {args.mode}")
        sys.exit(1)