api/archive/auto_processing/pipeline_controller.py
2025-11-14 14:47:19 +00:00

1317 lines
62 KiB
Python

"""
Pipeline Controller for Three-Phase Document Processing Architecture
This module coordinates the three phases of document processing:
- Phase 1: Document Structure Discovery & Analysis
- Phase 2: Parallel Content Processing Pipelines
- Phase 3: Enhanced Frontend Viewing (handled by frontend)
Features:
- Environment variable controlled auto-processing
- Phase 1 completion detection
- Automatic Phase 2 triggering
- Intelligent retry and coordination logic
"""
import json
import os
import uuid
import time
from typing import Dict, Any, List, Optional, Set
from pathlib import Path
from modules.logger_tool import initialise_logger
from modules.database.supabase.utils.client import SupabaseServiceRoleClient
from modules.database.supabase.utils.storage import StorageAdmin
from modules.queue_system import (
enqueue_tika_task, enqueue_docling_task, enqueue_split_map_task,
enqueue_document_analysis_task, enqueue_page_images_task,
TaskPriority, get_queue
)
from modules.bundle_metadata import (
create_standard_metadata, BundleMetadata, PipelineType, ProcessingMode, BundleType
)
logger = initialise_logger(__name__, os.getenv("LOG_LEVEL"), os.getenv("LOG_PATH"), 'default', True)
class DocumentPipelineController:
"""
Coordinates the three-phase document processing pipeline.
"""
def __init__(self):
    """Initialise storage/database clients and load auto-processing flags from the environment."""
    self.client = SupabaseServiceRoleClient()
    self.storage = StorageAdmin()

    def flag(name: str, default: str) -> bool:
        # Environment toggles are the strings 'true'/'false' (case-insensitive).
        return os.getenv(name, default).lower() == 'true'

    # Phase 1: structure discovery toggles
    self.auto_tika = flag('AUTO_TIKA_PROCESSING', 'true')
    self.auto_page_images = flag('AUTO_PAGE_IMAGES', 'true')
    self.auto_document_analysis = flag('AUTO_DOCUMENT_ANALYSIS', 'true')
    self.auto_split_map = flag('AUTO_SPLIT_MAP_GENERATION', 'true')
    # Phase 2: content-processing pipeline toggles
    self.auto_docling_ocr = flag('AUTO_DOCLING_OCR', 'true')
    self.auto_docling_no_ocr = flag('AUTO_DOCLING_NO_OCR', 'true')
    self.auto_docling_vlm = flag('AUTO_DOCLING_VLM', 'false')
    # Per-pipeline page-granularity switches
    self.docling_ocr_by_page = flag('DOCLING_OCR_BY_PAGE', 'false')
    self.docling_no_ocr_by_page = flag('DOCLING_NO_OCR_BY_PAGE', 'false')
    self.docling_vlm_by_page = flag('DOCLING_VLM_BY_PAGE', 'true')
    # Section-grouping strategy for large documents
    self.docling_use_split_map = flag('DOCLING_USE_SPLIT_MAP', 'true')
    self.docling_split_threshold = int(os.getenv('DOCLING_SPLIT_THRESHOLD', '50'))
    logger.info("Pipeline controller initialized with new bundle architecture")
def enqueue_phase1_tasks(self, file_id: str, file_row: Dict[str, Any],
                         processing_path: str, processing_mime: str,
                         priority: TaskPriority = TaskPriority.HIGH) -> Dict[str, List[str]]:
    """
    Enqueue Phase 1 tasks: Structure Discovery & Analysis

    Chains up to five task types via ``depends_on`` task-ID lists:
    Tika metadata -> frontmatter OCR -> structure analysis -> split map,
    with page-image generation depending on structure analysis. Each step
    is gated by the corresponding AUTO_* flag loaded in __init__ (the
    frontmatter step is gated only by a docling URL being configured).

    Args:
        file_id: ID of the file being processed.
        file_row: Database row for the file (reads 'bucket' and 'cabinet_id').
        processing_path: Storage path of the document to process.
        processing_mime: MIME type passed to the processing services.
        priority: Queue priority applied to the primary (non-deferred) tasks.

    Returns:
        Dictionary mapping task types to task IDs
    """
    logger.info(f"Phase 1: Starting structure discovery for file {file_id}")
    task_ids = {}
    bucket = file_row['bucket']
    cabinet_id = file_row['cabinet_id']
    # Step 1: Tika Processing (metadata extraction)
    if self.auto_tika:
        tika_url = os.getenv('TIKA_URL')
        if tika_url:
            tika_task_id = enqueue_tika_task(
                file_id=file_id,
                payload={
                    'bucket': bucket,
                    'file_path': processing_path,
                    'cabinet_id': cabinet_id,
                    'mime_type': processing_mime
                },
                priority=priority
            )
            task_ids['tika'] = [tika_task_id]
            logger.info(f"Phase 1: Enqueued Tika task {tika_task_id}")
        else:
            # Enabled but unconfigured: skip rather than fail the whole phase.
            logger.warning("Phase 1: Tika enabled but TIKA_URL not configured")
    # Step 2: Frontmatter processing (lightweight document overview)
    # NOTE(review): unlike the other steps this has no AUTO_* gate — it runs
    # whenever a docling service URL is configured. Confirm this is intended.
    docling_url = os.getenv('DOCLING_URL') or os.getenv('NEOFS_DOCLING_URL')
    if docling_url:
        try:
            front_pages = int(os.getenv('DOCLING_FRONTPAGES', '3'))
        except Exception:
            # Malformed env value: fall back to the first 3 pages.
            front_pages = 3
        # Create enhanced metadata for frontmatter JSON display in UI
        frontmatter_metadata = {
            'display_name': f'Document Frontmatter (p1-{front_pages})',
            'bundle_label': 'Frontmatter Analysis',
            'section_title': 'Document Frontmatter',
            'page_range': [1, front_pages],
            'page_count': front_pages,
            'bundle_type': 'frontmatter_json',
            'processing_mode': 'frontmatter_analysis',
            'pipeline': 'frontmatter_ocr',
            'is_frontmatter': True,
            'ui_category': 'document_analysis',
            'ui_order': 1,  # Show first in UI
            'description': f'OCR analysis of first {front_pages} pages for document structure and metadata',
            'viewer_type': 'json'
        }
        frontmatter_task_id = enqueue_docling_task(
            file_id=file_id,
            task_type='docling_frontmatter_json',
            payload={
                'bucket': bucket,
                'file_path': processing_path,
                'cabinet_id': cabinet_id,
                'mime_type': processing_mime,
                'config': {
                    'do_ocr': True,
                    'force_ocr': False,
                    'image_export_mode': 'embedded',
                    'ocr_engine': 'easyocr',
                    'ocr_lang': 'en',
                    'pdf_backend': 'dlparse_v4',
                    'table_mode': 'fast',
                    'target_type': 'inbody',
                    'to_formats': 'json',
                    # Only the leading pages: frontmatter is a partial scan.
                    'page_range': [1, front_pages]
                },
                'artefact_extra': frontmatter_metadata,
                # Wait for Tika (if enqueued) so its metadata is available first.
                'depends_on': task_ids.get('tika', [])
            },
            priority=priority,
            timeout=int(os.getenv('DOCLING_FRONTMATTER_TIMEOUT', '1800'))
        )
        task_ids['frontmatter'] = [frontmatter_task_id]
        logger.info(f"Phase 1: Enqueued frontmatter task {frontmatter_task_id}")
    # Step 3: Document Structure Analysis (LLM-enhanced hierarchy)
    if self.auto_document_analysis:
        analysis_task_id = enqueue_docling_task(
            file_id=file_id,
            task_type='document_structure_analysis',
            payload={
                'bucket': bucket,
                'file_path': processing_path,
                'cabinet_id': cabinet_id,
                'mime_type': processing_mime,
                'config': {
                    'target_type': 'inbody',
                    'to_formats': 'json',
                    'do_ocr': False,
                    'force_ocr': False
                },
                'depends_on': task_ids.get('frontmatter', [])
            },
            priority=priority,
            timeout=int(os.getenv('DOCUMENT_ANALYSIS_TIMEOUT', '300'))
        )
        task_ids['document_analysis'] = [analysis_task_id]
        logger.info(f"Phase 1: Enqueued document analysis task {analysis_task_id}")
    # Step 4: Split Map Generation (definitive section boundaries)
    if self.auto_split_map:
        split_map_task_id = enqueue_split_map_task(
            file_id=file_id,
            payload={
                # Needs both frontmatter and analysis outputs (when enabled).
                'depends_on': task_ids.get('frontmatter', []) + task_ids.get('document_analysis', [])
            },
            priority=TaskPriority.NORMAL
        )
        task_ids['split_map'] = [split_map_task_id]
        logger.info(f"Phase 1: Enqueued split map task {split_map_task_id}")
    # Step 5: Page Images Generation (for frontend viewing)
    if self.auto_page_images:
        page_images_task_id = enqueue_docling_task(
            file_id=file_id,
            task_type='generate_page_images',
            payload={
                'bucket': bucket,
                'file_path': processing_path,
                'cabinet_id': cabinet_id,
                'mime_type': processing_mime,
                'config': {},
                'depends_on': task_ids.get('document_analysis', [])
            },
            priority=TaskPriority.NORMAL,
            timeout=int(os.getenv('PAGE_IMAGES_TIMEOUT', '1800'))
        )
        task_ids['page_images'] = [page_images_task_id]
        logger.info(f"Phase 1: Enqueued page images task {page_images_task_id}")
    # Bundle tasks are now directly enqueued by split_map task completion
    total_tasks = sum(len(task_list) for task_list in task_ids.values())
    logger.info(f"Phase 1: Enqueued {total_tasks} tasks for file {file_id}: {list(task_ids.keys())}")
    return task_ids
