""" Enhanced Upload Handler with Memory-Aware Queuing ================================================= Replaces the immediate processing model with intelligent queue management. Provides user feedback about capacity, queue position, and processing status. Features: - Pre-upload capacity checking - Memory-aware queuing with user quotas - Real-time status updates via WebSocket/SSE - Graceful degradation under load - Fair queuing across multiple users """ import os import uuid import time import logging import asyncio from typing import Dict, List, Optional, Any, Tuple from fastapi import HTTPException, BackgroundTasks from dataclasses import asdict from .memory_aware_queue import get_memory_queue, QueuedFile from .redis_manager import get_redis_manager from modules.database.supabase.utils.client import SupabaseServiceRoleClient from modules.database.tools.storage.storage_admin import StorageAdmin logger = logging.getLogger(__name__) class EnhancedUploadHandler: """Enhanced upload handler with memory-aware queuing.""" def __init__(self, environment: str = "dev"): self.memory_queue = get_memory_queue(environment) self.redis_manager = get_redis_manager(environment) self.redis_client = self.redis_manager.client # Processing status tracking self.processing_status_key = "file_processing_status" def check_upload_eligibility(self, user_id: str, file_size: int, mime_type: str) -> Tuple[bool, str, Dict[str, Any]]: """ Check if user can upload a file right now. Returns: (eligible, message, details) """ # Check system capacity can_accept, message, queue_info = self.memory_queue.check_upload_capacity( user_id, file_size, mime_type ) if not can_accept: return False, message, { 'reason': 'capacity_exceeded', 'queue_info': queue_info, 'recommendations': self._get_recommendations(queue_info) } return True, message, { 'status': 'ready_for_upload', 'queue_info': queue_info, 'processing_estimate': self._estimate_processing_time(file_size, mime_type) } async def handle_upload_with_queue(self, file_id: str, user_id: str, filename: str, file_bytes: bytes, mime_type: str, cabinet_id: str, priority: int = 1) -> Dict[str, Any]: """ Handle file upload with intelligent queuing. Steps: 1. Store file immediately (cheap operation) 2. Add to processing queue 3. Return queue status to user 4. Process asynchronously when capacity available """ # Store file immediately (this is fast) storage = StorageAdmin() client = SupabaseServiceRoleClient() # Create database record bucket = f"{cabinet_id}-files" # or your bucket naming convention storage_path = f"{cabinet_id}/{file_id}/{filename}" try: # Store file storage.upload_file(bucket, storage_path, file_bytes, mime_type, upsert=True) # Create file record insert_res = client.supabase.table('files').insert({ 'id': file_id, 'name': filename, 'cabinet_id': cabinet_id, 'bucket': bucket, 'path': storage_path, 'mime_type': mime_type, 'uploaded_by': user_id, 'size_bytes': len(file_bytes), 'source': 'classroomcopilot-web', 'status': 'queued_for_processing' # New status }).execute() if not insert_res.data: raise HTTPException(status_code=500, detail="Failed to create file record") except Exception as e: logger.error(f"Failed to store file {file_id}: {e}") raise HTTPException(status_code=500, detail=f"Storage failed: {str(e)}") # Add to processing queue try: queue_result = self.memory_queue.enqueue_file( file_id=file_id, user_id=user_id, filename=filename, size_bytes=len(file_bytes), mime_type=mime_type, cabinet_id=cabinet_id, priority=priority ) # Update file status in database client.supabase.table('files').update({ 'status': 'queued_for_processing', 'extra': { 'queue_position': queue_result['queue_position'], 'estimated_wait_seconds': queue_result['estimated_wait_seconds'], 'memory_estimate_mb': queue_result['memory_estimate_mb'] } }).eq('id', file_id).execute() logger.info(f"📋 File {file_id} queued at position {queue_result['queue_position']}") return { 'status': 'upload_successful', 'message': 'File uploaded and queued for processing', 'file_id': file_id, 'queue_info': queue_result, 'next_steps': { 'poll_status_endpoint': f'/database/files/{file_id}/processing-status', 'websocket_updates': f'/ws/file-processing/{file_id}' } } except Exception as e: logger.error(f"Failed to queue file {file_id}: {e}") # Clean up stored file try: storage.delete_file(bucket, storage_path) client.supabase.table('files').delete().eq('id', file_id).execute() except: pass raise HTTPException(status_code=500, detail=f"Queue failed: {str(e)}") async def process_queued_files(self, service_name: str = "document_processor"): """ Background service to process queued files. This runs continuously as a background task. """ logger.info(f"🚀 Started queue processor for {service_name}") while True: try: # Get next file from queue queued_file = self.memory_queue.dequeue_next_file(service_name) if not queued_file: # No files ready for processing await asyncio.sleep(5) continue # Update file status await self._update_processing_status(queued_file.file_id, 'processing') # Process the file try: await self._process_file(queued_file, service_name) await self._update_processing_status(queued_file.file_id, 'completed') except Exception as e: logger.error(f"Failed to process file {queued_file.file_id}: {e}") await self._update_processing_status(queued_file.file_id, 'failed', str(e)) finally: # Always free memory self.memory_queue.complete_processing( service_name, queued_file.file_id, queued_file.memory_estimate_mb ) except Exception as e: logger.error(f"Queue processor error: {e}") await asyncio.sleep(10) # Back off on errors async def _process_file(self, queued_file: QueuedFile, service_name: str): """Process a single file from the queue.""" logger.info(f"🔄 Processing file {queued_file.file_id} in {service_name}") # Import here to avoid circular imports from modules.pipeline_controller import get_pipeline_controller client = SupabaseServiceRoleClient() controller = get_pipeline_controller() # Get file record file_result = client.supabase.table('files').select('*').eq('id', queued_file.file_id).single().execute() file_row = file_result.data if not file_row: raise Exception(f"File record not found: {queued_file.file_id}") # Update status to processing client.supabase.table('files').update({ 'status': 'processing' }).eq('id', queued_file.file_id).execute() # Convert to PDF if needed (this is where the bottleneck was before) processing_path = await self._handle_pdf_conversion(file_row) # Enqueue Phase 1 tasks phase1_tasks = controller.enqueue_phase1_tasks( file_id=queued_file.file_id, file_row={**file_row, 'path': processing_path}, processing_path=processing_path, processing_mime=file_row['mime_type'] ) # Update database with task IDs client.supabase.table('files').update({ 'status': 'phase1_processing', 'extra': { **file_row.get('extra', {}), 'phase1_tasks': phase1_tasks, 'processing_started_at': time.time() } }).eq('id', queued_file.file_id).execute() logger.info(f"✅ File {queued_file.file_id} processing initiated") async def _handle_pdf_conversion(self, file_row: Dict[str, Any]) -> str: """Handle PDF conversion asynchronously.""" if file_row['mime_type'] == 'application/pdf': return file_row['path'] # TODO: Implement async PDF conversion # For now, return original path and handle conversion in pipeline logger.info(f"PDF conversion queued for file {file_row['id']}") return file_row['path'] async def _update_processing_status(self, file_id: str, status: str, error: str = None): """Update file processing status.""" status_data = { 'file_id': file_id, 'status': status, 'timestamp': time.time(), 'error': error } # Store in Redis for real-time updates status_key = f"{self.processing_status_key}:{file_id}" self.redis_client.setex(status_key, 86400, json.dumps(status_data)) # 24h expiry # Update database client = SupabaseServiceRoleClient() client.supabase.table('files').update({ 'status': status, 'error_message': error }).eq('id', file_id).execute() logger.info(f"📊 Status update for {file_id}: {status}") def get_processing_status(self, file_id: str) -> Dict[str, Any]: """Get current processing status for a file.""" status_key = f"{self.processing_status_key}:{file_id}" status_json = self.redis_client.get(status_key) if status_json: return json.loads(status_json) # Fallback to database client = SupabaseServiceRoleClient() result = client.supabase.table('files').select('status, error_message, extra').eq('id', file_id).single().execute() if result.data: return { 'file_id': file_id, 'status': result.data['status'], 'error': result.data.get('error_message'), 'extra': result.data.get('extra', {}) } return {'file_id': file_id, 'status': 'not_found'} def _estimate_processing_time(self, file_size: int, mime_type: str) -> Dict[str, Any]: """Estimate processing time for a file.""" # Base time estimates (in seconds) base_times = { 'application/pdf': 60, # 1 minute per MB 'application/msword': 120, # 2 minutes per MB 'image/': 30 # 30 seconds per MB } # Find matching mime type time_per_mb = 60 # default for mime_prefix, time_val in base_times.items(): if mime_type.startswith(mime_prefix): time_per_mb = time_val break file_size_mb = file_size / (1024 * 1024) estimated_seconds = int(file_size_mb * time_per_mb) return { 'estimated_seconds': estimated_seconds, 'estimated_minutes': estimated_seconds / 60, 'phases': { 'pdf_conversion': estimated_seconds * 0.2, 'metadata_extraction': estimated_seconds * 0.3, 'docling_processing': estimated_seconds * 0.5 } } def _get_recommendations(self, queue_info: Dict[str, Any]) -> List[str]: """Get recommendations for user when upload is rejected.""" recommendations = [] if queue_info.get('reason') == 'file_too_large': recommendations.append("Try compressing your file or splitting it into smaller parts") if queue_info.get('utilization', 0) > 0.9: recommendations.append("System is currently overloaded. Try uploading during off-peak hours") recommendations.append("Consider uploading smaller files first") if queue_info.get('user_current', 0) > 0: recommendations.append("Wait for your current uploads to complete before uploading more") if not recommendations: recommendations.append("Please try again in a few minutes") return recommendations # Convenience functions def get_upload_handler(environment: str = "dev") -> EnhancedUploadHandler: """Get enhanced upload handler instance.""" return EnhancedUploadHandler(environment) import json