"""
|
|
Queue Monitoring Router
|
|
|
|
Provides endpoints to monitor queue status, service limits, and processing counts
|
|
to help debug issues like Docling server overload.
|
|
"""
|
|
|
|
from fastapi import APIRouter, Depends
from typing import Dict, Any, List

from modules.auth.supabase_bearer import SupabaseBearer
from modules.queue_system import get_queue
from modules.redis_manager import get_redis_manager
from modules.logger_tool import initialise_logger

import os

# Router for queue-monitoring endpoints; mounted by the application elsewhere.
router = APIRouter()
# Supabase JWT bearer-auth dependency shared by every endpoint in this router.
auth = SupabaseBearer()

# Module-level logger configured from environment variables.
# NOTE(review): initialise_logger's extra args ('default', True) are project-specific —
# presumably a format preset and console-echo flag; confirm against logger_tool.
logger = initialise_logger(__name__, os.getenv("LOG_LEVEL"), os.getenv("LOG_PATH"), 'default', True)
@router.get("/queue/status")
|
|
def get_queue_status(payload: Dict[str, Any] = Depends(auth)):
|
|
"""Get comprehensive queue status including service limits and current processing counts."""
|
|
try:
|
|
queue = get_queue()
|
|
stats = queue.get_queue_stats()
|
|
|
|
# Add environment configuration
|
|
config = {
|
|
'QUEUE_DOCLING_LIMIT': int(os.getenv('QUEUE_DOCLING_LIMIT', '2')),
|
|
'QUEUE_TIKA_LIMIT': int(os.getenv('QUEUE_TIKA_LIMIT', '3')),
|
|
'QUEUE_LLM_LIMIT': int(os.getenv('QUEUE_LLM_LIMIT', '5')),
|
|
'BUNDLE_ARCHITECTURE_ENABLED': True,
|
|
'AUTO_DOCLING_OCR': os.getenv('AUTO_DOCLING_OCR', 'true'),
|
|
'AUTO_DOCLING_NO_OCR': os.getenv('AUTO_DOCLING_NO_OCR', 'true'),
|
|
'AUTO_DOCLING_VLM': os.getenv('AUTO_DOCLING_VLM', 'false')
|
|
}
|
|
|
|
return {
|
|
'queue_statistics': stats,
|
|
'configuration': config,
|
|
'redis_host': queue.redis_host,
|
|
'redis_port': queue.redis_port,
|
|
'service_limits': dict(queue.service_limits),
|
|
'rate_limits': dict(queue.rate_limits)
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to get queue status: {e}")
|
|
return {
|
|
'error': str(e),
|
|
'queue_available': False
|
|
}
|
|
|
|
@router.get("/queue/tasks/by-file/{file_id}")
|
|
def get_file_queue_tasks(file_id: str, payload: Dict[str, Any] = Depends(auth)):
|
|
"""Get all queue tasks for a specific file (for debugging)."""
|
|
try:
|
|
# This would require extending the queue system to track tasks by file_id
|
|
# For now, return a placeholder that indicates this feature needs implementation
|
|
return {
|
|
'message': 'File-specific task tracking not yet implemented',
|
|
'file_id': file_id,
|
|
'suggestion': 'Check Redis directly or extend queue system with file_id indexing'
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to get tasks for file {file_id}: {e}")
|
|
return {
|
|
'error': str(e),
|
|
'file_id': file_id
|
|
}
|
|
|
|
@router.get("/queue/health")
|
|
def get_comprehensive_queue_health(payload: Dict[str, Any] = Depends(auth)):
|
|
"""Get comprehensive queue and Redis health information with environment details."""
|
|
try:
|
|
# Determine environment
|
|
environment = 'dev' if os.getenv('BACKEND_DEV_MODE', 'true').lower() == 'true' else 'prod'
|
|
|
|
# Get Redis manager health
|
|
redis_manager = get_redis_manager(environment)
|
|
redis_health = redis_manager.health_check()
|
|
|
|
# Get queue stats
|
|
queue_stats = {}
|
|
queue_available = False
|
|
try:
|
|
queue = get_queue()
|
|
queue_stats = queue.get_queue_stats()
|
|
queue_available = True
|
|
except Exception as e:
|
|
queue_stats = {'error': str(e)}
|
|
|
|
# Environment configuration
|
|
env_config = {
|
|
'current_environment': environment,
|
|
'redis_database': redis_health.get('database', 'unknown'),
|
|
'persistence_enabled': os.getenv(f'REDIS_PERSIST_{environment.upper()}', 'unknown'),
|
|
'task_ttl': os.getenv(f'REDIS_TASK_TTL_{environment.upper()}', 'unknown'),
|
|
'service_limits': {
|
|
'docling': int(os.getenv('QUEUE_DOCLING_LIMIT', '2')),
|
|
'tika': int(os.getenv('QUEUE_TIKA_LIMIT', '3')),
|
|
'llm': int(os.getenv('QUEUE_LLM_LIMIT', '5')),
|
|
'split_map': int(os.getenv('QUEUE_SPLIT_MAP_LIMIT', '10')),
|
|
'document_analysis': int(os.getenv('QUEUE_DOCUMENT_ANALYSIS_LIMIT', '5')),
|
|
'page_images': int(os.getenv('QUEUE_PAGE_IMAGES_LIMIT', '3'))
|
|
},
|
|
'workers': int(os.getenv('QUEUE_WORKERS', '1'))
|
|
}
|
|
|
|
# Overall health status
|
|
overall_status = 'healthy'
|
|
issues = []
|
|
|
|
if redis_health['status'] != 'healthy':
|
|
overall_status = 'unhealthy'
|
|
issues.append(f"Redis: {redis_health.get('error', 'Unknown issue')}")
|
|
|
|
if not queue_available:
|
|
overall_status = 'degraded'
|
|
issues.append(f"Queue system: {queue_stats.get('error', 'Not accessible')}")
|
|
|
|
# Check for concerning queue states
|
|
if queue_available and queue_stats.get('dead_letter_count', 0) > 0:
|
|
overall_status = 'warning'
|
|
issues.append(f"Dead letter queue has {queue_stats['dead_letter_count']} failed tasks")
|
|
|
|
return {
|
|
'status': overall_status,
|
|
'issues': issues,
|
|
'timestamp': redis_health.get('timestamp'),
|
|
'environment': env_config,
|
|
'redis': redis_health,
|
|
'queue': queue_stats,
|
|
'recommendations': _get_health_recommendations(redis_health, queue_stats, environment)
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to get comprehensive queue health: {e}")
|
|
return {
|
|
'status': 'error',
|
|
'error': str(e),
|
|
'timestamp': None
|
|
}
|
|
|
|
def _get_health_recommendations(redis_health: Dict, queue_stats: Dict, environment: str) -> List[str]:
|
|
"""Generate health recommendations based on current state."""
|
|
recommendations = []
|
|
|
|
# Redis recommendations
|
|
if redis_health.get('status') != 'healthy':
|
|
recommendations.append("Check Redis service status and connectivity")
|
|
|
|
# Queue recommendations
|
|
if queue_stats.get('dead_letter_count', 0) > 5:
|
|
recommendations.append("High number of failed tasks - check task processing logic")
|
|
|
|
# Processing recommendations
|
|
processing_counts = queue_stats.get('processing', {})
|
|
for service, count in processing_counts.items():
|
|
if count < 0:
|
|
recommendations.append(f"Negative processing counter for {service} - restart server to reset")
|
|
|
|
# Environment-specific recommendations
|
|
if environment == 'dev':
|
|
recommendations.append("Development mode: Database will be cleared on restart")
|
|
else:
|
|
recommendations.append("Production mode: Tasks will be recovered on restart")
|
|
|
|
if not recommendations:
|
|
recommendations.append("System appears healthy - no specific recommendations")
|
|
|
|
return recommendations
|