def check_phase1_completion(self, file_id: str) -> Dict[str, Any]:
    """
    Check if Phase 1 is complete for a given file.

    A component counts as complete when a matching artefact row exists with
    status 'completed'. Only components enabled via environment flags are
    required (frontmatter is always required).

    Returns:
        Dictionary with completion status and details
    """
    logger.debug(f"Checking Phase 1 completion for file {file_id}")
    # Fetch every artefact row recorded for this file.
    query = self.client.supabase.table('document_artefacts').select('*').eq('file_id', file_id)
    rows = query.execute().data or []
    # Artefact type -> Phase 1 component it satisfies.
    type_to_component = {
        'tika_json': 'tika_metadata',
        'docling_frontmatter_json': 'frontmatter',
        'document_outline_hierarchy': 'document_analysis',
        'split_map_json': 'split_map',
        'page_images': 'page_images',
    }
    phase1_checks = {
        'tika_metadata': False,
        'frontmatter': False,
        'document_analysis': False,
        'split_map': False,
        'page_images': False,
    }
    for row in rows:
        if row['status'] != 'completed':
            continue
        component = type_to_component.get(row['type'])
        if component is not None:
            phase1_checks[component] = True
    # Build the list of required components from the enabled features.
    required_checks = []
    if self.auto_tika:
        required_checks.append('tika_metadata')
    required_checks.append('frontmatter')  # Always required for basic processing
    if self.auto_document_analysis:
        required_checks.append('document_analysis')
    if self.auto_split_map:
        required_checks.append('split_map')
    if self.auto_page_images:
        required_checks.append('page_images')
    completed_required = [name for name in required_checks if phase1_checks[name]]
    return {
        'file_id': file_id,
        'is_complete': len(completed_required) == len(required_checks),
        'completed_components': completed_required,
        'required_components': required_checks,
        'all_checks': phase1_checks,
        # max(..., 1) guards against division by zero if nothing is required.
        'completion_percentage': (len(completed_required) / max(len(required_checks), 1)) * 100
    }
def enqueue_sequential_docling_pipelines(self, file_id: str, file_data: Dict[str, Any]) -> Dict[str, Any]:
    """
    Enqueue sequential docling pipelines with dependencies: no_ocr → ocr → vlm
    Each pipeline depends on ALL tasks from the previous pipeline completing.
    This replaces the complex Phase 2 coordinator with simple task dependencies.
    Args:
        file_id: The file ID to process
        file_data: File processing information (bucket, path, etc.)
            Must contain 'bucket', 'file_path', 'cabinet_id', 'mime_type';
            a KeyError here indicates a malformed payload.
    Returns:
        Dictionary with enqueued pipeline information
    """
    logger.info(f"Enqueueing sequential docling pipelines for file {file_id}")
    bucket = file_data['bucket']
    file_path = file_data['file_path']
    cabinet_id = file_data['cabinet_id']
    mime_type = file_data['mime_type']
    # Base configuration shared by all pipelines (pipeline-specific options added per pipeline)
    base_config = {
        'to_formats': ['json', 'html', 'text', 'md', 'doctags'],
        'image_export_mode': 'referenced',
        'target_type': 'zip',
        'pdf_backend': os.getenv('DOCLING_PDF_BACKEND', 'dlparse_v4'),
        'include_images': os.getenv('DOCLING_INCLUDE_IMAGES', 'true').lower() == 'true',
        'images_scale': float(os.getenv('DOCLING_IMAGES_SCALE', '2.0')),
        'ocr_engine': os.getenv('OCR_ENGINE', 'easyocr'),
        'ocr_lang': os.getenv('OCR_LANG', 'en'),
        'picture_description_area_threshold': float(os.getenv('DOCLING_PICTURE_DESCRIPTION_AREA_THRESHOLD', '0.05'))
    }
    # Determine the pipeline execution order: no_ocr → ocr → vlm
    pipeline_order = []
    if self.auto_docling_no_ocr:
        pipeline_order.append('no_ocr')
    if self.auto_docling_ocr:
        pipeline_order.append('ocr')
    if self.auto_docling_vlm:
        pipeline_order.append('vlm')
    if not pipeline_order:
        logger.info(f"No docling pipelines enabled for file {file_id}")
        return {
            'file_id': file_id,
            'enqueued_pipelines': {},
            'total_tasks': 0,
            'sequential_order': [],
            'message': 'No docling pipelines enabled'
        }
    logger.info(f"Sequential pipeline order for file {file_id}: {pipeline_order}")
    # Enqueue all pipelines with proper dependencies
    enqueued_pipelines = {}
    all_task_ids = {}  # pipeline type -> task IDs, consumed by the next pipeline's depends_on
    for i, pipeline_type in enumerate(pipeline_order):
        # Determine dependencies: depend on ALL tasks from previous pipeline
        depends_on = []
        if i > 0:
            previous_pipeline = pipeline_order[i - 1]
            depends_on = all_task_ids.get(previous_pipeline, [])
            # Truncate the ID list in the log when it is long.
            logger.info(f"Pipeline {pipeline_type} will depend on {len(depends_on)} tasks from {previous_pipeline}: {depends_on[:3]}..." if len(depends_on) > 3 else f"Pipeline {pipeline_type} will depend on {len(depends_on)} tasks from {previous_pipeline}: {depends_on}")
        else:
            logger.info(f"Pipeline {pipeline_type} has no dependencies (first pipeline)")
        # Create pipeline tasks
        pipeline_result = self._enqueue_single_pipeline_with_deps(
            file_id, pipeline_type, base_config, bucket, file_path, cabinet_id, mime_type, depends_on
        )
        # A None result (e.g. unknown pipeline type) is skipped; subsequent
        # pipelines then fall back to an empty depends_on list.
        if pipeline_result:
            enqueued_pipelines[pipeline_type] = pipeline_result
            all_task_ids[pipeline_type] = pipeline_result['task_ids']
            logger.info(f"Enqueued {pipeline_type} pipeline with {len(pipeline_result['task_ids'])} tasks")
    total_tasks = sum(len(p.get('task_ids', [])) for p in enqueued_pipelines.values())
    logger.info(f"Successfully enqueued {len(pipeline_order)} sequential pipelines with {total_tasks} total tasks for file {file_id}")
    return {
        'file_id': file_id,
        'enqueued_pipelines': enqueued_pipelines,
        'total_tasks': total_tasks,
        'sequential_order': pipeline_order
    }
def _determine_processing_mode(self, file_id: str, pipeline_type: str) -> tuple[str, dict]:
    """
    Determine how to process document based on settings and characteristics.

    Implements corrected decision logic:
    1. Priority 1: Respect explicit BY_PAGE preference
    2. Priority 2: Check size threshold for auto-processing
    3. Priority 3: Use split map for large documents
    4. Priority 4: Error — large document without a split map

    Args:
        file_id: The file to inspect.
        pipeline_type: 'no_ocr' | 'ocr' | 'vlm' (selects the BY_PAGE flag).

    Returns:
        Tuple of (processing_mode, processing_data). processing_mode is one
        of "split_by_pages", "whole_document", "split_by_sections", or
        "error" (with empty data) when no valid strategy exists.
    """
    # Check BY_PAGE flags first (highest priority)
    by_page = self._get_by_page_setting(pipeline_type)
    if by_page:
        logger.info(f"BY_PAGE enabled for {pipeline_type} - creating page-based bundles regardless of document size")
        return "split_by_pages", self._get_page_ranges(file_id)
    # Get document characteristics
    page_count = self._get_page_count(file_id)
    # Apply size threshold logic
    if page_count < self.docling_split_threshold:
        logger.info(f"Document has {page_count} pages (< {self.docling_split_threshold} threshold) - creating single bundle")
        return "whole_document", {}
    # Check for split map availability
    split_map = self._load_split_map_if_needed(file_id)
    if split_map and self.docling_use_split_map:
        logger.info(f"Document has {page_count} pages (>= {self.docling_split_threshold} threshold) with split map - creating section-based bundles")
        return "split_by_sections", split_map
    else:
        logger.error(f"Document has {page_count} pages (>= {self.docling_split_threshold} threshold) without split map - ERROR")
        # BUG FIX: previously returned the bare string "error", which broke
        # tuple unpacking at call sites (ValueError: too many values to
        # unpack). Return the documented (mode, data) pair instead.
        return "error", {}
def _get_by_page_setting(self, pipeline_type: str) -> bool:
    """Return the BY_PAGE flag configured for the given pipeline type (False for unknown types)."""
    by_page_flags = {
        'no_ocr': self.docling_no_ocr_by_page,
        'ocr': self.docling_ocr_by_page,
        'vlm': self.docling_vlm_by_page,
    }
    return by_page_flags.get(pipeline_type, False)
def _get_pipeline_specific_config(self, pipeline_type: str) -> Dict[str, Any]:
    """Get pipeline-specific configuration options from environment variables.

    Each known pipeline type reads the same seven options from env vars
    prefixed with its own namespace (DOCLING_NO_OCR_*, DOCLING_OCR_*,
    DOCLING_VLM_*); only the per-pipeline defaults differ. Unknown types
    get a conservative static configuration.
    """
    # (env prefix, table_mode default, then boolean defaults for:
    #  cell_matching, formula, code, table_structure, pic_class, pic_desc)
    pipeline_table = {
        'no_ocr': ('DOCLING_NO_OCR', 'fast', 'false', 'false', 'false', 'true', 'false', 'false'),
        'ocr': ('DOCLING_OCR', 'accurate', 'true', 'true', 'true', 'true', 'false', 'false'),
        'vlm': ('DOCLING_VLM', 'accurate', 'true', 'false', 'false', 'true', 'true', 'true'),
    }
    if pipeline_type not in pipeline_table:
        # Default config for unknown pipeline types
        return {
            'table_mode': 'fast',
            'table_cell_matching': False,
            'do_formula_enrichment': False,
            'do_code_enrichment': False,
            'do_table_structure': True,
            'do_picture_classification': False,
            'do_picture_description': False
        }
    prefix, mode_default, cell_d, formula_d, code_d, struct_d, cls_d, desc_d = pipeline_table[pipeline_type]

    def env_flag(suffix: str, default: str) -> bool:
        return os.getenv(f'{prefix}_{suffix}', default).lower() == 'true'

    return {
        'table_mode': os.getenv(f'{prefix}_TABLE_MODE', mode_default),
        'table_cell_matching': env_flag('TABLE_CELL_MATCHING', cell_d),
        'do_formula_enrichment': env_flag('DO_FORMULA_ENRICHMENT', formula_d),
        'do_code_enrichment': env_flag('DO_CODE_ENRICHMENT', code_d),
        'do_table_structure': env_flag('DO_TABLE_STRUCTURE', struct_d),
        'do_picture_classification': env_flag('DO_PICTURE_CLASSIFICATION', cls_d),
        'do_picture_description': env_flag('DO_PICTURE_DESCRIPTION', desc_d)
    }
def _get_page_count(self, file_id: str) -> int:
    """Get page count for the file from existing artefacts (first Tika).

    Resolution order:
      1. A ``page_count`` value in the ``extra`` of any non-frontmatter artefact.
      2. Page-count keys parsed from the stored Tika JSON artefact.
      3. Direct PDF inspection via PyMuPDF (_get_page_count_direct_pdf).
    On any unexpected error, returns ``docling_split_threshold + 1`` so the
    document is treated as "large" (the safer assumption for splitting).
    """
    logger.info(f"🔍 PAGE COUNT: Starting page count detection for file {file_id}")
    try:
        # Try to get page count from existing artefacts, excluding frontmatter (partial document)
        artefacts = self.client.supabase.table('document_artefacts').select('type,extra').eq('file_id', file_id).execute()
        artefact_types = [art.get('type', 'unknown') for art in artefacts.data or []]
        logger.info(f"🔍 PAGE COUNT: Found {len(artefacts.data or [])} artefacts for file {file_id}: {artefact_types}")
        for art in artefacts.data or []:
            art_type = art.get('type', 'unknown')
            extra = art.get('extra', {})
            logger.info(f"🔍 PAGE COUNT: Checking artefact type '{art_type}' for file {file_id}")
            # Skip frontmatter artefacts as they only contain partial page counts
            if art_type == 'docling_frontmatter_json':
                logger.info(f"🔍 PAGE COUNT: Skipping frontmatter artefact (partial page count) for file {file_id}")
                continue
            # Also skip docling_json artefacts that are from frontmatter processing
            if art_type == 'docling_json' and extra.get('is_frontmatter', False):
                logger.info(f"🔍 PAGE COUNT: Skipping frontmatter-derived docling_json artefact (partial page count) for file {file_id}")
                continue
            # Also skip docling_json artefacts that have frontmatter-related pipeline info
            if art_type == 'docling_json' and extra.get('pipeline') == 'frontmatter_ocr':
                logger.info(f"🔍 PAGE COUNT: Skipping frontmatter pipeline docling_json artefact (partial page count) for file {file_id}")
                continue
            if 'page_count' in extra:
                # First artefact with a recorded page_count wins.
                page_count = int(extra['page_count'])
                logger.info(f"✅ PAGE COUNT: Found page count {page_count} from {art_type} artefact for file {file_id}")
                return page_count
            else:
                logger.info(f"🔍 PAGE COUNT: No page_count in {art_type} artefact for file {file_id}")
        logger.info(f"🔍 PAGE COUNT: No artefacts with page_count found, trying Tika JSON parsing for file {file_id}")
        # Try to get page count from Tika JSON (most reliable source)
        tika_arts = self.client.supabase.table('document_artefacts') \
            .select('rel_path') \
            .eq('file_id', file_id) \
            .eq('type', 'tika_json') \
            .execute()
        if tika_arts.data:
            logger.info(f"🔍 PAGE COUNT: Found Tika JSON artefact, parsing content for file {file_id}")
            file_info = self.client.supabase.table('files').select('bucket').eq('id', file_id).single().execute()
            if file_info.data:
                tika_data = self.storage.download_file(file_info.data['bucket'], tika_arts.data[0]['rel_path'])
                import json  # NOTE(review): redundant — json is already imported at module level
                tika_json = json.loads(tika_data.decode('utf-8'))
                # Check common Tika page count keys in top level and metadata
                logger.info(f"🔍 PAGE COUNT: Checking Tika JSON keys for page count in file {file_id}")
                # First check metadata section (most common location)
                metadata = tika_json.get('metadata', {})
                for key in ("xmpTPg:NPages", "Page-Count", "pdf:PageCount", "pdf:pagecount", "meta:page-count", "pdfa:PDFVersion"):
                    # Check both exact key and lowercase version in metadata
                    value = metadata.get(key) or metadata.get(key.lower())
                    if value is not None:
                        try:
                            page_count = int(value)
                            # Reject zero/negative values and keep scanning keys.
                            if page_count > 0:
                                logger.info(f"✅ PAGE COUNT: Found page count {page_count} from Tika metadata key '{key}' for file {file_id}")
                                return page_count
                        except Exception as parse_error:
                            logger.info(f"🔍 PAGE COUNT: Could not parse value '{value}' from Tika metadata key '{key}': {parse_error}")
                            continue
                # Also check top level (fallback)
                for key in ("xmpTPg:NPages", "Page-Count", "pdf:PageCount", "pdf:pagecount"):
                    value = tika_json.get(key) or tika_json.get(key.lower())
                    if value is not None:
                        try:
                            page_count = int(value)
                            if page_count > 0:
                                logger.info(f"✅ PAGE COUNT: Found page count {page_count} from Tika JSON top-level key '{key}' for file {file_id}")
                                return page_count
                        except Exception as parse_error:
                            logger.info(f"🔍 PAGE COUNT: Could not parse value '{value}' from Tika top-level key '{key}': {parse_error}")
                            continue
                # Debug: Show available keys to help diagnose issues
                logger.info(f"🔍 PAGE COUNT: Available Tika JSON top-level keys: {list(tika_json.keys())}")
                if 'metadata' in tika_json:
                    logger.info(f"🔍 PAGE COUNT: Available Tika metadata keys: {list(metadata.keys())}")
                logger.warning(f"🔍 PAGE COUNT: No valid page count keys found in Tika JSON for file {file_id}")
            else:
                logger.warning(f"🔍 PAGE COUNT: Could not get file info for Tika JSON parsing for file {file_id}")
        else:
            logger.warning(f"🔍 PAGE COUNT: No Tika JSON artefact found for file {file_id}")
        # Final fallback - try to get it directly from PDF using PyMuPDF
        logger.warning(f"🔍 PAGE COUNT: Trying direct PDF parsing as final fallback for file {file_id}")
        return self._get_page_count_direct_pdf(file_id)
    except Exception as e:
        # Defaulting above the threshold forces the split-map path downstream.
        logger.error(f"❌ PAGE COUNT: Error getting page count for file {file_id}: {e}, defaulting to {self.docling_split_threshold + 1}")
        return self.docling_split_threshold + 1
def _get_page_count_direct_pdf(self, file_id: str) -> int:
    """Final fallback: Get page count directly from PDF using PyMuPDF.

    Downloads the original file from storage and counts its pages; on any
    failure, returns ``docling_split_threshold + 1`` so the document is
    treated as large.
    """
    try:
        # Look up the file's storage location in the database.
        lookup = self.client.supabase.table('files').select('bucket,path,cabinet_id').eq('id', file_id).single().execute()
        if not lookup.data:
            logger.warning(f"🔍 PAGE COUNT: Could not find file info for {file_id}, defaulting to threshold + 1")
            return self.docling_split_threshold + 1
        row = lookup.data
        # Download and read PDF directly with PyMuPDF
        logger.info(f"🔍 PAGE COUNT: Reading PDF directly from storage for file {file_id}")
        pdf_bytes = self.storage.download_file(row['bucket'], row['path'])
        import fitz  # PyMuPDF
        document = fitz.open(stream=pdf_bytes, filetype="pdf")
        total_pages = len(document)
        document.close()
        logger.info(f"✅ PAGE COUNT: Direct PDF reading found {total_pages} pages for file {file_id}")
        return total_pages
    except Exception as e:
        logger.error(f"❌ PAGE COUNT: Direct PDF reading failed for file {file_id}: {e}, defaulting to {self.docling_split_threshold + 1}")
        return self.docling_split_threshold + 1
def _get_page_ranges(self, file_id: str) -> dict:
    """Build the page-range payload used for page-based bundle processing."""
    total = self._get_page_count(file_id)
    # One entry per page, 1-indexed and inclusive of the last page.
    return {
        'pages': [page for page in range(1, total + 1)],
        'total_pages': total
    }
def _load_split_map_if_needed(self, file_id: str) -> Optional[Dict[str, Any]]:
    """Resolve the file's bucket and load its split map; None on any failure."""
    try:
        lookup = self.client.supabase.table('files').select('bucket').eq('id', file_id).single().execute()
        return self._load_split_map(lookup.data['bucket'], file_id) if lookup.data else None
    except Exception:
        # Best-effort lookup: callers treat a missing split map as "not available".
        return None
def _create_chunked_ranges(self, page_count: int) -> dict:
    """Create chunked page ranges for large documents without split maps.

    Chunk size is one quarter of the split threshold, but never below 10 pages.
    """
    span = max(10, self.docling_split_threshold // 4)
    chunks = [
        {
            'start': first,
            'end': min(first + span - 1, page_count),
            'title': f'Pages {first}-{min(first + span - 1, page_count)}'
        }
        for first in range(1, page_count + 1, span)
    ]
    return {
        'entries': chunks,
        'total_chunks': len(chunks)
    }
def _enqueue_single_pipeline_with_deps(self, file_id: str, pipeline_type: str, base_config: Dict[str, Any],
                                       bucket: str, file_path: str, cabinet_id: str, mime_type: str,
                                       depends_on: List[str]) -> Optional[Dict[str, Any]]:
    """Enqueue a single pipeline with dependencies on previous pipeline tasks.

    Merges the shared base config with per-pipeline env options, determines
    the processing mode, and enqueues one bundle task.

    Args:
        file_id: File to process.
        pipeline_type: 'no_ocr' | 'ocr' | 'vlm'.
        base_config: Shared docling options (merged under pipeline options).
        bucket/file_path/cabinet_id/mime_type: Storage location of the document.
        depends_on: Task IDs from the previous pipeline that must finish first.

    Returns:
        Summary dict (group_id, task_ids, task_count, processing mode/data),
        or None when the pipeline type is unknown or no valid processing
        mode exists.
    """
    group_id = str(uuid.uuid4())
    # Get pipeline-specific configuration options
    pipeline_specific_config = self._get_pipeline_specific_config(pipeline_type)
    if pipeline_type == 'no_ocr':
        config = {
            **base_config,
            **pipeline_specific_config,
            'do_ocr': False,
            'force_ocr': False,
            'pipeline': 'standard'
        }
        logger.info(f"NO_OCR pipeline config: table_mode={config['table_mode']}, "
                    f"formula_enrichment={config['do_formula_enrichment']}, "
                    f"code_enrichment={config['do_code_enrichment']}")
    elif pipeline_type == 'ocr':
        config = {
            **base_config,
            **pipeline_specific_config,
            'do_ocr': True,
            'force_ocr': False,
            'pipeline': 'standard'
        }
        logger.info(f"OCR pipeline config: table_mode={config['table_mode']}, "
                    f"formula_enrichment={config['do_formula_enrichment']}, "
                    f"code_enrichment={config['do_code_enrichment']}")
    elif pipeline_type == 'vlm':
        config = {
            **base_config,
            **pipeline_specific_config,
            'do_ocr': False,
            'force_ocr': False,
            'pipeline': 'vlm',
            'vlm_pipeline_model': os.getenv('DOCLING_VLM_MODEL', 'smoldocling')
        }
        logger.info(f"VLM pipeline config: table_mode={config['table_mode']}, "
                    f"picture_classification={config['do_picture_classification']}, "
                    f"picture_description={config['do_picture_description']}")
    else:
        logger.error(f"Unknown pipeline type: {pipeline_type}")
        return None
    # Determine processing mode using corrected logic
    processing_mode, processing_data = self._determine_processing_mode(file_id, pipeline_type)
    # ROBUSTNESS: a large document with no split map yields mode "error";
    # bail out instead of enqueueing a bundle task that cannot be processed.
    if processing_mode == "error":
        logger.error(f"No valid processing mode for {pipeline_type} pipeline on file {file_id} - skipping enqueue")
        return None
    # Enqueue single bundle task with dependencies
    task_id = self._enqueue_bundle_task_with_deps(
        file_id, pipeline_type, group_id, config, processing_mode, processing_data,
        bucket, file_path, cabinet_id, mime_type, depends_on
    )
    return {
        'group_id': group_id,
        'task_ids': [task_id] if task_id else [],
        'task_count': 1 if task_id else 0,
        'processing_mode': processing_mode,
        'processing_data': processing_data
    }
def _enqueue_bundle_task_with_deps(self, file_id: str, pipeline_type: str, group_id: str,
                                   config: Dict[str, Any], processing_mode: str, processing_data: dict,
                                   bucket: str, file_path: str, cabinet_id: str, mime_type: str,
                                   depends_on: List[str]) -> Optional[str]:
    """
    Enqueue a single bundle task that handles processing internally based on mode.

    This replaces the old approach of creating multiple individual tasks.

    Args:
        file_id: File to process.
        pipeline_type: 'no_ocr' | 'ocr' | 'vlm' (used for metadata/logging).
        group_id: Correlation ID shared by all tasks of one pipeline run.
        config: Fully-merged docling configuration for this pipeline.
        processing_mode: 'whole_document' or a 'split_by_*' mode.
        processing_data: Mode-specific data (page ranges, split-map entries, ...).
        bucket/file_path/cabinet_id/mime_type: Storage location of the document.
        depends_on: Task IDs that must complete before this task runs.

    Returns:
        The enqueued task ID, or None if enqueueing failed.
    """
    # NOTE: enqueue_docling_task, TaskPriority and create_standard_metadata
    # are imported at module level; the redundant function-local re-imports
    # were removed. The unused local `bundle_type` was removed as well.
    # Map processing modes to task types
    if processing_mode == "whole_document":
        task_type = 'docling_bundle'
    else:
        task_type = 'docling_bundle_split'
    # Translate the controller's processing mode into the bundle-metadata vocabulary.
    if processing_mode == "whole_document":
        bundle_processing_mode = "whole_document"
    elif processing_mode.startswith("split_by_"):
        split_mode_map = {
            "split_by_pages": "pages",
            "split_by_sections": "sections",
            "split_by_chunks": "chunks",
        }
        # Unlisted split modes fall back to stripping the prefix.
        bundle_processing_mode = split_mode_map.get(processing_mode, processing_mode.replace('split_by_', ''))
    else:
        bundle_processing_mode = processing_mode
    bundle_metadata = create_standard_metadata(
        file_id=file_id,
        pipeline="vlm" if pipeline_type == "vlm" else ("ocr" if config.get('do_ocr') else "no_ocr"),
        processing_mode=bundle_processing_mode,
        config=config,
        group_id=group_id,
        producer="auto_phase2"
    )
    # Create task payload with new bundle architecture
    payload = {
        'bucket': bucket,
        'file_path': file_path,
        'cabinet_id': cabinet_id,
        'mime_type': mime_type,
        'config': config,
        'processing_mode': processing_mode,
        'processing_data': processing_data,
        'bundle_metadata': bundle_metadata.to_artefact_extra(),
        'depends_on': depends_on
    }
    # Determine timeout based on processing complexity
    if processing_mode == "whole_document":
        timeout = 7200  # 2 hours for whole document
    elif processing_mode == "split_by_pages":
        # Estimate based on page count
        page_count = processing_data.get('total_pages', 50)
        timeout = min(14400, max(3600, page_count * 60))  # 1-4 hours based on pages
    else:
        # Section or chunk based processing
        section_count = len(processing_data.get('entries', []))
        timeout = min(10800, max(3600, section_count * 300))  # 1-3 hours based on sections
    logger.info(f"Enqueuing {task_type} task for {pipeline_type} pipeline: {processing_mode} (timeout: {timeout}s)")
    try:
        task_id = enqueue_docling_task(
            file_id=file_id,
            task_type=task_type,
            payload=payload,
            priority=TaskPriority.NORMAL,
            timeout=timeout
        )
        logger.info(f"Successfully enqueued {task_type} task {task_id} for {pipeline_type} pipeline")
        return task_id
    except Exception as e:
        # Callers treat a None task ID as "no task enqueued" for this pipeline.
        logger.error(f"Failed to enqueue bundle task for {pipeline_type} pipeline: {e}")
        return None
def trigger_phase2_pipelines(self, file_id: str, file_data: Dict[str, Any]) -> Dict[str, Any]:
    """
    DEPRECATED: Trigger Phase 2 sequential processing pipelines.

    Retained for backward compatibility; delegates to
    enqueue_sequential_docling_pipelines, which implements the same
    no_ocr → ocr → vlm ordering via task dependencies.

    Args:
        file_id: The file ID to process
        file_data: File processing information (bucket, path, etc.)
    Returns:
        Dictionary with triggered pipeline information
    """
    logger.info(f"Phase 2: Starting sequential content processing for file {file_id}")
    # Preserve the original early-return when nothing is enabled, including
    # its distinct 'triggered_pipelines' response shape.
    pipeline_order = []
    if self.auto_docling_no_ocr:
        pipeline_order.append('no_ocr')
    if self.auto_docling_ocr:
        pipeline_order.append('ocr')
    if self.auto_docling_vlm:
        pipeline_order.append('vlm')
    if not pipeline_order:
        logger.info(f"Phase 2: No pipelines enabled for file {file_id}")
        return {
            'file_id': file_id,
            'triggered_pipelines': {},
            'total_tasks': 0,
            'sequential_order': [],
            'message': 'No Phase 2 pipelines enabled'
        }
    logger.info(f"Phase 2: Sequential pipeline order for file {file_id}: {pipeline_order}")
    logger.warning("trigger_phase2_pipelines is deprecated - use enqueue_sequential_docling_pipelines for new implementations")
    # DEAD CODE REMOVED: this method previously built an unused base_config
    # dict and a triggered_pipelines dict that were discarded before
    # delegating; the delegate builds its own identical base_config.
    return self.enqueue_sequential_docling_pipelines(file_id, file_data)
def _start_single_pipeline(self, file_id: str, pipeline_type: str, base_config: Dict[str, Any],
                           bucket: str, file_path: str, cabinet_id: str, mime_type: str) -> Optional[Dict[str, Any]]:
    """Start a single pipeline of the specified type.

    Returns a summary dict (group id, task count, by-page flag), or None
    when the pipeline type is not recognised.
    """
    # Select the per-type config overlay and page-granularity flag, then
    # enqueue through the shared path below.
    if pipeline_type == 'no_ocr':
        overrides = {
            'do_ocr': False,
            'force_ocr': False,
            'pipeline': 'standard'
        }
        by_page = self.docling_no_ocr_by_page
    elif pipeline_type == 'ocr':
        overrides = {
            'do_ocr': True,
            'ocr_engine': os.getenv('OCR_ENGINE', 'easyocr'),
            'force_ocr': False,
            'pipeline': 'standard'
        }
        by_page = self.docling_ocr_by_page
    elif pipeline_type == 'vlm':
        overrides = {
            'do_ocr': False,
            'force_ocr': False,
            'pipeline': 'vlm',
            'vlm_pipeline_model': os.getenv('DOCLING_VLM_MODEL', 'smoldocling')
        }
        by_page = self.docling_vlm_by_page
    else:
        logger.error(f"Unknown pipeline type: {pipeline_type}")
        return None
    group_id = str(uuid.uuid4())
    tasks = self._enqueue_pipeline(
        file_id, pipeline_type, group_id, {**base_config, **overrides},
        bucket, file_path, cabinet_id, mime_type,
        by_page=by_page
    )
    return {
        'group_id': group_id,
        'task_count': len(tasks),
        'by_page': by_page
    }
# continue_sequential_pipeline method removed - task dependencies now handle sequential execution
def _load_split_map(self, bucket: str, file_id: str) -> Optional[Dict[str, Any]]:
"""Load split map data for a file."""
try:
arts = self.client.supabase.table('document_artefacts') \
.select('id,type,rel_path') \
.eq('file_id', file_id).eq('type', 'split_map_json') \
.order('created_at', desc=True).limit(1).execute().data or []
if not arts:
return None
art = arts[0]
raw = self.storage.download_file(bucket, art['rel_path'])
import json as _json
return _json.loads(raw.decode('utf-8'))
except Exception:
return None
    def _enqueue_pipeline(self, file_id: str, pipeline_type: str, group_id: str, config: Dict[str, Any],
                          bucket: str, file_path: str, cabinet_id: str, mime_type: str,
                          by_page: bool = False) -> List[str]:
        """Enqueue tasks for a specific pipeline (OCR/No-OCR/VLM).

        Fan-out strategy, in priority order:
          1. ``by_page=True`` -- one task per page (VLM: one task per split-map
             section via 'vlm_section_page_bundle'), driven by the split map;
             returns [] if no split map exists.
          2. ``self.docling_use_split_map`` -- one 'canonical_docling_json' task
             per split-map section; returns [] if no split map exists.
          3. Otherwise -- a single task covering the whole document.

        Args:
            file_id: Target file identifier.
            pipeline_type: 'ocr', 'no_ocr' or 'vlm'; selects task shape/metadata.
            group_id: Shared group id stamped into every task's bundle metadata.
            config: Docling config; its 'do_ocr' flag determines the recorded
                pipeline name for non-VLM tasks.
            bucket: Storage bucket of the source document.
            file_path: Path of the source document within the bucket.
            cabinet_id: Owning cabinet identifier, passed through in payloads.
            mime_type: Source document MIME type, passed through in payloads.
            by_page: Process each page individually instead of whole sections.

        Returns:
            List of enqueued task ids (possibly empty).
        """
        task_ids = []
        if by_page:
            # Process each page individually, then group by split map sections
            logger.info(f"Phase 2: Enqueueing {pipeline_type} pipeline by page for file {file_id}")
            split_map = self._load_split_map(bucket, file_id)
            if split_map:
                entries = split_map.get('entries', [])
                # 1-based section indices, matching the split_order convention below.
                for section_idx, entry in enumerate(entries, 1):
                    start_page = int(entry.get('start_page', 1))
                    end_page = int(entry.get('end_page', start_page))
                    section_title = entry.get('title', f'Section {section_idx}')
                    if pipeline_type == 'vlm':
                        # VLM uses specialized page processing: one bundle task per
                        # section rather than one task per page.
                        section_task_id = enqueue_docling_task(
                            file_id=file_id,
                            task_type='vlm_section_page_bundle',
                            payload={
                                'section_idx': section_idx,
                                'start_page': start_page,
                                'end_page': end_page,
                                'section_title': section_title,
                                'vlm_group_id': group_id,
                                'vlm_model': config.get('vlm_pipeline_model', 'smoldocling'),
                                'base_config': config,
                                'total_sections': len(entries),
                                'producer': 'auto_phase2'
                            },
                            priority=TaskPriority.NORMAL,
                            timeout=3600
                        )
                        task_ids.append(section_task_id)
                    else:
                        # OCR/No-OCR by page processing (process each page in section individually)
                        for page_num in range(start_page, end_page + 1):
                            # Restrict the docling run to a single page.
                            page_config = {
                                **config,
                                'page_range': [page_num, page_num]
                            }
                            # Create standardized bundle metadata
                            page_metadata = create_standard_metadata(
                                file_id=file_id,
                                pipeline="vlm" if pipeline_type == "vlm" else ("ocr" if config.get('do_ocr') else "no_ocr"),
                                processing_mode="individual_pages",
                                config=page_config,
                                group_id=group_id,
                                split_order=section_idx,
                                split_total=len(entries),
                                split_heading=section_title,
                                page_range=[page_num, page_num],
                                producer="auto_phase2"
                            )
                            # Add legacy fields for backward compatibility
                            artefact_extra = page_metadata.to_artefact_extra()
                            artefact_extra.update({
                                'section_idx': section_idx,
                                'section_title': section_title,
                                'page_number': page_num,
                            })
                            page_task_id = enqueue_docling_task(
                                file_id=file_id,
                                task_type='canonical_docling_json',
                                payload={
                                    'bucket': bucket,
                                    'file_path': file_path,
                                    'cabinet_id': cabinet_id,
                                    'mime_type': mime_type,
                                    'config': page_config,
                                    'artefact_extra': artefact_extra
                                },
                                priority=TaskPriority.NORMAL,
                                # Shorter timeout: a single page only.
                                timeout=1800
                            )
                            task_ids.append(page_task_id)
            else:
                logger.warning(f"Phase 2: No split map found for by-page processing of file {file_id}")
                return []
        elif self.docling_use_split_map:
            # Process by split map sections
            logger.info(f"Phase 2: Enqueueing {pipeline_type} pipeline by split map sections for file {file_id}")
            split_map = self._load_split_map(bucket, file_id)
            if split_map:
                entries = split_map.get('entries', [])
                # Normalize and sort entries by start_page; malformed entries are
                # skipped rather than failing the whole fan-out.
                normalized_entries = []
                for entry in entries:
                    try:
                        start_page = int(entry.get('start_page', 1))
                        end_page = int(entry.get('end_page', start_page))
                        title = entry.get('title') or entry.get('label') or ''
                        # Clamp inverted ranges to a single page.
                        if end_page < start_page:
                            end_page = start_page
                        normalized_entries.append({
                            'start': start_page,
                            'end': end_page,
                            'title': title
                        })
                    except Exception:
                        continue
                normalized_entries.sort(key=lambda x: x['start'])
                # Create tasks for each section (1-based split_order).
                for i, entry in enumerate(normalized_entries, 1):
                    section_config = {
                        **config,
                        'page_range': [entry['start'], entry['end']]
                    }
                    # Create standardized bundle metadata for section
                    section_metadata = create_standard_metadata(
                        file_id=file_id,
                        pipeline="vlm" if pipeline_type == "vlm" else ("ocr" if config.get('do_ocr') else "no_ocr"),
                        processing_mode="split_sections",
                        config=section_config,
                        group_id=group_id,
                        split_order=i,
                        split_total=len(normalized_entries),
                        split_heading=entry['title'] or f'Section {i}',
                        page_range=[entry['start'], entry['end']],
                        producer="auto_phase2"
                    )
                    section_task_id = enqueue_docling_task(
                        file_id=file_id,
                        task_type='canonical_docling_json',
                        payload={
                            'bucket': bucket,
                            'file_path': file_path,
                            'cabinet_id': cabinet_id,
                            'mime_type': mime_type,
                            'config': section_config,
                            'artefact_extra': section_metadata.to_artefact_extra()
                        },
                        priority=TaskPriority.NORMAL,
                        timeout=3600
                    )
                    task_ids.append(section_task_id)
            else:
                logger.warning(f"Phase 2: No split map found for section-based processing of file {file_id}")
                return []
        else:
            # Process whole document
            logger.info(f"Phase 2: Enqueueing {pipeline_type} pipeline for whole document {file_id}")
            # Create standardized bundle metadata for whole document
            whole_doc_metadata = create_standard_metadata(
                file_id=file_id,
                pipeline="vlm" if pipeline_type == "vlm" else ("ocr" if config.get('do_ocr') else "no_ocr"),
                processing_mode="whole_document",
                config=config,
                group_id=group_id,
                producer="auto_phase2"
            )
            task_id = enqueue_docling_task(
                file_id=file_id,
                task_type='canonical_docling_json',
                payload={
                    'bucket': bucket,
                    'file_path': file_path,
                    'cabinet_id': cabinet_id,
                    'mime_type': mime_type,
                    'config': config,
                    'artefact_extra': whole_doc_metadata.to_artefact_extra()
                },
                priority=TaskPriority.NORMAL,
                # Longest timeout: full-document conversion.
                timeout=7200
            )
            task_ids.append(task_id)
        logger.info(f"Phase 2: Enqueued {len(task_ids)} tasks for {pipeline_type} pipeline")
        return task_ids
    def _enqueue_pipeline_with_deps(self, file_id: str, pipeline_type: str, group_id: str, config: Dict[str, Any],
                                    bucket: str, file_path: str, cabinet_id: str, mime_type: str,
                                    by_page: bool = False, depends_on: List[str] = None) -> List[str]:
        """Enqueue tasks for a specific pipeline with dependencies.

        NOTE: intentionally mirrors ``_enqueue_pipeline``; the only difference is
        that every enqueued task's payload carries a 'depends_on' list of task ids
        (presumably gating execution in the queue system, per the sequential
        pipeline design -- confirm against the queue consumer). Keep the two
        methods in sync when changing fan-out logic.

        Args:
            file_id: Target file identifier.
            pipeline_type: 'ocr', 'no_ocr' or 'vlm'; selects task shape/metadata.
            group_id: Shared group id stamped into every task's bundle metadata.
            config: Docling config; its 'do_ocr' flag determines the recorded
                pipeline name for non-VLM tasks.
            bucket: Storage bucket of the source document.
            file_path: Path of the source document within the bucket.
            cabinet_id: Owning cabinet identifier, passed through in payloads.
            mime_type: Source document MIME type, passed through in payloads.
            by_page: Process each page individually instead of whole sections.
            depends_on: Task ids this pipeline's tasks should wait on; defaults
                to an empty list (None-default avoids the mutable-default trap).

        Returns:
            List of enqueued task ids (possibly empty).
        """
        if depends_on is None:
            depends_on = []
        task_ids = []
        if by_page:
            # Process each page individually, then group by split map sections
            logger.info(f"Enqueueing {pipeline_type} pipeline by page for file {file_id} with {len(depends_on)} dependencies")
            split_map = self._load_split_map(bucket, file_id)
            if split_map:
                entries = split_map.get('entries', [])
                # 1-based section indices, matching the split_order convention below.
                for section_idx, entry in enumerate(entries, 1):
                    start_page = int(entry.get('start_page', 1))
                    end_page = int(entry.get('end_page', start_page))
                    section_title = entry.get('title', f'Section {section_idx}')
                    if pipeline_type == 'vlm':
                        # VLM uses specialized page processing: one bundle task per
                        # section rather than one task per page.
                        section_task_id = enqueue_docling_task(
                            file_id=file_id,
                            task_type='vlm_section_page_bundle',
                            payload={
                                'section_idx': section_idx,
                                'start_page': start_page,
                                'end_page': end_page,
                                'section_title': section_title,
                                'vlm_group_id': group_id,
                                'vlm_model': config.get('vlm_pipeline_model', 'smoldocling'),
                                'base_config': config,
                                'total_sections': len(entries),
                                'producer': 'auto_phase2',
                                'depends_on': depends_on
                            },
                            priority=TaskPriority.NORMAL,
                            timeout=3600
                        )
                        task_ids.append(section_task_id)
                    else:
                        # OCR/No-OCR by page processing (process each page in section individually)
                        for page_num in range(start_page, end_page + 1):
                            # Restrict the docling run to a single page.
                            page_config = {
                                **config,
                                'page_range': [page_num, page_num]
                            }
                            # Create standardized bundle metadata
                            page_metadata = create_standard_metadata(
                                file_id=file_id,
                                pipeline="vlm" if pipeline_type == "vlm" else ("ocr" if config.get('do_ocr') else "no_ocr"),
                                processing_mode="individual_pages",
                                config=page_config,
                                group_id=group_id,
                                split_order=section_idx,
                                split_total=len(entries),
                                split_heading=section_title,
                                page_range=[page_num, page_num],
                                producer="auto_phase2"
                            )
                            # Add legacy fields for backward compatibility
                            artefact_extra = page_metadata.to_artefact_extra()
                            artefact_extra.update({
                                'section_idx': section_idx,
                                'section_title': section_title,
                                'page_number': page_num,
                            })
                            page_task_id = enqueue_docling_task(
                                file_id=file_id,
                                task_type='canonical_docling_json',
                                payload={
                                    'bucket': bucket,
                                    'file_path': file_path,
                                    'cabinet_id': cabinet_id,
                                    'mime_type': mime_type,
                                    'config': page_config,
                                    'artefact_extra': artefact_extra,
                                    'depends_on': depends_on
                                },
                                priority=TaskPriority.NORMAL,
                                # Shorter timeout: a single page only.
                                timeout=1800
                            )
                            task_ids.append(page_task_id)
            else:
                logger.warning(f"No split map found for by-page processing of file {file_id}")
                return []
        elif self.docling_use_split_map:
            # Process by split map sections
            logger.info(f"Enqueueing {pipeline_type} pipeline by split map sections for file {file_id} with {len(depends_on)} dependencies")
            split_map = self._load_split_map(bucket, file_id)
            if split_map:
                entries = split_map.get('entries', [])
                # Normalize and sort entries by start_page; malformed entries are
                # skipped rather than failing the whole fan-out.
                normalized_entries = []
                for entry in entries:
                    try:
                        start_page = int(entry.get('start_page', 1))
                        end_page = int(entry.get('end_page', start_page))
                        title = entry.get('title') or entry.get('label') or ''
                        # Clamp inverted ranges to a single page.
                        if end_page < start_page:
                            end_page = start_page
                        normalized_entries.append({
                            'start': start_page,
                            'end': end_page,
                            'title': title
                        })
                    except Exception:
                        continue
                normalized_entries.sort(key=lambda x: x['start'])
                # Create tasks for each section (1-based split_order).
                for i, entry in enumerate(normalized_entries, 1):
                    section_config = {
                        **config,
                        'page_range': [entry['start'], entry['end']]
                    }
                    # Create standardized bundle metadata for section
                    section_metadata = create_standard_metadata(
                        file_id=file_id,
                        pipeline="vlm" if pipeline_type == "vlm" else ("ocr" if config.get('do_ocr') else "no_ocr"),
                        processing_mode="split_sections",
                        config=section_config,
                        group_id=group_id,
                        split_order=i,
                        split_total=len(normalized_entries),
                        split_heading=entry['title'] or f'Section {i}',
                        page_range=[entry['start'], entry['end']],
                        producer="auto_phase2"
                    )
                    section_task_id = enqueue_docling_task(
                        file_id=file_id,
                        task_type='canonical_docling_json',
                        payload={
                            'bucket': bucket,
                            'file_path': file_path,
                            'cabinet_id': cabinet_id,
                            'mime_type': mime_type,
                            'config': section_config,
                            'artefact_extra': section_metadata.to_artefact_extra(),
                            'depends_on': depends_on
                        },
                        priority=TaskPriority.NORMAL,
                        timeout=3600
                    )
                    task_ids.append(section_task_id)
            else:
                logger.warning(f"No split map found for section-based processing of file {file_id}")
                return []
        else:
            # Process whole document
            logger.info(f"Enqueueing {pipeline_type} pipeline for whole document {file_id} with {len(depends_on)} dependencies")
            # Create standardized bundle metadata for whole document
            whole_doc_metadata = create_standard_metadata(
                file_id=file_id,
                pipeline="vlm" if pipeline_type == "vlm" else ("ocr" if config.get('do_ocr') else "no_ocr"),
                processing_mode="whole_document",
                config=config,
                group_id=group_id,
                producer="auto_phase2"
            )
            task_id = enqueue_docling_task(
                file_id=file_id,
                task_type='canonical_docling_json',
                payload={
                    'bucket': bucket,
                    'file_path': file_path,
                    'cabinet_id': cabinet_id,
                    'mime_type': mime_type,
                    'config': config,
                    'artefact_extra': whole_doc_metadata.to_artefact_extra(),
                    'depends_on': depends_on
                },
                priority=TaskPriority.NORMAL,
                # Longest timeout: full-document conversion.
                timeout=7200
            )
            task_ids.append(task_id)
        logger.info(f"Enqueued {len(task_ids)} tasks for {pipeline_type} pipeline with dependencies")
        return task_ids
# Global pipeline controller instance
_controller_instance = None


def get_pipeline_controller() -> DocumentPipelineController:
    """Return the shared DocumentPipelineController, constructing it on first use."""
    global _controller_instance
    if _controller_instance is not None:
        return _controller_instance
    _controller_instance = DocumentPipelineController()
    return _controller_instance