api/archive/auto_processing/task_processors.py
2025-11-14 14:47:19 +00:00

2532 lines
120 KiB
Python

"""
Task Processors for Document Processing Queue
This module contains the actual processing implementations for different
types of queued tasks (Tika, Docling, LLM, Split Map).
"""
import json
import zipfile
import io
import mimetypes
import requests
import tempfile
import uuid
from pathlib import Path
from typing import Dict, Any, Optional
import os
from modules.queue_system import DocumentProcessingQueue, QueueTask, ServiceType
from modules.database.supabase.utils.client import SupabaseServiceRoleClient
from modules.database.supabase.utils.storage import StorageAdmin
from modules.document_processor import DocumentProcessor
from modules.logger_tool import initialise_logger
# Module-wide logger; level and path come from the environment (LOG_LEVEL, LOG_PATH).
logger = initialise_logger(__name__, os.getenv("LOG_LEVEL"), os.getenv("LOG_PATH"), 'default', True)
class DocumentTaskProcessor(DocumentProcessingQueue):
"""
Extended queue with actual task processing implementations.
"""
def __init__(self, redis_url: str = None):
    """Initialise the processor: queue base class, backend clients, service URLs.

    Args:
        redis_url: Optional Redis connection URL forwarded to the queue base.
    """
    super().__init__(redis_url)
    # Backend clients shared by all task handlers.
    self.client = SupabaseServiceRoleClient()
    self.storage = StorageAdmin()
    self.doc_processor = DocumentProcessor()
    # External service endpoints, resolved from the environment.
    env = os.getenv
    self.tika_url = env('TIKA_URL')
    self.docling_url = env('DOCLING_URL') or env('NEOFS_DOCLING_URL')
    self.llm_url = env('LLM_URL')  # local LLM endpoint
    logger.info("Task processor initialized with service URLs")
def _process_task(self, task: QueueTask):
    """Dispatch a dequeued task to the handler for its service type.

    On success the task is completed via ``complete_task``; any exception
    (including an unknown service type) is logged with a full traceback and
    routed to ``fail_task``.
    """
    try:
        logger.info(f"🚀 PROCESS DEBUG: Starting _process_task for {task.id}")
        # Audit dependency info (best effort; logging must never break processing).
        try:
            audit_payload = task.payload if isinstance(task.payload, dict) else {}
            deps = audit_payload.get('depends_on') or []
            if deps:
                logger.info(f"Processing task {task.id}: {task.service.value}/{task.task_type} deps={deps}")
            else:
                logger.info(f"Processing task {task.id}: {task.service.value}/{task.task_type}")
        except Exception:
            logger.info(f"Processing task {task.id}: {task.service.value}/{task.task_type}")
        logger.info(f"🚀 PROCESS DEBUG: Routing task {task.id} to service {task.service}")
        # Service-type → handler dispatch table.
        handlers = {
            ServiceType.TIKA: self._process_tika_task,
            ServiceType.DOCLING: self._process_docling_task,
            ServiceType.LLM: self._process_llm_task,
            ServiceType.SPLIT_MAP: self._process_split_map_task,
            ServiceType.DOCUMENT_ANALYSIS: self.process_document_analysis_task,
            ServiceType.PAGE_IMAGES: self.process_page_images_task,
        }
        handler = handlers.get(task.service)
        if handler is None:
            raise ValueError(f"Unknown service type: {task.service}")
        result = handler(task)
        logger.info(f"✅ PROCESS DEBUG: Task {task.id} completed successfully, calling complete_task")
        self.complete_task(task, result)
        logger.info(f"✅ PROCESS DEBUG: Task {task.id} completion confirmed")
    except Exception as e:
        logger.error(f"🚨 PROCESS DEBUG: Task {task.id} processing failed: {e}")
        logger.error(f"🚨 PROCESS DEBUG: Exception type: {type(e)}")
        import traceback
        logger.error(f"🚨 PROCESS DEBUG: Full traceback:\n{traceback.format_exc()}")
        logger.info(f"🚨 PROCESS DEBUG: Calling fail_task for {task.id}")
        self.fail_task(task, str(e))
        logger.info(f"🚨 PROCESS DEBUG: fail_task completed for {task.id}")
def _process_tika_task(self, task: QueueTask) -> Dict[str, Any]:
    """Extract document metadata via Apache Tika and persist it as an artefact.

    Downloads the file named in ``task.payload``, PUTs the raw bytes to the
    Tika ``/meta`` endpoint, stores the JSON response in storage, and records
    a ``tika_json`` artefact row with UI presentation metadata.

    Returns:
        Dict with ``artefact_id``, ``rel_path`` and ``processing_time``.
    Raises:
        ValueError: if TIKA_URL is not configured.
    """
    if not self.tika_url:
        raise ValueError("TIKA_URL not configured")
    payload = task.payload
    file_id = task.file_id
    bucket, file_path = payload['bucket'], payload['file_path']
    cabinet_id = payload['cabinet_id']
    mime_type = payload.get('mime_type', 'application/octet-stream')
    # Fetch the source bytes from storage.
    logger.debug(f"Downloading file for Tika processing: {bucket}/{file_path}")
    file_bytes = self.storage.download_file(bucket, file_path)
    # Send the raw bytes to Tika's metadata endpoint.
    response = requests.put(
        f"{self.tika_url.rstrip('/')}/meta",
        data=file_bytes,
        headers={'Accept': 'application/json', 'Content-Type': mime_type},
        timeout=task.timeout
    )
    response.raise_for_status()
    tika_json = response.json()
    # Persist the result JSON under a fresh artefact directory.
    artefact_id = str(uuid.uuid4())
    rel_path = f"{cabinet_id}/{file_id}/{artefact_id}/tika.json"
    self.storage.upload_file(
        bucket,
        rel_path,
        json.dumps(tika_json, ensure_ascii=False).encode('utf-8'),
        'application/json',
        upsert=True
    )
    elapsed = response.elapsed.total_seconds()
    # Record the artefact row, including metadata the UI uses for display.
    self.client.supabase.table('document_artefacts').insert({
        'id': artefact_id,
        'file_id': file_id,
        'type': 'tika_json',
        'rel_path': rel_path,
        'extra': {
            'processing_time': elapsed,
            'display_name': 'Document Metadata',
            'bundle_label': 'Tika Analysis',
            'section_title': 'Document Metadata',
            'bundle_type': 'tika_json',
            'processing_mode': 'metadata_extraction',
            'pipeline': 'tika_analysis',
            'is_metadata': True,
            'ui_category': 'raw_data',
            'ui_order': 3,
            'description': 'Raw document metadata and properties extracted by Apache Tika',
            'viewer_type': 'json'
        },
        'status': 'completed'
    }).execute()
    logger.info(f"Tika processing completed for file {file_id}")
    return {
        'artefact_id': artefact_id,
        'rel_path': rel_path,
        'processing_time': elapsed
    }
def _process_docling_task(self, task: QueueTask) -> Dict[str, Any]:
    """Process Docling document analysis task.

    Also allows routing of related task types so that page images and
    enhanced structure analysis can run under the stable docling service
    umbrella when SERVICE dispatch for new types is problematic.

    Expects ``task.payload`` to carry 'bucket', 'file_path' and 'cabinet_id',
    with optional 'config' (Docling conversion options) and 'artefact_extra'.
    Depending on the response, stores either a full zip bundle (canonical
    tasks), a single JSON artefact, or a raw-output fallback artefact.

    Returns:
        Dict with 'artefact_id' plus either 'files_count' (canonical zip
        bundles) or 'rel_path'/'processing_time' (other responses).
    Raises:
        ValueError: if DOCLING_URL is not configured.
    """
    # Soft-route additional task types through this handler
    if task.task_type in ("document_structure_analysis", "document_analysis"):
        return self.process_document_analysis_task(task)
    if task.task_type in ("generate_page_images", "page_images"):
        return self.process_page_images_task(task)
    if task.task_type in ("vlm_section_page_bundle",):
        return self.process_vlm_section_page_bundle_task(task)
    if task.task_type in ("vlm_section_bundle_collector",):
        return self.process_vlm_section_bundle_collector_task(task)
    # New unified bundle architecture handlers
    if task.task_type in ("docling_bundle",):
        return self.process_docling_bundle_task(task)
    if task.task_type in ("docling_bundle_split",):
        return self.process_docling_bundle_split_task(task)
    # phase2_coordinator task type removed - pipelines now enqueued directly from split_map task
    if not self.docling_url:
        raise ValueError("DOCLING_URL not configured")
    payload = task.payload
    file_id = task.file_id
    bucket = payload['bucket']
    file_path = payload['file_path']
    cabinet_id = payload['cabinet_id']
    task_config = payload.get('config', {})
    # Download file
    logger.debug(f"Downloading file for Docling processing: {bucket}/{file_path}")
    file_bytes = self.storage.download_file(bucket, file_path)
    # Prepare Docling request
    docling_api_key = os.getenv('DOCLING_API_KEY')
    # Accept any content type so zip/binary responses are allowed
    headers = {'Accept': '*/*'}
    if docling_api_key:
        headers['X-Api-Key'] = docling_api_key
    # Determine to_formats. For canonical docling we will request a ZIP bundle.
    to_formats_val = task_config.get('to_formats', 'json')
    to_formats_list = to_formats_val if isinstance(to_formats_val, list) else [to_formats_val]
    # 'canonical_docling*' task types get a zip bundle; everything else in-body.
    is_canonical = str(task.task_type).startswith('canonical_docling')
    target_type = task_config.get('target_type', 'zip' if is_canonical else 'inbody')
    # Build form data from config (override for canonical)
    form_data = [
        ('target_type', target_type),
        ('do_ocr', str(task_config.get('do_ocr', False)).lower()),
        ('force_ocr', str(task_config.get('force_ocr', False)).lower()),
        ('image_export_mode', 'referenced' if is_canonical else task_config.get('image_export_mode', 'embedded')),
        ('ocr_engine', task_config.get('ocr_engine', 'easyocr')),
        ('ocr_lang', task_config.get('ocr_lang', 'en')),
        ('pdf_backend', task_config.get('pdf_backend', 'dlparse_v4')),
        ('table_mode', task_config.get('table_mode', 'fast')),
        ('do_formula_enrichment', str(task_config.get('do_formula_enrichment', False)).lower()),
        ('do_code_enrichment', str(task_config.get('do_code_enrichment', False)).lower()),
        ('pipeline', task_config.get('pipeline', 'standard'))
    ]
    # Optional extra flags forwarded only when present in the config
    if 'table_cell_matching' in task_config:
        form_data.append(('table_cell_matching', str(task_config.get('table_cell_matching')).lower()))
    if 'do_picture_classification' in task_config:
        form_data.append(('do_picture_classification', str(task_config.get('do_picture_classification')).lower()))
    if 'do_picture_description' in task_config:
        form_data.append(('do_picture_description', str(task_config.get('do_picture_description')).lower()))
    if task_config.get('picture_description_prompt'):
        form_data.append(('picture_description_prompt', task_config.get('picture_description_prompt')))
    # picture_description_api and vlm_pipeline_model_api must be JSON per Docling OpenAPI
    if task_config.get('picture_description_api') is not None:
        v = task_config.get('picture_description_api')
        if isinstance(v, (dict, list)):
            form_data.append(('picture_description_api', json.dumps(v)))
        elif isinstance(v, str) and v.strip().startswith(('{', '[')):
            # Already a JSON-looking string; forward as-is.
            form_data.append(('picture_description_api', v))
        # else: omit to avoid validation error
    if task_config.get('vlm_pipeline_model'):
        form_data.append(('vlm_pipeline_model', task_config.get('vlm_pipeline_model')))
    if task_config.get('vlm_pipeline_model_api') is not None:
        v = task_config.get('vlm_pipeline_model_api')
        if isinstance(v, (dict, list)):
            form_data.append(('vlm_pipeline_model_api', json.dumps(v)))
        elif isinstance(v, str) and v.strip().startswith(('{', '[')):
            form_data.append(('vlm_pipeline_model_api', v))
        # else: omit
    if is_canonical and ('md' in to_formats_list):
        # Page-break placeholder lets the markdown be split back into pages below.
        form_data.append(('md_page_break_placeholder', task_config.get('md_page_break_placeholder', '\n\n<!-- page-break -->\n\n')))
    # Append to_formats as repeated fields (filter unsupported split pages)
    to_formats_list = [f for f in to_formats_list if f != 'html_split_page']
    for fmt in to_formats_list:
        form_data.append(('to_formats', fmt))
    # Handle page range with clamping and min/max correction
    page_range = task_config.get('page_range', [1, 999999])
    if isinstance(page_range, list) and len(page_range) >= 2:
        def _to_int_safe(v, default):
            # Coerce to int, falling back to the default on any failure.
            try:
                return int(v)
            except Exception:
                return default
        start_pg = _to_int_safe(page_range[0], 1)
        end_pg = _to_int_safe(page_range[1], 999999)
        if start_pg < 1:
            start_pg = 1
        if end_pg < start_pg:
            end_pg = start_pg
        # Clamp for frontmatter-like tasks to actual page count if possible
        if task.task_type in ('docling_frontmatter_json', 'document_structure_analysis'):
            try:
                import fitz  # PyMuPDF
                doc = fitz.open(stream=file_bytes, filetype='pdf')
                pc = int(doc.page_count)
                doc.close()
                if pc > 0:
                    end_pg = min(end_pg, pc)
                    start_pg = max(1, min(start_pg, pc))
                    if end_pg < start_pg:
                        end_pg = start_pg
            except Exception:
                # Best effort only — fall back to the uncorrected range.
                pass
        form_data.append(('page_range', str(start_pg)))
        form_data.append(('page_range', str(end_pg)))
    files = [('files', ('file', file_bytes, payload.get('mime_type', 'application/pdf')))]
    # Make request
    response = requests.post(
        f"{self.docling_url.rstrip('/')}/v1/convert/file",
        files=files,
        data=form_data,
        headers=headers,
        timeout=task.timeout
    )
    response.raise_for_status()
    content_type = (response.headers.get('Content-Type') or '').lower()
    # 'PK' magic bytes also identify a zip when the header is missing/wrong.
    is_zip_resp = ('zip' in content_type) or (response.content[:2] == b'PK')
    if is_zip_resp and is_canonical:
        # Unpack zip, store all files and a manifest
        artefact_id = str(uuid.uuid4())
        base_dir = f"{cabinet_id}/{file_id}/{artefact_id}"
        archive_path = f"{base_dir}/bundle.zip"
        # Save original archive
        self.storage.upload_file(bucket, archive_path, response.content, 'application/zip', upsert=True)
        zf = zipfile.ZipFile(io.BytesIO(response.content))
        entries = []
        md_full_path = None
        html_full_path = None
        text_full_path = None
        json_full_path = None
        images_list = []
        md_data_bytes: bytes | None = None
        for zi in zf.infolist():
            if zi.is_dir():
                continue
            # Sanitise entry names to avoid path traversal in storage keys.
            name = zi.filename.lstrip('/').replace('..', '')
            data = zf.read(zi)
            ctype = mimetypes.guess_type(name)[0] or 'application/octet-stream'
            rel = f"{base_dir}/{name}"
            self.storage.upload_file(bucket, rel, data, ctype, upsert=True)
            entries.append({
                'name': name,
                'path': rel,
                'size': zi.file_size,
                'content_type': ctype
            })
            # Detect known outputs (first match of each kind wins)
            lower = name.lower()
            if lower.endswith('.md') and md_full_path is None:
                md_full_path = rel
                md_data_bytes = data
            elif lower.endswith('.html') and html_full_path is None:
                html_full_path = rel
            elif lower.endswith('.txt') and text_full_path is None:
                text_full_path = rel
            elif lower.endswith('.json') and json_full_path is None:
                json_full_path = rel
            if ctype.startswith('image/'):
                images_list.append({'name': name, 'path': rel, 'content_type': ctype, 'size': zi.file_size})
        # Manifest describes every stored entry plus the detected primary outputs.
        manifest = {
            'file_id': file_id,
            'artefact_id': artefact_id,
            'to_formats': to_formats_list,
            'image_export_mode': 'referenced',
            'entries': entries,
            'archive_path': archive_path,
            'markdown_full': md_full_path,
            'html_full': html_full_path,
            'text_full': text_full_path,
            'json_full': json_full_path,
            'images': images_list,
            'bucket': bucket
        }
        # Create markdown pages by splitting on placeholder if available
        if md_data_bytes is not None:
            try:
                md_text = md_data_bytes.decode('utf-8', errors='replace')
                sep = task_config.get('md_page_break_placeholder', '\n\n<!-- page-break -->\n\n')
                parts = md_text.split(sep)
                if len(parts) > 1:
                    pages_dir = f"{base_dir}/md_pages"
                    pages = []
                    for i, part in enumerate(parts, start=1):
                        pth = f"{pages_dir}/page-{i:04d}.md"
                        self.storage.upload_file(bucket, pth, part.encode('utf-8'), 'text/markdown', upsert=True)
                        pages.append({'page': i, 'path': pth})
                    manifest['markdown_pages'] = pages
            except Exception as e:
                logger.warning(f"Failed creating markdown_pages for file {file_id}: {e}")
        manifest_path = f"{base_dir}/manifest.json"
        self.storage.upload_file(bucket, manifest_path, json.dumps(manifest, ensure_ascii=False).encode('utf-8'), 'application/json', upsert=True)
        # Create artefact row pointing to directory with manifest, including grouping extras for split packs
        artefact_extra = payload.get('artefact_extra') if isinstance(payload, dict) else None
        # Determine artefact type by pipeline (standard vs vlm)
        pipeline_mode = (task_config.get('pipeline') or 'standard').lower()
        artefact_type_final = 'docling_vlm' if pipeline_mode == 'vlm' else 'docling_standard'
        group_pack_type = payload.get('group_pack_type') if isinstance(payload, dict) else None
        # propagate group_id if provided (set by caller for multi-part packs)
        group_id = (artefact_extra or {}).get('group_id')
        # Compute a settings fingerprint for grouping (exclude page_range)
        try:
            import hashlib, json as _json
            cfg_for_hash = dict(task_config)
            cfg_for_hash.pop('page_range', None)
            settings_fingerprint = hashlib.sha1(_json.dumps(cfg_for_hash, sort_keys=True, ensure_ascii=False).encode('utf-8')).hexdigest()
        except Exception:
            settings_fingerprint = None
        self.client.supabase.table('document_artefacts').insert({
            'id': artefact_id,
            'file_id': file_id,
            'type': artefact_type_final,
            'rel_path': base_dir,
            'extra': {
                'manifest': manifest_path,
                'processing_time': response.elapsed.total_seconds(),
                'config': task_config,
                'group_pack_type': group_pack_type or (artefact_extra or {}).get('group_pack_type'),
                'group_id': group_id,
                'pipeline': pipeline_mode,
                'settings_fingerprint': settings_fingerprint,
                **(artefact_extra or {})
            },
            'status': 'completed'
        }).execute()
        logger.info(f"Canonical docling bundle stored for file {file_id} with {len(entries)} files")
        return {
            'artefact_id': artefact_id,
            'files_count': len(entries)
        }
    if 'application/json' in content_type or content_type.endswith('+json'):
        # In-body JSON response: store the canonical JSON as a single artefact.
        docling_json = response.json()
        artefact_id = str(uuid.uuid4())
        artefact_type = task.task_type
        rel_path = f"{cabinet_id}/{file_id}/{artefact_id}/{artefact_type.replace('_json', '.json')}"
        self.storage.upload_file(
            bucket,
            rel_path,
            json.dumps(docling_json, ensure_ascii=False).encode('utf-8'),
            'application/json',
            upsert=True
        )
        artefact_data = {
            'id': artefact_id,
            'file_id': file_id,
            'type': artefact_type,
            'rel_path': rel_path,
            'extra': {
                'processing_time': response.elapsed.total_seconds(),
                'config': task_config,
                **({} if 'artefact_extra' not in payload else payload['artefact_extra'])
            },
            'status': 'completed'
        }
        self.client.supabase.table('document_artefacts').insert(artefact_data).execute()
    else:
        # Fallback: store raw output if server didn't return JSON (unexpected for inbody)
        artefact_id = str(uuid.uuid4())
        ext = ('html' if 'html' in content_type else ('md' if 'markdown' in content_type else ('txt' if 'text/plain' in content_type else 'bin')))
        artefact_type = f'docling_output_{ext}'
        rel_path = f"{cabinet_id}/{file_id}/{artefact_id}/docling_output.{ext}"
        self.storage.upload_file(
            bucket,
            rel_path,
            response.content,
            # NOTE(review): ext can never be 'zip' given the choices above, so this
            # condition always falls through to the response content type.
            'application/zip' if ext == 'zip' else (content_type or 'application/octet-stream'),
            upsert=True
        )
        artefact_data = {
            'id': artefact_id,
            'file_id': file_id,
            'type': artefact_type,
            'rel_path': rel_path,
            'extra': {
                'processing_time': response.elapsed.total_seconds(),
                'config': task_config,
                'to_formats': to_formats_list,
                'content_type': content_type,
                **({} if 'artefact_extra' not in payload else payload['artefact_extra'])
            },
            'status': 'completed'
        }
        self.client.supabase.table('document_artefacts').insert(artefact_data).execute()
    # When we get canonical Docling JSON, also split out component contents into separate artefacts
    try:
        if 'application/json' in content_type or content_type.endswith('+json'):
            self._store_docling_component_artefacts(
                file_id=file_id,
                cabinet_id=cabinet_id,
                bucket=bucket,
                docling_json=docling_json,
                task_config=task_config,
                artefact_extra=payload.get('artefact_extra') if isinstance(payload, dict) else None
            )
    except Exception as split_e:
        logger.warning(f"Storing component artefacts failed for file {file_id}: {split_e}")
    # Handle optional frontpage image extraction
    # NOTE(review): docling_json is only bound on the JSON path; if a
    # frontmatter task got a non-JSON response, the NameError raised here is
    # swallowed by this try/except.
    if task.task_type == 'docling_frontmatter_json':
        try:
            self._extract_frontpage_image(docling_json, file_id, cabinet_id, bucket)
        except Exception as e:
            logger.warning(f"Frontpage image extraction failed for file {file_id}: {e}")
    logger.info(f"Docling processing completed for file {file_id}")
    # Pipeline dependencies now handle sequential execution automatically
    return {
        'artefact_id': artefact_id,
        'rel_path': rel_path,
        'processing_time': response.elapsed.total_seconds()
    }
def _extract_frontpage_image(self, docling_json: Dict[str, Any], file_id: str,
                             cabinet_id: str, bucket: str):
    """Extract and store the frontpage image from a Docling JSON payload.

    Looks for a base64-encoded image under the 'frontpage' or 'cover' keys,
    decodes it, uploads it as PNG, and records a 'docling_frontpage_image'
    artefact. Silently returns when no usable image is found.

    Args:
        docling_json: Parsed Docling response to search.
        file_id: Owning file id.
        cabinet_id: Cabinet the file belongs to (storage path prefix).
        bucket: Storage bucket name.
    """
    import base64
    # Look for frontpage image in various locations.
    cover_b64 = None
    for key in ('frontpage', 'cover'):
        section = docling_json.get(key)
        # Guard against non-dict values: the previous `'image_base64' in value`
        # check did substring matching on strings and raised TypeError on
        # other non-container types.
        if isinstance(section, dict) and section.get('image_base64'):
            cover_b64 = section['image_base64']
            break
    if not cover_b64:
        return
    # Decode and store the image.
    artefact_id = str(uuid.uuid4())
    img_bytes = base64.b64decode(cover_b64)
    rel_path = f"{cabinet_id}/{file_id}/{artefact_id}/frontpage.png"
    self.storage.upload_file(bucket, rel_path, img_bytes, 'image/png', upsert=True)
    # Create artefact record.
    artefact_data = {
        'id': artefact_id,
        'file_id': file_id,
        'type': 'docling_frontpage_image',
        'rel_path': rel_path,
        'extra': {'extracted_from': 'docling_frontmatter'},
        'status': 'completed'
    }
    self.client.supabase.table('document_artefacts').insert(artefact_data).execute()
    logger.debug(f"Frontpage image extracted for file {file_id}")
def _store_docling_component_artefacts(self, *, file_id: str, cabinet_id: str, bucket: str, docling_json: Dict[str, Any], task_config: Dict[str, Any], artefact_extra: Optional[Dict[str, Any]] = None) -> None:
    """Create artefacts for component contents from a canonical Docling JSON.

    Stores md_content, html_content, text_content, doctags_content and
    json_content if present, each as its own file plus artefact row alongside
    the canonical JSON. Per-component failures are logged and skipped so one
    bad component does not block the others.

    Args:
        file_id: Owning file id.
        cabinet_id: Cabinet the file belongs to (storage path prefix).
        bucket: Storage bucket name.
        docling_json: Canonical Docling response; components are read from
            its 'document' object.
        task_config: Docling config used, recorded in each artefact's extra.
        artefact_extra: Optional extra fields merged into each artefact row.
    """
    doc = docling_json.get('document') or {}
    # (payload key, artefact type, stored filename, mime type, serializer)
    components = [
        ('md_content', 'docling_md', 'docling.md', 'text/markdown', lambda v: v if isinstance(v, str) else ''),
        ('html_content', 'docling_html', 'docling.html', 'text/html', lambda v: v if isinstance(v, str) else ''),
        ('text_content', 'docling_text', 'docling.txt', 'text/plain', lambda v: v if isinstance(v, str) else ''),
        ('doctags_content', 'docling_doctags', 'docling.doctags.xml', 'application/xml', lambda v: v if isinstance(v, str) else ''),
        ('json_content', 'docling_json', 'docling.json', 'application/json', lambda v: json.dumps(v or {}, ensure_ascii=False)),
    ]
    for key, art_type, filename, mime, to_bytes in components:
        if key not in doc or doc.get(key) in (None, ''):
            continue
        try:
            artefact_id = str(uuid.uuid4())
            # BUG FIX: the per-component filename was previously dropped from
            # the storage path (a literal placeholder was stored instead); use
            # it so each component lands at a correctly named object.
            rel_path = f"{cabinet_id}/{file_id}/{artefact_id}/{filename}"
            data_bytes = to_bytes(doc.get(key))
            if isinstance(data_bytes, str):
                data_bytes = data_bytes.encode('utf-8')
            self.storage.upload_file(bucket, rel_path, data_bytes, mime, upsert=True)
            extra = {'source': 'canonical_docling_json', 'component_key': key, 'config': task_config}
            if artefact_extra:
                extra.update(artefact_extra)
            self.client.supabase.table('document_artefacts').insert({
                'id': artefact_id,
                'file_id': file_id,
                'type': art_type,
                'rel_path': rel_path,
                'extra': extra,
                'status': 'completed'
            }).execute()
        except Exception as e:
            logger.warning(f"Failed to store component '{key}' for file {file_id}: {e}")
def _process_llm_task(self, task: QueueTask) -> Dict[str, Any]:
    """Run an LLM analysis task (document classification, etc.) and return the result.

    POSTs the prompt/context from ``task.payload`` to the local LLM endpoint;
    when the payload sets 'store_result', the raw response is also persisted
    as an ``llm_<task_type>`` artefact.

    Returns:
        The JSON body returned by the LLM service.
    Raises:
        ValueError: if LLM_URL is not configured.
    """
    if not self.llm_url:
        raise ValueError("LLM_URL not configured")
    payload = task.payload
    file_id = task.file_id
    model = payload.get('model', 'default')
    # Build the generation request from the task payload.
    request_body = {
        'model': model,
        'prompt': payload['prompt'],
        'context': payload.get('context', ''),
        'max_tokens': payload.get('max_tokens', 1000),
        'temperature': payload.get('temperature', 0.1)
    }
    response = requests.post(
        f"{self.llm_url.rstrip('/')}/generate",
        json=request_body,
        headers={'Content-Type': 'application/json'},
        timeout=task.timeout
    )
    response.raise_for_status()
    llm_result = response.json()
    # Optionally persist the raw result as an artefact.
    if payload.get('store_result', False):
        bucket = payload['bucket']
        cabinet_id = payload['cabinet_id']
        artefact_id = str(uuid.uuid4())
        rel_path = f"{cabinet_id}/{file_id}/{artefact_id}/llm_{task.task_type}.json"
        self.storage.upload_file(
            bucket,
            rel_path,
            json.dumps(llm_result, ensure_ascii=False).encode('utf-8'),
            'application/json',
            upsert=True
        )
        self.client.supabase.table('document_artefacts').insert({
            'id': artefact_id,
            'file_id': file_id,
            'type': f'llm_{task.task_type}',
            'rel_path': rel_path,
            'extra': {
                'model': model,
                'task_type': task.task_type
            },
            'status': 'completed'
        }).execute()
    logger.info(f"LLM processing completed for file {file_id}")
    return llm_result
def _process_split_map_task(self, task: QueueTask) -> Dict[str, Any]:
    """Generate the split map for a file, then enqueue docling bundle pipelines.

    Returns a summary dict with the split map's 'method', 'confidence' and
    'entries_count'; on success it also carries the enqueued pipeline info,
    and on failure a 'pipeline_error' message (the task itself still
    completes — pipeline enqueueing is best-effort).
    """
    # Local import to avoid circular imports. The previously unused
    # enqueue_canonical_docling import has been removed.
    from routers.database.files.split_map import create_split_map_for_file
    file_id = task.file_id
    # Generate split map
    split_map = create_split_map_for_file(file_id)
    logger.info(f"Split map generation completed for file {file_id}")
    # Base summary returned in every outcome (previously duplicated three times).
    summary = {
        'method': split_map['method'],
        'confidence': split_map['confidence'],
        'entries_count': len(split_map['entries'])
    }
    # NEW BUNDLE ARCHITECTURE: split map completion directly triggers bundle task creation.
    logger.info(f"NEW ARCHITECTURE: Enqueueing sequential docling bundle pipelines for file {file_id}")
    try:
        # Get file information for pipeline enqueueing
        file_result = self.client.supabase.table('files').select('*').eq('id', file_id).single().execute()
        if not file_result.data:
            logger.error(f"Could not find file {file_id} for pipeline enqueueing")
            return {**summary, 'pipeline_error': 'File not found for pipeline enqueueing'}
        file_row = file_result.data
        bucket = file_row['bucket']
        cabinet_id = file_row['cabinet_id']
        storage_path = file_row['path']
        original_mime = file_row.get('mime_type', 'application/pdf')
        # Prefer a converted PDF artefact over the original upload when available.
        try:
            arts = self.client.supabase.table('document_artefacts').select('type,rel_path').eq('file_id', file_id).order('created_at', desc=True).execute().data or []
            pdf_art = next((a for a in arts if a.get('type') == 'document_pdf'), None)
            processing_path = pdf_art['rel_path'] if pdf_art else storage_path
            processing_mime = 'application/pdf' if pdf_art else original_mime
        except Exception:
            processing_path = storage_path
            processing_mime = original_mime
        # Prepare file data for pipeline controller
        file_data = {
            'bucket': bucket,
            'file_path': processing_path,
            'cabinet_id': cabinet_id,
            'mime_type': processing_mime
        }
        # Import and use pipeline controller to enqueue sequential pipelines
        from modules.pipeline_controller import get_pipeline_controller
        controller = get_pipeline_controller()
        pipeline_result = controller.enqueue_sequential_docling_pipelines(file_id, file_data)
        logger.info(f"Successfully enqueued {pipeline_result['total_tasks']} tasks across "
                    f"{len(pipeline_result['enqueued_pipelines'])} pipelines for file {file_id}")
        logger.info(f"Pipeline execution order: {pipeline_result['sequential_order']}")
        return {
            **summary,
            'enqueued_pipelines': pipeline_result['enqueued_pipelines'],
            'total_pipeline_tasks': pipeline_result['total_tasks'],
            'pipeline_order': pipeline_result['sequential_order']
        }
    except Exception as e:
        logger.error(f"Failed to enqueue sequential pipelines for file {file_id}: {e}")
        return {**summary, 'pipeline_error': str(e)}
def _enqueue_vlm_page_processing(self, file_id: str, threshold: int, vlm_group_id: str, vlm_model: str, base_config: dict):
    """Fan out VLM processing over the split-map sections of a file.

    Loads the file's split map and enqueues one section-level page-bundle
    task per entry. All failures are logged; nothing is raised.
    """
    from routers.database.files.files import _load_split_map
    from modules.database.supabase.utils.client import SupabaseServiceRoleClient
    from modules.database.supabase.utils.storage import StorageAdmin
    try:
        db = SupabaseServiceRoleClient()
        store = StorageAdmin()
        # Resolve the file row; nothing to do if it no longer exists.
        file_res = db.supabase.table('files').select('*').eq('id', file_id).single().execute()
        if not file_res.data:
            logger.error(f"File {file_id} not found for VLM page processing")
            return
        bucket = file_res.data['bucket']
        # The split map defines the section boundaries we fan out over.
        split_map = _load_split_map(db, store, bucket, file_id)
        if not split_map:
            logger.warning(f"No split map found for VLM page processing file {file_id}")
            return
        entries = split_map.get('entries', [])
        if not entries:
            logger.warning(f"Empty split map entries for VLM page processing file {file_id}")
            return
        logger.info(f"[auto-canonical] VLM page processing: found {len(entries)} sections for file {file_id}")
        # One bundle task per section; a failing section is skipped, not fatal.
        for section_idx, section in enumerate(entries, 1):
            try:
                start_page = int(section.get('start_page', 1))
                end_page = int(section.get('end_page', start_page))
                section_title = section.get('title', f'Section {section_idx}')
                logger.info(f"[auto-canonical] VLM page processing section {section_idx}: '{section_title}' pages {start_page}-{end_page}")
                self._enqueue_vlm_section_page_bundle(
                    file_id, section_idx, start_page, end_page, section_title,
                    vlm_group_id, vlm_model, base_config, len(entries)
                )
            except Exception as section_e:
                logger.warning(f"Failed to process VLM section {section_idx} for file {file_id}: {section_e}")
                continue
    except Exception as e:
        logger.error(f"VLM page processing setup failed for file {file_id}: {e}")
def _enqueue_vlm_section_page_bundle(self, file_id: str, section_idx: int, start_page: int, end_page: int,
                                     section_title: str, vlm_group_id: str, vlm_model: str,
                                     base_config: dict, total_sections: int):
    """Enqueue a 'vlm_section_page_bundle' task covering one split-map section.

    Enqueue failures are logged, never raised, so one section cannot abort
    the fan-out for the rest of the file.
    """
    from modules.queue_system import enqueue_docling_task, TaskPriority
    # Everything the downstream page-bundle handler needs, in one payload.
    bundle_payload = {
        'section_idx': section_idx,
        'start_page': start_page,
        'end_page': end_page,
        'section_title': section_title,
        'vlm_group_id': vlm_group_id,
        'vlm_model': vlm_model,
        'base_config': base_config,
        'total_sections': total_sections,
        'producer': 'auto_split'
    }
    try:
        section_task_id = enqueue_docling_task(
            file_id=file_id,
            task_type='vlm_section_page_bundle',
            payload=bundle_payload,
            priority=TaskPriority.NORMAL,
            timeout=3600  # 1 hour for page-by-page processing
        )
        logger.info(f"[auto-canonical] VLM section page bundle task {section_task_id} for section {section_idx} of file {file_id}")
    except Exception as e:
        logger.error(f"Failed to enqueue VLM section page bundle for section {section_idx} file {file_id}: {e}")
def process_document_analysis_task(self, task: QueueTask) -> Dict[str, Any]:
    """Process document structure analysis task.

    Builds a document outline hierarchy from the PDF bytes, optionally
    enriched with previously produced Tika and Docling JSON artefacts, and
    stores the result as a 'document_outline_hierarchy' artefact.

    Returns:
        Dict with 'sections_count' for the produced outline.
    Raises:
        Re-raises any analysis/storage failure after logging it.
    """
    file_id = task.file_id
    payload = task.payload
    logger.info(f"Processing document analysis task for file {file_id}")
    try:
        # Load file from storage
        bucket = payload['bucket']
        file_path = payload['file_path']
        cabinet_id = payload['cabinet_id']
        file_bytes = self.storage.download_file(bucket, file_path)
        # Load existing artefacts if available (both inputs are optional)
        client = SupabaseServiceRoleClient()
        artefacts = client.supabase.table('document_artefacts').select('*').eq('file_id', file_id).execute()
        tika_json = None
        docling_json = None
        for art in artefacts.data:
            if art['type'] == 'tika_json' and art['status'] == 'completed':
                try:
                    tika_data = self.storage.download_file(bucket, art['rel_path'])
                    tika_json = json.loads(tika_data.decode('utf-8'))
                except Exception as e:
                    logger.warning(f"Failed to load Tika JSON for analysis: {e}")
            elif art['type'] in ['docling_frontmatter_json', 'docling_noocr_json'] and art['status'] == 'completed':
                try:
                    docling_data = self.storage.download_file(bucket, art['rel_path'])
                    docling_json = json.loads(docling_data.decode('utf-8'))
                    break  # Use first available Docling result
                except Exception as e:
                    logger.warning(f"Failed to load Docling JSON for analysis: {e}")
        # Import here to avoid circular imports
        from modules.document_analysis import create_document_outline_hierarchy_artefact
        # Create document analysis
        analysis_data = create_document_outline_hierarchy_artefact(
            file_id=file_id,
            pdf_bytes=file_bytes,
            tika_json=tika_json,
            docling_json=docling_json
        )
        # Store analysis as artefact (insert row first, then upload file)
        artefact_id = analysis_data.get('artefact_id') or str(uuid.uuid4())
        analysis_data['artefact_id'] = artefact_id
        rel_path = f"{cabinet_id}/{file_id}/{artefact_id}/document_outline_hierarchy.json"
        # Insert row first to avoid orphaned files if DB insert fails
        # Insert artefact record with processing status
        sections_count = len(analysis_data.get('sections', []) or [])
        metadata = analysis_data.get('metadata') or {}
        analysis_methods = metadata.get('analysis_methods')
        self.client.supabase.table('document_artefacts').insert({
            'id': artefact_id,
            'file_id': file_id,
            'type': 'document_outline_hierarchy',
            'rel_path': rel_path,
            'extra': {
                'sections_count': sections_count,
                'analysis_methods': analysis_methods
            },
            'status': 'processing'
        }).execute()
        # Now upload the file
        analysis_json = json.dumps(analysis_data, ensure_ascii=False)
        self.storage.upload_file(bucket, rel_path, analysis_json.encode('utf-8'), 'application/json', upsert=True)
        # Mark artefact as completed
        self.client.supabase.table('document_artefacts').update({
            'status': 'completed'
        }).eq('id', artefact_id).execute()
        logger.info(f"Document analysis completed for file {file_id} (sections={sections_count})")
        return {
            'sections_count': sections_count
        }
    except Exception as e:
        logger.error(f"Document analysis failed for file {file_id}: {e}")
        raise
def process_page_images_task(self, task: QueueTask) -> Dict[str, Any]:
    """Generate per-page images (full PNG + WebP thumbnail) for a document.

    Downloads the source file, renders page images via
    ``create_page_images_artefact``, uploads every image and a JSON
    manifest to storage, then records a completed ``page_images``
    artefact row.

    Args:
        task: Queue task whose payload carries ``bucket``, ``file_path``
            and ``cabinet_id``.

    Returns:
        Dict with ``page_count`` and ``estimated_storage_mb``.

    Raises:
        Exception: Any download/render/upload/DB failure is logged and
            re-raised so the queue can retry.
    """
    file_id = task.file_id
    payload = task.payload
    logger.info(f"Processing page images task for file {file_id}")
    try:
        # Load file from storage
        bucket = payload['bucket']
        file_path = payload['file_path']
        cabinet_id = payload['cabinet_id']
        file_bytes = self.storage.download_file(bucket, file_path)
        # Import here to avoid circular imports
        from modules.page_image_generator import create_page_images_artefact
        # Generate page images
        images_data = create_page_images_artefact(
            file_id=file_id,
            cabinet_id=cabinet_id,
            pdf_bytes=file_bytes
        )
        artefact_id = images_data['artefact_id']
        # Include bucket in manifest for client-side signed URL generation
        images_data['bucket'] = bucket
        # Upload all page images to storage. The raw bytes are popped off
        # each page entry so they are not serialized into the manifest.
        for page_info in images_data['page_images']:
            # Upload full image
            full_path = page_info['full_image_path']
            full_data = page_info.pop('full_image_data')  # Remove from JSON
            self.storage.upload_file(bucket, full_path, full_data, 'image/png', upsert=True)
            # Upload thumbnail
            thumb_path = page_info['thumbnail_path']
            thumb_data = page_info.pop('thumbnail_data')  # Remove from JSON
            self.storage.upload_file(bucket, thumb_path, thumb_data, 'image/webp', upsert=True)
        # Store images metadata manifest under the artefact directory
        artefact_dir = f"{cabinet_id}/{file_id}/{artefact_id}"
        manifest_rel_path = f"{artefact_dir}/page_images.json"
        images_json = json.dumps(images_data, ensure_ascii=False)
        self.storage.upload_file(bucket, manifest_rel_path, images_json.encode('utf-8'), 'application/json', upsert=True)
        # Insert artefact record — reuse the shared service-role client
        # rather than constructing a new one per task.
        self.client.supabase.table('document_artefacts').insert({
            'id': artefact_id,
            'file_id': file_id,
            'type': 'page_images',
            # Store the directory prefix as rel_path for hybrid approach
            'rel_path': artefact_dir,
            'extra': {
                'page_count': images_data['page_count'],
                'total_full_images': images_data['storage_info']['total_full_images'],
                'total_thumbnails': images_data['storage_info']['total_thumbnails'],
                'estimated_storage_mb': images_data['storage_info']['estimated_storage_mb'],
                'manifest': manifest_rel_path
            },
            'status': 'completed'
        }).execute()
        logger.info(f"Page images generation completed for file {file_id}")
        return {
            'page_count': images_data['page_count'],
            'estimated_storage_mb': images_data['storage_info']['estimated_storage_mb']
        }
    except Exception as e:
        logger.error(f"Page images generation failed for file {file_id}: {e}")
        raise
def process_comparison_analysis_task(self, task: QueueTask) -> Dict[str, Any]:
    """Process comparison analysis between no-OCR and OCR docling results.

    Validates that both docling artefact groups (identified by
    ``no_ocr_group_id`` / ``ocr_group_id`` in the payload) are fully
    completed, then delegates the actual diffing to
    ``_compare_docling_groups``. When parts are still missing this
    method raises ``ValueError`` instances carrying extra attributes
    (``current_progress``, ``is_progress_retry`` etc.) that the retry
    mechanism inspects to decide how to back off — do not reorder the
    validation branches below, their precedence is deliberate.

    Returns:
        Dict from ``_compare_docling_groups`` with the report artefact id
        and summary statistics.

    Raises:
        ValueError: When group ids are missing, the file row is absent,
            or artefacts are incomplete/misaligned (retryable cases).
    """
    file_id = task.file_id
    payload = task.payload
    logger.info(f"Processing comparison analysis task for file {file_id}")
    try:
        no_ocr_group_id = payload.get('no_ocr_group_id')
        ocr_group_id = payload.get('ocr_group_id')
        comparison_type = payload.get('comparison_type', 'noocr_vs_ocr')
        initial_delay = payload.get('initial_delay_seconds', 0)
        # If this is the first execution and we have an initial delay, sleep briefly
        if initial_delay > 0:
            import time
            logger.info(f"Comparison analysis: applying initial delay of {min(initial_delay, 60)} seconds for file {file_id}")
            time.sleep(min(initial_delay, 60))  # Max 1 minute delay per attempt
            logger.info(f"Comparison analysis: delay complete for file {file_id}")
        if not no_ocr_group_id or not ocr_group_id:
            raise ValueError("Missing group_id parameters for comparison")
        client = SupabaseServiceRoleClient()
        # Find file info
        fr = client.supabase.table('files').select('*').eq('id', file_id).single().execute()
        if not fr.data:
            raise ValueError(f"File {file_id} not found")
        file_row = fr.data
        bucket = file_row['bucket']
        cabinet_id = file_row['cabinet_id']
        # Find artefacts for both groups
        artefacts = client.supabase.table('document_artefacts').select('*').eq('file_id', file_id).execute()
        arts = artefacts.data or []
        # Filter artefacts by group_id and type, including status
        no_ocr_arts = [a for a in arts if ((a.get('extra') or {}).get('group_id') == no_ocr_group_id and
                                           a.get('type') == 'docling_standard' and
                                           a.get('status') == 'completed')]
        ocr_arts = [a for a in arts if ((a.get('extra') or {}).get('group_id') == ocr_group_id and
                                        a.get('type') == 'docling_standard' and
                                        a.get('status') == 'completed')]
        # Also check pending/processing artefacts to understand timing better
        no_ocr_pending = [a for a in arts if ((a.get('extra') or {}).get('group_id') == no_ocr_group_id and
                                              a.get('type') == 'docling_standard' and
                                              a.get('status') in ('processing', 'pending'))]
        ocr_pending = [a for a in arts if ((a.get('extra') or {}).get('group_id') == ocr_group_id and
                                           a.get('type') == 'docling_standard' and
                                           a.get('status') in ('processing', 'pending'))]
        # Determine expected total parts from split_total metadata (if available).
        # Checked in priority order: completed artefacts first, then pending.
        expected_parts = None
        if no_ocr_arts:
            expected_parts = (no_ocr_arts[0].get('extra') or {}).get('split_total')
        elif ocr_arts:
            expected_parts = (ocr_arts[0].get('extra') or {}).get('split_total')
        elif no_ocr_pending:
            expected_parts = (no_ocr_pending[0].get('extra') or {}).get('split_total')
        elif ocr_pending:
            expected_parts = (ocr_pending[0].get('extra') or {}).get('split_total')
        logger.info(f"Comparison analysis: found {len(no_ocr_arts)} completed no-OCR artefacts ({len(no_ocr_pending)} pending), {len(ocr_arts)} completed OCR artefacts ({len(ocr_pending)} pending), expected_parts={expected_parts}")
        # Enhanced validation with progress-aware retry logic
        if expected_parts is not None:
            # We know how many parts to expect, so wait for all of them
            total_no_ocr = len(no_ocr_arts) + len(no_ocr_pending)
            total_ocr = len(ocr_arts) + len(ocr_pending)
            # Calculate completion percentages
            no_ocr_completion = len(no_ocr_arts) / expected_parts * 100
            ocr_completion = len(ocr_arts) / expected_parts * 100
            # Check if we're making progress (store in task metadata for persistence)
            progress_key = f"comparison_progress_{file_id}"
            current_progress = {
                'no_ocr_completed': len(no_ocr_arts),
                'ocr_completed': len(ocr_arts),
                'no_ocr_pending': len(no_ocr_pending),
                'ocr_pending': len(ocr_pending)
            }
            # Get previous progress from payload (injected by retry mechanism)
            previous_progress = payload.get('previous_progress', {'no_ocr_completed': 0, 'ocr_completed': 0})
            progress_made = (current_progress['no_ocr_completed'] > previous_progress['no_ocr_completed'] or
                             current_progress['ocr_completed'] > previous_progress['ocr_completed'])
            if len(no_ocr_arts) < expected_parts or len(ocr_arts) < expected_parts:
                if len(no_ocr_pending) > 0 or len(ocr_pending) > 0:
                    # Still processing - this is expected, always retry
                    error_msg = f"PROGRESS_RETRY: expected={expected_parts}, no_ocr={len(no_ocr_arts)}/{expected_parts} ({no_ocr_completion:.1f}%), ocr={len(ocr_arts)}/{expected_parts} ({ocr_completion:.1f}%), pending: no_ocr={len(no_ocr_pending)}, ocr={len(ocr_pending)}"
                    progress_retry_error = ValueError(error_msg)
                    # Attributes below are read by the retry mechanism, not by Python itself.
                    progress_retry_error.current_progress = current_progress
                    progress_retry_error.is_progress_retry = True
                    raise progress_retry_error
                elif progress_made:
                    # No pending but made progress since last check - likely brief gap between completions
                    error_msg = f"PROGRESS_MADE_RETRY: expected={expected_parts}, no_ocr={len(no_ocr_arts)}, ocr={len(ocr_arts)}, progress since last check"
                    progress_retry_error = ValueError(error_msg)
                    progress_retry_error.current_progress = current_progress
                    progress_retry_error.is_progress_retry = True
                    raise progress_retry_error
                else:
                    # No progress and no pending - likely stalled, but still retry with backoff
                    error_msg = f"STALLED_RETRY: expected={expected_parts}, no_ocr={len(no_ocr_arts)}, ocr={len(ocr_arts)} - no pending tasks but will retry"
                    stalled_retry_error = ValueError(error_msg)
                    stalled_retry_error.current_progress = current_progress
                    stalled_retry_error.is_stalled_retry = True
                    raise stalled_retry_error
            # Also verify both groups have the same number of completed parts
            if len(no_ocr_arts) != len(ocr_arts):
                error_msg = f"ALIGNMENT_RETRY: no_ocr completed={len(no_ocr_arts)}, ocr completed={len(ocr_arts)} (expected {expected_parts} each) - waiting for alignment"
                alignment_retry_error = ValueError(error_msg)
                alignment_retry_error.current_progress = current_progress
                alignment_retry_error.is_alignment_retry = True
                raise alignment_retry_error
        else:
            # Fallback to original logic when split_total not available
            if not no_ocr_arts or not ocr_arts:
                # More detailed retry logic with pending artefact awareness
                if len(no_ocr_arts) == 0 and len(ocr_arts) == 0:
                    if len(no_ocr_pending) > 0 or len(ocr_pending) > 0:
                        raise ValueError(f"Batches still processing: no_ocr completed={len(no_ocr_arts)} pending={len(no_ocr_pending)}, ocr completed={len(ocr_arts)} pending={len(ocr_pending)} - will retry")
                    else:
                        raise ValueError(f"No artefacts found for either group: no_ocr={len(no_ocr_arts)}, ocr={len(ocr_arts)} - may need more time")
                elif len(ocr_arts) == 0:
                    if len(ocr_pending) > 0:
                        raise ValueError(f"OCR batch still processing: no_ocr completed={len(no_ocr_arts)}, ocr completed={len(ocr_arts)} pending={len(ocr_pending)} - will retry")
                    else:
                        raise ValueError(f"OCR batch appears incomplete: no_ocr={len(no_ocr_arts)}, ocr={len(ocr_arts)} - will retry")
                elif len(no_ocr_arts) == 0:
                    if len(no_ocr_pending) > 0:
                        raise ValueError(f"No-OCR batch still processing: no_ocr completed={len(no_ocr_arts)} pending={len(no_ocr_pending)}, ocr completed={len(ocr_arts)} - will retry")
                    else:
                        raise ValueError(f"No-OCR batch appears incomplete: no_ocr={len(no_ocr_arts)}, ocr={len(ocr_arts)} - will retry")
                else:
                    raise ValueError(f"Unexpected missing artefacts: no_ocr={len(no_ocr_arts)}, ocr={len(ocr_arts)}")
            # For fallback case, ensure both groups have same count
            if len(no_ocr_arts) != len(ocr_arts):
                raise ValueError(f"Mismatched group sizes: no_ocr={len(no_ocr_arts)}, ocr={len(ocr_arts)} - will retry")
        # Sort both groups by split_order for aligned comparison
        no_ocr_arts.sort(key=lambda x: ((x.get('extra') or {}).get('split_order') or 0))
        ocr_arts.sort(key=lambda x: ((x.get('extra') or {}).get('split_order') or 0))
        # Log final validation before proceeding
        no_ocr_orders = [((a.get('extra') or {}).get('split_order') or 0) for a in no_ocr_arts]
        ocr_orders = [((a.get('extra') or {}).get('split_order') or 0) for a in ocr_arts]
        logger.info(f"Proceeding with comparison: no_ocr split_orders={no_ocr_orders}, ocr split_orders={ocr_orders}, expected_parts={expected_parts}")
        # Create comparison results
        comparison_results = self._compare_docling_groups(
            file_id, bucket, cabinet_id, no_ocr_arts, ocr_arts, comparison_type,
            no_ocr_group_id, ocr_group_id, payload
        )
        return comparison_results
    except Exception as e:
        logger.error(f"Comparison analysis failed for file {file_id}: {e}")
        raise
def _compare_docling_groups(self, file_id: str, bucket: str, cabinet_id: str,
                            no_ocr_arts: list, ocr_arts: list, comparison_type: str,
                            no_ocr_group_id: str, ocr_group_id: str, payload: Dict[str, Any]) -> Dict[str, Any]:
    """Compare two groups of docling artefacts and generate analysis.

    Pairs the i-th no-OCR artefact with the i-th OCR artefact (callers
    sort both lists by split_order beforehand), diffs each pair's
    canonical JSON via ``_compare_json_content``, uploads a combined
    report, and records a ``docling_comparison_analysis`` artefact row.

    Returns:
        Dict with the report ``artefact_id`` and summary statistics.

    Raises:
        Exception: Propagates report-level failures; per-part failures
            are logged and counted in the statistics instead.
    """
    from datetime import datetime, timezone

    logger.info(f"Starting detailed comparison for file {file_id}: {len(no_ocr_arts)} vs {len(ocr_arts)} artefacts")
    artefact_id = str(uuid.uuid4())
    comparison_dir = f"{cabinet_id}/{file_id}/{artefact_id}"
    results = []
    overall_stats = {
        'total_comparisons': min(len(no_ocr_arts), len(ocr_arts)),
        'successful_comparisons': 0,
        'failed_comparisons': 0,
        'differences_found': 0,
        'identical_count': 0
    }
    try:
        with tempfile.TemporaryDirectory() as temp_dir:
            for i in range(min(len(no_ocr_arts), len(ocr_arts))):
                no_ocr_art = no_ocr_arts[i]
                ocr_art = ocr_arts[i]
                try:
                    # Download manifest JSONs for both artefacts
                    no_ocr_manifest_path = ((no_ocr_art.get('extra') or {}).get('manifest'))
                    ocr_manifest_path = ((ocr_art.get('extra') or {}).get('manifest'))
                    if not no_ocr_manifest_path or not ocr_manifest_path:
                        logger.warning(f"Missing manifest paths for comparison {i+1}")
                        continue
                    no_ocr_manifest_data = self.storage.download_file(bucket, no_ocr_manifest_path)
                    ocr_manifest_data = self.storage.download_file(bucket, ocr_manifest_path)
                    no_ocr_manifest = json.loads(no_ocr_manifest_data.decode('utf-8'))
                    ocr_manifest = json.loads(ocr_manifest_data.decode('utf-8'))
                    # Compare JSON content if available
                    no_ocr_json_path = no_ocr_manifest.get('json_full')
                    ocr_json_path = ocr_manifest.get('json_full')
                    if no_ocr_json_path and ocr_json_path:
                        comparison_result = self._compare_json_content(
                            bucket, no_ocr_json_path, ocr_json_path, temp_dir, i + 1
                        )
                        comparison_result['no_ocr_artefact_id'] = no_ocr_art['id']
                        comparison_result['ocr_artefact_id'] = ocr_art['id']
                        comparison_result['split_order'] = (no_ocr_art.get('extra') or {}).get('split_order', i + 1)
                        comparison_result['split_heading'] = (no_ocr_art.get('extra') or {}).get('split_heading', f'Part {i+1}')
                        results.append(comparison_result)
                        overall_stats['successful_comparisons'] += 1
                        if comparison_result['has_differences']:
                            overall_stats['differences_found'] += 1
                        else:
                            overall_stats['identical_count'] += 1
                    else:
                        logger.warning(f"Missing JSON content paths for comparison {i+1}")
                        overall_stats['failed_comparisons'] += 1
                except Exception as part_e:
                    logger.warning(f"Failed to compare part {i+1}: {part_e}")
                    overall_stats['failed_comparisons'] += 1
                    continue
        # Create comprehensive comparison report
        comparison_report = {
            'file_id': file_id,
            'comparison_type': comparison_type,
            # Real UTC timestamp. The previous code stored the literal
            # string '{"created_at": "now()"}' via json.dumps, which was
            # never evaluated by any database.
            'timestamp': datetime.now(timezone.utc).isoformat(),
            'overall_statistics': overall_stats,
            'detailed_results': results,
            'summary': {
                'total_parts_compared': overall_stats['successful_comparisons'],
                'identical_parts': overall_stats['identical_count'],
                'different_parts': overall_stats['differences_found'],
                'accuracy_percentage': (overall_stats['identical_count'] / max(overall_stats['successful_comparisons'], 1)) * 100
            }
        }
        # Store comparison report as artefact
        report_path = f"{comparison_dir}/comparison_report.json"
        report_json = json.dumps(comparison_report, ensure_ascii=False, indent=2)
        self.storage.upload_file(bucket, report_path, report_json.encode('utf-8'), 'application/json', upsert=True)
        # Create artefact record (reuse the shared service-role client)
        self.client.supabase.table('document_artefacts').insert({
            'id': artefact_id,
            'file_id': file_id,
            'type': 'docling_comparison_analysis',
            'rel_path': report_path,
            'extra': {
                'comparison_type': comparison_type,
                'no_ocr_group_id': no_ocr_group_id,
                'ocr_group_id': ocr_group_id,
                'producer': payload.get('producer', 'auto_split'),
                'total_comparisons': overall_stats['total_comparisons'],
                'successful_comparisons': overall_stats['successful_comparisons'],
                'differences_found': overall_stats['differences_found'],
                'accuracy_percentage': comparison_report['summary']['accuracy_percentage']
            },
            'status': 'completed'
        }).execute()
        logger.info(f"Comparison analysis completed for file {file_id}: {overall_stats['successful_comparisons']} comparisons, {overall_stats['differences_found']} differences found")
        # Trigger VLM processing after comparison completes (if enabled)
        self._trigger_vlm_after_comparison(file_id, payload)
        return {
            'artefact_id': artefact_id,
            'comparisons_completed': overall_stats['successful_comparisons'],
            'differences_found': overall_stats['differences_found'],
            'accuracy_percentage': comparison_report['summary']['accuracy_percentage']
        }
    except Exception as e:
        logger.error(f"Failed to create comparison analysis for file {file_id}: {e}")
        raise
def _compare_json_content(self, bucket: str, no_ocr_path: str, ocr_path: str,
                          temp_dir: str, part_number: int) -> Dict[str, Any]:
    """Compare two stored JSON documents using ``jq`` and ``diff``.

    Both files are downloaded, key-sorted with ``jq --sort-keys`` so that
    semantically-equal JSON diffs clean, then compared with ``diff -u``.

    Returns:
        Dict describing the comparison: ``has_differences``,
        ``diff_lines_count``, a 1000-char diff preview and size info.
        On tool/download failure, a dict with ``has_differences=True``,
        an ``error`` message and ``diff_lines_count=-1`` (never raises).
    """
    import subprocess
    try:
        # Download both JSON files
        no_ocr_data = self.storage.download_file(bucket, no_ocr_path)
        ocr_data = self.storage.download_file(bucket, ocr_path)
        # Save to temp files
        no_ocr_file = Path(temp_dir) / f'no_ocr_part_{part_number}.json'
        ocr_file = Path(temp_dir) / f'ocr_part_{part_number}.json'
        no_ocr_file.write_bytes(no_ocr_data)
        ocr_file.write_bytes(ocr_data)
        sorted_no_ocr = Path(temp_dir) / f'sorted_no_ocr_part_{part_number}.json'
        sorted_ocr = Path(temp_dir) / f'sorted_ocr_part_{part_number}.json'
        # Sort both files using jq. Output handles are managed with
        # context managers so they are always closed (the previous
        # bare open(...) calls leaked file descriptors).
        with open(sorted_no_ocr, 'w') as out:
            subprocess.run(['jq', '--sort-keys', '.', str(no_ocr_file)],
                           stdout=out, stderr=subprocess.DEVNULL, check=True)
        with open(sorted_ocr, 'w') as out:
            subprocess.run(['jq', '--sort-keys', '.', str(ocr_file)],
                           stdout=out, stderr=subprocess.DEVNULL, check=True)
        # Compare using diff (returncode 0 = identical, 1 = different)
        diff_output = Path(temp_dir) / f'diff_part_{part_number}.txt'
        with open(diff_output, 'w') as out:
            diff_result = subprocess.run(
                ['diff', '-u', str(sorted_no_ocr), str(sorted_ocr)],
                stdout=out,
                stderr=subprocess.DEVNULL,
                text=True
            )
        # Read diff output
        diff_content = diff_output.read_text()
        # Analyze differences: count changed lines, excluding the
        # '+++' / '---' file headers of the unified diff format.
        has_differences = diff_result.returncode != 0
        diff_lines = len([l for l in diff_content.split('\n') if l.startswith(('+', '-')) and not l.startswith(('+++', '---'))])
        return {
            'part_number': part_number,
            'has_differences': has_differences,
            'diff_lines_count': diff_lines,
            'diff_content_preview': diff_content[:1000] if diff_content else '',  # First 1000 chars
            'no_ocr_size': len(no_ocr_data),
            'ocr_size': len(ocr_data),
            'size_difference': abs(len(ocr_data) - len(no_ocr_data))
        }
    except subprocess.CalledProcessError as e:
        logger.warning(f"jq/diff command failed for part {part_number}: {e}")
        return {
            'part_number': part_number,
            'has_differences': True,
            'error': f"Comparison tools failed: {str(e)}",
            'diff_lines_count': -1
        }
    except Exception as e:
        logger.warning(f"JSON comparison failed for part {part_number}: {e}")
        return {
            'part_number': part_number,
            'has_differences': True,
            'error': f"Comparison failed: {str(e)}",
            'diff_lines_count': -1
        }
def process_vlm_section_page_bundle_task(self, task: QueueTask) -> Dict[str, Any]:
    """Fan out per-page VLM docling tasks for one document section.

    For each page in ``[start_page, end_page]`` a ``canonical_docling_json``
    task is enqueued with a VLM pipeline config, then a follow-up
    ``vlm_section_bundle_collector`` task is scheduled to combine the
    page results into a section-level bundle.

    Returns:
        Dict with the section index, number of page tasks created, the
        collector task id and the page range string.

    Raises:
        ValueError: If no page task could be enqueued at all.
    """
    # Import locally (avoids circular imports) but only once per call —
    # previously this was re-imported on every loop iteration.
    from modules.queue_system import enqueue_docling_task, TaskPriority
    import time

    file_id = task.file_id
    payload = task.payload
    logger.info(f"Processing VLM section page bundle task for file {file_id}")
    try:
        section_idx = payload.get('section_idx')
        start_page = payload.get('start_page')
        end_page = payload.get('end_page')
        section_title = payload.get('section_title', f'Section {section_idx}')
        vlm_group_id = payload.get('vlm_group_id')
        vlm_model = payload.get('vlm_model', 'smoldocling')
        base_config = payload.get('base_config', {})
        total_sections = payload.get('total_sections', 1)
        client = SupabaseServiceRoleClient()
        # Get file info
        fr = client.supabase.table('files').select('*').eq('id', file_id).single().execute()
        if not fr.data:
            raise ValueError(f"File {file_id} not found")
        file_row = fr.data
        bucket = file_row['bucket']
        cabinet_id = file_row['cabinet_id']
        # Find processing path (prefer converted PDF)
        arts = client.supabase.table('document_artefacts').select('type,rel_path').eq('file_id', file_id).order('created_at', desc=True).execute().data or []
        pdf_art = next((a for a in arts if a.get('type') == 'document_pdf'), None)
        processing_path = pdf_art['rel_path'] if pdf_art else file_row['path']
        processing_mime = 'application/pdf'
        logger.info(f"VLM section bundle: processing section {section_idx} '{section_title}' pages {start_page}-{end_page} for file {file_id}")
        # Create individual page processing tasks
        page_task_ids = []
        for page_num in range(start_page, end_page + 1):
            try:
                page_config = {
                    **base_config,
                    'do_ocr': False,
                    'force_ocr': False,
                    'pipeline': 'vlm',
                    'vlm_pipeline_model': vlm_model,
                    'page_range': [page_num, page_num],
                    'target_type': 'zip',
                    'image_export_mode': 'referenced',
                    # Add required VLM parameters that may be missing
                    'do_picture_classification': False,
                    'do_picture_description': False
                }
                logger.debug(f"VLM page {page_num} config: pipeline={page_config.get('pipeline')}, model={page_config.get('vlm_pipeline_model')}, range={page_config.get('page_range')}")
                page_task_id = enqueue_docling_task(
                    file_id=file_id,
                    task_type='canonical_docling_json',
                    payload={
                        'bucket': bucket,
                        'file_path': processing_path,
                        'cabinet_id': cabinet_id,
                        'mime_type': processing_mime,
                        'config': page_config,
                        'artefact_extra': {
                            'is_subdoc': True,
                            'page_range': [page_num, page_num],
                            'label': f'{section_title} - Page {page_num}',
                            'vlm_section_idx': section_idx,
                            'vlm_section_title': section_title,
                            'vlm_page_number': page_num,
                            'vlm_section_start': start_page,
                            'vlm_section_end': end_page,
                            'producer': 'auto_split_vlm_page'
                        }
                    },
                    priority=TaskPriority.NORMAL,
                    timeout=1800
                )
                page_task_ids.append((page_num, page_task_id))
                logger.debug(f"Enqueued VLM page task {page_task_id} for page {page_num} of section {section_idx}")
            except Exception as page_e:
                logger.warning(f"Failed to enqueue VLM page {page_num} for section {section_idx} file {file_id}: {page_e}")
                continue
        if not page_task_ids:
            raise ValueError(f"No page tasks could be enqueued for section {section_idx}")
        # Wait for all page tasks to complete and then create section bundle
        logger.info(f"Enqueued {len(page_task_ids)} VLM page tasks for section {section_idx}, now waiting for completion...")
        # Wait a bit for page tasks to start, then create the collector
        # task that bundles the completed page results.
        time.sleep(10)
        bundle_task_id = enqueue_docling_task(
            file_id=file_id,
            task_type='vlm_section_bundle_collector',
            payload={
                'section_idx': section_idx,
                'start_page': start_page,
                'end_page': end_page,
                'section_title': section_title,
                'vlm_group_id': vlm_group_id,
                'vlm_model': vlm_model,
                'total_sections': total_sections,
                'producer': 'auto_split',
                'page_task_ids': [tid for _, tid in page_task_ids],
                'expected_pages': list(range(start_page, end_page + 1))
            },
            priority=TaskPriority.LOW,  # Run after page tasks
            timeout=3600
        )
        logger.info(f"Created VLM section bundle collector task {bundle_task_id} for section {section_idx}")
        return {
            'section_idx': section_idx,
            'page_tasks_created': len(page_task_ids),
            'bundle_task_id': bundle_task_id,
            'pages_range': f"{start_page}-{end_page}"
        }
    except Exception as e:
        logger.error(f"VLM section page bundle task failed for file {file_id}: {e}")
        raise
def process_vlm_section_bundle_collector_task(self, task: QueueTask) -> Dict[str, Any]:
    """Collect completed VLM page results and create a section-level bundle.

    Waits (via retryable ``ValueError``) until every expected page of the
    section has a completed ``docling_vlm`` artefact, then writes a
    section manifest to storage and records a ``vlm_section_page_bundle``
    artefact row.

    Returns:
        Dict with the section artefact id, section index, number of
        pages bundled and the manifest path.

    Raises:
        ValueError: If the file row is missing, or if pages are still
            missing (retryable — the queue re-runs the task).
    """
    from datetime import datetime, timezone

    file_id = task.file_id
    payload = task.payload
    logger.info(f"Processing VLM section bundle collector for file {file_id}")
    try:
        section_idx = payload.get('section_idx')
        start_page = payload.get('start_page')
        end_page = payload.get('end_page')
        section_title = payload.get('section_title', f'Section {section_idx}')
        vlm_group_id = payload.get('vlm_group_id')
        vlm_model = payload.get('vlm_model', 'smoldocling')
        total_sections = payload.get('total_sections', 1)
        expected_pages = payload.get('expected_pages', [])
        client = SupabaseServiceRoleClient()
        # Get file info
        fr = client.supabase.table('files').select('*').eq('id', file_id).single().execute()
        if not fr.data:
            raise ValueError(f"File {file_id} not found")
        file_row = fr.data
        bucket = file_row['bucket']
        cabinet_id = file_row['cabinet_id']
        # Find all completed VLM page artefacts for this section
        artefacts = client.supabase.table('document_artefacts').select('*').eq('file_id', file_id).execute()
        arts = artefacts.data or []
        # Filter for this section's VLM page artefacts
        section_page_arts = []
        for art in arts:
            extra = art.get('extra', {})
            if (extra.get('vlm_section_idx') == section_idx and
                    extra.get('producer') == 'auto_split_vlm_page' and
                    art.get('type') == 'docling_vlm' and
                    art.get('status') == 'completed'):
                section_page_arts.append(art)
        # Check if we have all expected pages
        found_pages = [art.get('extra', {}).get('vlm_page_number') for art in section_page_arts]
        found_pages = [p for p in found_pages if p is not None]
        missing_pages = [p for p in expected_pages if p not in found_pages]
        logger.info(f"VLM section {section_idx} bundle collector: found {len(section_page_arts)} page artefacts, expected {len(expected_pages)} pages")
        if logger.isEnabledFor(10):  # 10 == logging.DEBUG
            found_pages_debug = [art.get('extra', {}).get('vlm_page_number') for art in section_page_arts]
            logger.debug(f"VLM section {section_idx}: found pages {found_pages_debug}, expected pages {expected_pages}")
        if missing_pages:
            # Not all pages are ready, retry later
            logger.info(f"VLM section {section_idx} bundle collector: missing pages {missing_pages}, found pages {found_pages} - will retry later")
            raise ValueError(f"VLM section {section_idx} missing pages: {missing_pages} (found: {found_pages}) - will retry")
        # Sort page artefacts by page number
        section_page_arts.sort(key=lambda x: x.get('extra', {}).get('vlm_page_number', 0))
        logger.info(f"VLM section {section_idx} bundle: creating manifest for {len(section_page_arts)} pages")
        # Create section bundle manifest
        section_artefact_id = str(uuid.uuid4())
        section_manifest_path = f"{cabinet_id}/{file_id}/{section_artefact_id}/vlm_section_{section_idx}_manifest.json"
        page_bundles = []
        for page_art in section_page_arts:
            extra = page_art.get('extra', {})
            page_num = extra.get('vlm_page_number')
            page_manifest_path = extra.get('manifest')
            page_bundles.append({
                'page_number': page_num,
                'artefact_id': page_art['id'],
                'manifest_path': page_manifest_path,
                'rel_path': page_art['rel_path'],
                'label': extra.get('label', f'Page {page_num}')
            })
        section_manifest = {
            'file_id': file_id,
            'section_idx': section_idx,
            'section_title': section_title,
            'start_page': start_page,
            'end_page': end_page,
            'vlm_model': vlm_model,
            'total_pages': len(page_bundles),
            'page_bundles': page_bundles,
            # Real UTC timestamp; the previous code stored the literal
            # string 'now()' which is never evaluated in a JSON manifest.
            'created_at': datetime.now(timezone.utc).isoformat(),
            'type': 'vlm_section_page_bundle'
        }
        # Store section manifest (json is already imported at module level)
        manifest_json = json.dumps(section_manifest, ensure_ascii=False, indent=2)
        self.storage.upload_file(bucket, section_manifest_path, manifest_json.encode('utf-8'), 'application/json', upsert=True)
        # Create section bundle artefact
        client.supabase.table('document_artefacts').insert({
            'id': section_artefact_id,
            'file_id': file_id,
            'type': 'vlm_section_page_bundle',
            'rel_path': section_manifest_path,
            'extra': {
                'section_idx': section_idx,
                'section_title': section_title,
                'start_page': start_page,
                'end_page': end_page,
                'vlm_model': vlm_model,
                'total_pages': len(page_bundles),
                'group_id': vlm_group_id,
                'split_order': section_idx,
                'split_heading': section_title,
                'split_total': total_sections,
                'pipeline': 'vlm',
                'producer': 'auto_split',
                'group_pack_type': 'vlm_page_bundle_auto_split'
            },
            'status': 'completed'
        }).execute()
        logger.info(f"VLM section bundle collector completed for section {section_idx} of file {file_id}: created manifest with {len(page_bundles)} page bundles")
        return {
            'section_artefact_id': section_artefact_id,
            'section_idx': section_idx,
            'pages_bundled': len(page_bundles),
            'manifest_path': section_manifest_path
        }
    except Exception as e:
        logger.error(f"VLM section bundle collector failed for file {file_id}: {e}")
        raise
def _trigger_vlm_after_comparison(self, file_id: str, comparison_payload: Dict[str, Any]):
    """Trigger VLM processing after comparison analysis completes.

    Best-effort: only runs when the comparison payload carries
    ``trigger_vlm_after_comparison`` and an enabled ``vlm_config``.
    Depending on ``split_by_page`` it either fans out page-level VLM
    tasks or enqueues a section-level canonical docling batch. Any
    failure is logged and swallowed so it never fails the comparison.

    Args:
        file_id: Id of the file whose comparison just completed.
        comparison_payload: Payload of the comparison task; may contain
            ``vlm_config`` with ``split_by_page``, ``model``,
            ``threshold`` and ``base_config`` keys.
    """
    try:
        # Check if VLM should be triggered
        if not comparison_payload.get('trigger_vlm_after_comparison'):
            logger.debug(f"VLM post-comparison trigger not enabled for file {file_id}")
            return
        vlm_config = comparison_payload.get('vlm_config', {})
        if not vlm_config.get('enabled'):
            logger.debug(f"VLM not enabled for file {file_id}")
            return
        logger.info(f"[auto-canonical] Triggering VLM processing after comparison for file {file_id}")
        # Extract VLM configuration
        split_by_page = vlm_config.get('split_by_page', False)
        vlm_model = vlm_config.get('model', 'smoldocling')
        threshold = vlm_config.get('threshold', 50)
        base_config = vlm_config.get('base_config', {})
        # Generate new group_id for VLM processing (uuid is imported at
        # module level; no local import needed).
        vlm_group_id = str(uuid.uuid4())
        if split_by_page:
            # Page-by-page processing within sections
            logger.info(f"[auto-canonical] vlm page-by-page processing for file {file_id} (post-comparison)")
            self._enqueue_vlm_page_processing(
                file_id, threshold, vlm_group_id, vlm_model, base_config
            )
        else:
            # Standard section-level VLM processing
            from routers.database.files.files import enqueue_canonical_docling
            body_vlm = {
                'use_split_map': True,
                'threshold': threshold,
                'producer': 'auto_split',
                'group_id': vlm_group_id,
                'config': {
                    **base_config,
                    'do_ocr': False,  # VLM doesn't need OCR
                    'force_ocr': False,
                    'pipeline': 'vlm',
                    'vlm_pipeline_model': vlm_model
                }
            }
            logger.info(f"[auto-canonical] vlm section batch group_id={vlm_group_id} for file {file_id} (post-comparison)")
            enqueue_canonical_docling(file_id=file_id, body=body_vlm)
    except Exception as e:
        logger.warning(f"Failed to trigger VLM processing after comparison for file {file_id}: {e}")
def process_docling_bundle_task(self, task: QueueTask) -> Dict[str, Any]:
    """
    Process single docling bundle task (whole document processing).
    This creates a coherent single bundle with all formats using direct processing.
    NO temporary tasks or old logic reuse - this is the new architecture.
    """
    file_id = task.file_id
    logger.info(f"🎯 NEW ARCHITECTURE: Processing docling bundle task for file {file_id} (whole document)")
    try:
        # Pull the bundle settings out of the payload.
        task_payload = task.payload
        docling_config = task_payload.get('config', {})
        meta = task_payload.get('bundle_metadata', {})
        # Force the bundle processing configuration: zip output with
        # the full set of export formats.
        docling_config['target_type'] = 'zip'
        docling_config['to_formats'] = ['json', 'html', 'text', 'md', 'doctags']
        # Call the actual docling processing directly - NO temp tasks!
        outcome = self._process_docling_bundle_direct(task, docling_config, meta)
    except Exception as e:
        logger.error(f"❌ NEW ARCHITECTURE: Docling bundle processing failed for file {file_id}: {e}")
        raise
    logger.info(f"✅ NEW ARCHITECTURE: Successfully processed docling bundle for file {file_id}")
    return outcome
def _process_docling_bundle_direct(self, task: QueueTask, config: Dict[str, Any], bundle_metadata: Dict[str, Any]) -> Dict[str, Any]:
    """
    Direct docling bundle processing - NEW ARCHITECTURE approach.
    This processes the docling request directly without creating temporary tasks,
    ensuring clean Redis state and proper bundle metadata handling.

    Args:
        task: Queue task; its payload supplies 'bucket', 'file_path',
            'cabinet_id' and optionally 'mime_type'; ``task.timeout`` bounds
            both the Docling document timeout default and the HTTP request.
        config: Docling conversion options, forwarded as multipart form fields.
        bundle_metadata: Caller metadata propagated into the stored bundle.

    Returns:
        Result dict from ``_process_docling_zip_bundle`` (artefact id, paths,
        entry count, bundle metadata).

    Raises:
        ValueError: If DOCLING_URL is not configured, or the service response
            is not a ZIP archive.
        requests.HTTPError: If the Docling HTTP call returns an error status.
    """
    file_id = task.file_id
    payload = task.payload
    logger.info(f"🔧 DIRECT PROCESSING: Starting docling bundle processing for file {file_id}")
    if not self.docling_url:
        raise ValueError("DOCLING_URL not configured")
    # Extract payload data
    bucket = payload['bucket']
    file_path = payload['file_path']
    cabinet_id = payload['cabinet_id']
    # Download file
    logger.debug(f"📥 DIRECT PROCESSING: Downloading file for bundle processing: {bucket}/{file_path}")
    file_bytes = self.storage.download_file(bucket, file_path)
    # Prepare Docling request with bundle-specific config
    docling_api_key = os.getenv('DOCLING_API_KEY')
    headers = {'Accept': '*/*'}
    if docling_api_key:
        headers['X-Api-Key'] = docling_api_key
    # Build form data for bundle processing - USE CONFIG FROM PIPELINE_CONTROLLER (no hardcoded defaults!)
    # The config passed from pipeline_controller already has environment variables loaded.
    # NOTE: a list of tuples (not a dict) is used so repeated keys
    # (ocr_lang, to_formats, page_range) become multiple form fields.
    form_data = [
        ('target_type', 'zip'),  # Always zip for bundles
        ('do_ocr', str(config.get('do_ocr', False)).lower()),
        ('force_ocr', str(config.get('force_ocr', False)).lower()),
        ('image_export_mode', 'referenced'),  # Bundle standard
        ('ocr_engine', config.get('ocr_engine', 'easyocr')),
        ('pdf_backend', config.get('pdf_backend', 'dlparse_v4')),
        ('table_mode', config.get('table_mode', 'fast')),  # Use config from pipeline_controller (env vars)
        ('table_cell_matching', str(config.get('table_cell_matching', True)).lower()),  # Use config from pipeline_controller (env: true)
        ('pipeline', config.get('pipeline', 'standard')),
        ('do_formula_enrichment', str(config.get('do_formula_enrichment', True)).lower()),  # Use config from pipeline_controller (env: true)
        ('do_code_enrichment', str(config.get('do_code_enrichment', True)).lower()),  # Use config from pipeline_controller (env: true)
        ('do_table_structure', str(config.get('do_table_structure', True)).lower()),
        ('include_images', str(config.get('include_images', True)).lower()),
        ('images_scale', str(config.get('images_scale', 2.0))),
        ('do_picture_classification', str(config.get('do_picture_classification', False)).lower()),
        ('do_picture_description', str(config.get('do_picture_description', False)).lower()),
        ('document_timeout', str(config.get('document_timeout', task.timeout)))
    ]
    # Handle OCR languages as array (API expects multiple form fields)
    ocr_lang = config.get('ocr_lang')
    if ocr_lang:
        if isinstance(ocr_lang, list):
            for lang in ocr_lang:
                form_data.append(('ocr_lang', str(lang)))
        else:
            form_data.append(('ocr_lang', str(ocr_lang)))
    # Handle VLM pipeline options (CRITICAL for VLM processing)
    if config.get('vlm_pipeline_model'):
        form_data.append(('vlm_pipeline_model', config.get('vlm_pipeline_model')))
    # VLM model local/API options must be JSON per Docling OpenAPI spec
    if config.get('vlm_pipeline_model_local'):
        vlm_local = config.get('vlm_pipeline_model_local')
        if isinstance(vlm_local, (dict, list)):
            form_data.append(('vlm_pipeline_model_local', json.dumps(vlm_local)))
        elif isinstance(vlm_local, str) and vlm_local.strip().startswith(('{', '[')):
            # Already a JSON-looking string; pass it through untouched.
            form_data.append(('vlm_pipeline_model_local', vlm_local))
        # else: omit to avoid validation error
    if config.get('vlm_pipeline_model_api'):
        vlm_api = config.get('vlm_pipeline_model_api')
        if isinstance(vlm_api, (dict, list)):
            form_data.append(('vlm_pipeline_model_api', json.dumps(vlm_api)))
        elif isinstance(vlm_api, str) and vlm_api.strip().startswith(('{', '[')):
            form_data.append(('vlm_pipeline_model_api', vlm_api))
        # else: omit
    # Picture description options must be JSON per Docling OpenAPI spec
    if config.get('picture_description_local'):
        pic_local = config.get('picture_description_local')
        if isinstance(pic_local, (dict, list)):
            form_data.append(('picture_description_local', json.dumps(pic_local)))
        elif isinstance(pic_local, str) and pic_local.strip().startswith(('{', '[')):
            form_data.append(('picture_description_local', pic_local))
    if config.get('picture_description_api'):
        pic_api = config.get('picture_description_api')
        if isinstance(pic_api, (dict, list)):
            form_data.append(('picture_description_api', json.dumps(pic_api)))
        elif isinstance(pic_api, str) and pic_api.strip().startswith(('{', '[')):
            form_data.append(('picture_description_api', pic_api))
    if 'picture_description_area_threshold' in config:
        form_data.append(('picture_description_area_threshold', str(config.get('picture_description_area_threshold'))))
    # Handle markdown page break placeholder
    if 'md_page_break_placeholder' in config:
        form_data.append(('md_page_break_placeholder', config.get('md_page_break_placeholder')))
    # Add formats - always all formats for bundles
    for fmt in ['json', 'html', 'text', 'md', 'doctags']:
        form_data.append(('to_formats', fmt))
    # Handle page range properly - get actual PDF page count like frontmatter does
    page_range = config.get('page_range', [1, 999999])
    if isinstance(page_range, list) and len(page_range) >= 2:
        def _to_int_safe(v, default):
            # Coerce to int, falling back to `default` on any failure.
            try:
                return int(v)
            except Exception:
                return default
        start_pg = _to_int_safe(page_range[0], 1)
        end_pg = _to_int_safe(page_range[1], 999999)
        if start_pg < 1:
            start_pg = 1
        if end_pg < start_pg:
            end_pg = start_pg
        # CRITICAL: Get actual PDF page count to prevent massive range
        try:
            import fitz  # PyMuPDF; optional - failures fall back to the raw range
            doc = fitz.open(stream=file_bytes, filetype='pdf')
            pc = int(doc.page_count)
            doc.close()
            if pc > 0:
                end_pg = min(end_pg, pc)  # Clamp to actual page count!
                start_pg = max(1, min(start_pg, pc))
                if end_pg < start_pg:
                    end_pg = start_pg
                logger.info(f"📄 DIRECT PROCESSING: PDF has {pc} pages, using range {start_pg}-{end_pg}")
        except Exception as e:
            logger.warning(f"Could not determine PDF page count: {e}, using defaults")
        # Docling expects the range as two repeated 'page_range' fields.
        form_data.append(('page_range', str(start_pg)))
        form_data.append(('page_range', str(end_pg)))
    else:
        # Fallback to single page if no range specified
        form_data.append(('page_range', '1'))
        form_data.append(('page_range', '1'))
    files = [('files', ('file', file_bytes, payload.get('mime_type', 'application/pdf')))]
    # DEBUG: Log the actual config being sent to Docling
    config_debug = {key: value for key, value in form_data if key in ['table_mode', 'table_cell_matching', 'do_formula_enrichment', 'do_code_enrichment', 'do_ocr', 'pipeline']}
    logger.info(f"🔧 DIRECT PROCESSING: Docling config being sent: {config_debug}")
    # Make the HTTP request
    logger.info(f"🌐 DIRECT PROCESSING: Making HTTP request to Docling for file {file_id}")
    try:
        import time  # local import: 'time' is not imported at module level
        start_time = time.time()
        response = requests.post(
            f"{self.docling_url.rstrip('/')}/v1/convert/file",
            files=files,
            data=form_data,
            headers=headers,
            timeout=task.timeout
        )
        response.raise_for_status()
        elapsed = time.time() - start_time
        logger.info(f"⚡ DIRECT PROCESSING: Docling request completed in {elapsed:.2f}s for file {file_id}")
    except Exception as e:
        logger.error(f"🌐 DIRECT PROCESSING: HTTP request failed for file {file_id}: {e}")
        raise
    # Process response - should be ZIP for bundle
    # Accept either a zip Content-Type or the ZIP magic bytes ('PK').
    content_type = (response.headers.get('Content-Type') or '').lower()
    is_zip_resp = ('zip' in content_type) or (response.content[:2] == b'PK')
    if not is_zip_resp:
        raise ValueError(f"Expected ZIP response for bundle, got: {content_type}")
    # Process ZIP bundle and create artefacts
    logger.info(f"📦 DIRECT PROCESSING: Processing ZIP bundle for file {file_id}")
    result = self._process_docling_zip_bundle(
        file_id=file_id,
        bucket=bucket,
        cabinet_id=cabinet_id,
        zip_content=response.content,
        bundle_metadata=bundle_metadata,
        task_config=config
    )
    logger.info(f"✅ DIRECT PROCESSING: Bundle processing completed for file {file_id}")
    return result
def _create_bundle_display_metadata(self, bundle_type: str, title: str, index: int = None,
total: int = None, page_range: list = None) -> dict:
"""
Create consistent display metadata for bundle organization.
This ensures all bundles have proper titles, ordering, and display names
for frontend organization and user-friendly presentation.
"""
metadata = {
'title': title,
'bundle_type': bundle_type
}
if index is not None:
metadata['split_order'] = index
if total is not None:
metadata['split_total'] = total
if page_range:
metadata['page_range'] = page_range
metadata['page_count'] = page_range[1] - page_range[0] + 1
# Create display names based on bundle type
if bundle_type == 'page':
metadata['display_name'] = f"Page {page_range[0]}" if page_range else f"Page {index}"
metadata['bundle_label'] = f"Page {page_range[0]} Bundle"
metadata['sort_key'] = page_range[0] if page_range else index
elif bundle_type == 'section':
page_str = f" (p{page_range[0]}-{page_range[1]})" if page_range else ""
metadata['display_name'] = f"{index:02d}. {title}{page_str}"
metadata['bundle_label'] = f"{title} Bundle"
metadata['sort_key'] = index
elif bundle_type == 'chunk':
page_str = f" (p{page_range[0]}-{page_range[1]})" if page_range else ""
metadata['display_name'] = f"{index:02d}. {title}{page_str}"
metadata['bundle_label'] = f"{title} Bundle"
metadata['sort_key'] = index
else:
metadata['display_name'] = title
metadata['bundle_label'] = f"{title} Bundle"
metadata['sort_key'] = index or 0
return metadata
def _process_docling_zip_bundle(self, file_id: str, bucket: str, cabinet_id: str,
                                zip_content: bytes, bundle_metadata: Dict[str, Any],
                                task_config: Dict[str, Any]) -> Dict[str, Any]:
    """
    Process ZIP bundle response and create artefacts with proper bundle metadata.
    This is the NEW ARCHITECTURE approach for handling docling ZIP responses.

    Steps:
      1. Upload the raw ZIP archive to storage.
      2. Extract each entry, infer its MIME type from the extension, and
         upload it under the bundle directory.
      3. Write a manifest JSON describing the bundle contents.
      4. Insert a ``document_artefacts`` row referencing the bundle.

    Args:
        file_id: Owning file id.
        bucket: Storage bucket name.
        cabinet_id: Cabinet id used as path prefix.
        zip_content: Raw ZIP bytes returned by Docling.
        bundle_metadata: Caller metadata merged into manifest and artefact.
        task_config: Docling config, stored for traceability.

    Returns:
        Dict with artefact_id, rel_path, manifest/archive paths, per-format
        file paths, entry count and the bundle metadata.

    Fix: the ZipFile handle was never closed if an upload raised mid-loop;
    a ``with`` block now guarantees cleanup. Redundant local re-imports of
    module-level zipfile/io/uuid/json were dropped.
    """
    import time  # local import: 'time' is not imported at module level

    logger.info(f"📦 ZIP PROCESSING: Starting bundle extraction for file {file_id}")
    # Create bundle artefact structure
    artefact_id = str(uuid.uuid4())
    base_dir = f"{cabinet_id}/{file_id}/{artefact_id}"
    archive_path = f"{base_dir}/bundle.zip"
    # Save original archive
    self.storage.upload_file(bucket, archive_path, zip_content, 'application/zip', upsert=True)
    # Extract ZIP contents (context manager ensures the archive is closed
    # even when an upload below raises)
    entries = []
    file_paths = {}
    with zipfile.ZipFile(io.BytesIO(zip_content)) as zf:
        for entry in zf.filelist:
            if entry.is_dir():
                continue
            entry_content = zf.read(entry)
            entry_filename = entry.filename
            rel_path = f"{base_dir}/{entry_filename}"
            # Determine MIME type and record the canonical path per format
            if entry_filename.endswith('.json'):
                mime = 'application/json'
                file_paths['json'] = rel_path
            elif entry_filename.endswith('.html'):
                mime = 'text/html'
                file_paths['html'] = rel_path
            elif entry_filename.endswith('.md'):
                mime = 'text/markdown'
                file_paths['md'] = rel_path
            elif entry_filename.endswith('.txt'):
                mime = 'text/plain'
                file_paths['text'] = rel_path
            elif entry_filename.endswith('.doctags'):
                mime = 'application/json'
                file_paths['doctags'] = rel_path
            else:
                mime = 'application/octet-stream'
            # Upload file
            self.storage.upload_file(bucket, rel_path, entry_content, mime, upsert=True)
            entries.append({
                'filename': entry_filename,
                'rel_path': rel_path,
                'mime_type': mime,
                'size': len(entry_content)
            })
            logger.debug(f"📄 ZIP PROCESSING: Extracted {entry_filename} -> {rel_path}")
    # Create bundle manifest
    manifest = {
        'bundle_id': artefact_id,
        'file_id': file_id,
        'bundle_type': 'docling_bundle',
        'processing_mode': 'whole_document',
        'created_at': time.time(),
        'archive_path': archive_path,
        'entries': entries,
        'file_paths': file_paths,
        'metadata': bundle_metadata,
        'config': task_config
    }
    manifest_path = f"{base_dir}/manifest.json"
    manifest_content = json.dumps(manifest, indent=2).encode('utf-8')
    self.storage.upload_file(bucket, manifest_path, manifest_content, 'application/json', upsert=True)
    # Create database artefact with bundle metadata
    artefact_extra = {
        **bundle_metadata,
        'manifest': manifest_path,
        'archive_path': archive_path,
        'file_paths': file_paths,
        'entry_count': len(entries),
        'group_pack_type': 'whole'  # Add proper pack type for whole document bundles
    }
    self.client.supabase.table('document_artefacts').insert({
        'id': artefact_id,
        'file_id': file_id,
        'page_number': 0,  # Whole document
        'type': 'docling_bundle',
        'rel_path': base_dir,
        'size_tag': json.dumps(task_config),
        'language': 'en',
        'chunk_index': None,
        'extra': artefact_extra
    }).execute()
    logger.info(f"✅ ZIP PROCESSING: Created bundle artefact {artefact_id} with {len(entries)} files for file {file_id}")
    return {
        'artefact_id': artefact_id,
        'rel_path': base_dir,
        'manifest_path': manifest_path,
        'archive_path': archive_path,
        'file_paths': file_paths,
        'entry_count': len(entries),
        'bundle_metadata': bundle_metadata
    }
def process_docling_bundle_split_task(self, task: QueueTask) -> Dict[str, Any]:
    """
    Process split docling bundle task (multi-unit processing).

    Dispatches to the page/section/chunk splitter according to the
    payload's 'processing_mode'; each splitter creates multiple
    sub-bundles plus a master manifest.

    Raises:
        ValueError: If the processing mode is not recognized.
        Exception: Re-raised after logging if the splitter fails.
    """
    fid = task.file_id
    body = task.payload
    logger.info(f"Processing docling bundle split task for file {fid}")
    try:
        mode = body.get('processing_mode', 'split_by_sections')
        data = body.get('processing_data', {})
        cfg = body.get('config', {})
        meta = body.get('bundle_metadata', {})
        logger.info(f"Split bundle processing mode: {mode}")
        # Dispatch table instead of an if/elif chain.
        handlers = {
            'split_by_pages': self._process_split_by_pages,
            'split_by_sections': self._process_split_by_sections,
            'split_by_chunks': self._process_split_by_chunks,
        }
        handler = handlers.get(mode)
        if handler is None:
            raise ValueError(f"Unknown processing mode: {mode}")
        return handler(task, data, cfg, meta)
    except Exception as e:
        logger.error(f"Docling bundle split processing failed for file {fid}: {e}")
        raise
def _process_split_by_pages(self, task: QueueTask, processing_data: dict,
config: dict, bundle_metadata: dict) -> Dict[str, Any]:
"""Process document by individual pages and create page bundles."""
file_id = task.file_id
payload = task.payload
bucket = payload['bucket']
file_path = payload['file_path']
cabinet_id = payload['cabinet_id']
mime_type = payload['mime_type']
pages = processing_data.get('pages', [])
logger.info(f"Processing {len(pages)} individual pages for file {file_id}")
# Create master bundle directory
master_bundle_id = str(uuid.uuid4())
master_dir = f"{cabinet_id}/{file_id}/{master_bundle_id}"
page_bundles = []
# Process each page as a separate bundle
for idx, page_num in enumerate(pages, 1):
try:
page_config = {
**config,
'page_range': [page_num, page_num],
'target_type': 'zip',
'to_formats': ['json', 'html', 'text', 'md', 'doctags']
}
# Create descriptive page title and enhanced metadata
page_title = f"Page {page_num}"
page_display_name = f"Page {page_num}"
# Create individual page task with enhanced labeling
page_task = QueueTask(
id=f"{task.id}_page_{page_num}",
file_id=file_id,
service=task.service,
task_type='canonical_docling_json',
payload={
**payload,
'config': page_config,
'artefact_extra': {
'page_number': page_num,
'page_title': page_title,
'display_name': page_display_name,
'split_order': idx, # Sequential order within this bundle
'split_total': len(pages),
'split_heading': page_title,
'section_title': page_title, # For consistency
'is_page_bundle': True,
'master_bundle_id': master_bundle_id,
'bundle_label': f"Page {page_num} Bundle",
**bundle_metadata
}
},
priority=task.priority,
timeout=1800,
created_at=task.created_at
)
# Process page bundle
page_result = self._process_docling_task(page_task)
page_bundles.append({
'page_number': page_num,
'page_title': page_title,
'display_name': page_display_name,
'split_order': idx,
'artefact_id': page_result.get('artefact_id'),
'rel_path': page_result.get('rel_path')
})
except Exception as e:
logger.warning(f"Failed to process page {page_num} for file {file_id}: {e}")
continue
# Sort page bundles by page number for consistent ordering
page_bundles.sort(key=lambda x: x['page_number'])
# Create enhanced master manifest with proper organization metadata
master_manifest = {
'file_id': file_id,
'bundle_type': 'docling_bundle_split',
'split_mode': 'split_by_pages',
'total_pages': len(pages),
'successful_pages': len(page_bundles),
'page_bundles': page_bundles,
'created_at': 'now()',
'display_name': f"Document Pages ({len(page_bundles)} pages)",
'organization': {
'type': 'pages',
'sort_field': 'page_number',
'sort_order': 'asc',
'grouping': 'individual_pages'
},
**bundle_metadata
}
# Store master manifest
manifest_path = f"{master_dir}/master_manifest.json"
manifest_json = json.dumps(master_manifest, ensure_ascii=False, indent=2)
self.storage.upload_file(bucket, manifest_path, manifest_json.encode('utf-8'), 'application/json', upsert=True)
# Create master bundle artefact
self.client.supabase.table('document_artefacts').insert({
'id': master_bundle_id,
'file_id': file_id,
'type': 'docling_bundle_split_pages',
'rel_path': master_dir,
'extra': {
'manifest': manifest_path,
'split_mode': 'split_by_pages',
'total_pages': len(pages),
'successful_pages': len(page_bundles),
'group_pack_type': 'split_pages', # Add proper pack type for split page bundles
**bundle_metadata
},
'status': 'completed'
}).execute()
logger.info(f"Created page-based split bundle for file {file_id}: {len(page_bundles)} pages")
return {
'master_bundle_id': master_bundle_id,
'pages_processed': len(page_bundles),
'total_pages': len(pages)
}
def _process_split_by_sections(self, task: QueueTask, processing_data: dict,
config: dict, bundle_metadata: dict) -> Dict[str, Any]:
"""Process document by sections and create section bundles."""
file_id = task.file_id
payload = task.payload
bucket = payload['bucket']
cabinet_id = payload['cabinet_id']
entries = processing_data.get('entries', [])
logger.info(f"Processing {len(entries)} sections for file {file_id}")
# Create master bundle directory
master_bundle_id = str(uuid.uuid4())
master_dir = f"{cabinet_id}/{file_id}/{master_bundle_id}"
section_bundles = []
# Process each section as a separate bundle
logger.info(f"Processing {len(entries)} sections for file {file_id}")
for i, entry in enumerate(entries, 1):
try:
start_page = entry.get('start_page', 1)
end_page = entry.get('end_page', start_page)
# Enhanced section title handling with fallbacks and smart naming
raw_title = entry.get('title') or entry.get('label') or entry.get('heading')
section_title = raw_title.strip() if raw_title else f'Section {i}'
# Create enhanced display names for better organization
page_range_str = f"p{start_page}" if start_page == end_page else f"p{start_page}-{end_page}"
display_name = f"{i:02d}. {section_title}" if raw_title else f"{i:02d}. Section {i} ({page_range_str})"
bundle_label = f"{section_title} Bundle"
# Validate page ranges
if start_page < 1:
raise ValueError(f"Invalid start_page: {start_page} (must be >= 1)")
if end_page < start_page:
raise ValueError(f"Invalid page range: {start_page}-{end_page} (end < start)")
if start_page > 999 or end_page > 999:
raise ValueError(f"Suspicious page range: {start_page}-{end_page} (too high, possible corruption)")
logger.info(f"Processing section {i}/{len(entries)}: '{display_name}' (pages {start_page}-{end_page})")
section_config = {
**config,
'page_range': [start_page, end_page],
'target_type': 'zip',
'to_formats': ['json', 'html', 'text', 'md', 'doctags']
}
# Create section task with enhanced metadata and labeling
section_task = QueueTask(
id=f"{task.id}_section_{i}",
file_id=file_id,
service=task.service,
task_type='canonical_docling_json',
payload={
**payload,
'config': section_config,
'artefact_extra': {
'section_number': i,
'section_title': section_title,
'display_name': display_name,
'bundle_label': bundle_label,
'start_page': start_page,
'end_page': end_page,
'page_range': [start_page, end_page],
'page_count': end_page - start_page + 1,
'split_order': i, # Preserved ordering from split map
'split_total': len(entries),
'split_heading': section_title,
'is_section_bundle': True,
'master_bundle_id': master_bundle_id,
**bundle_metadata
}
},
priority=task.priority,
timeout=3600,
created_at=task.created_at
)
# Process section bundle
section_result = self._process_docling_task(section_task)
section_bundles.append({
'section_number': i,
'section_title': section_title,
'display_name': display_name,
'bundle_label': bundle_label,
'page_range': [start_page, end_page],
'page_count': end_page - start_page + 1,
'split_order': i,
'artefact_id': section_result.get('artefact_id'),
'rel_path': section_result.get('rel_path')
})
except Exception as e:
logger.error(f"FATAL: Failed to process section {i} for file {file_id}: {e}")
logger.error(f"Section details: title='{section_title}', pages={start_page}-{end_page}")
# Don't continue - fail the entire task if any section fails
raise Exception(f"Section processing failed for section {i} ('{section_title}', pages {start_page}-{end_page}): {e}")
# Sort section bundles by split_order for consistent ordering
section_bundles.sort(key=lambda x: x['split_order'])
# Create enhanced master manifest with proper organization metadata
master_manifest = {
'file_id': file_id,
'bundle_type': 'docling_bundle_split',
'split_mode': 'split_by_sections',
'total_sections': len(entries),
'successful_sections': len(section_bundles),
'section_bundles': section_bundles,
'created_at': 'now()',
'display_name': f"Document Sections ({len(section_bundles)} sections)",
'organization': {
'type': 'sections',
'sort_field': 'split_order',
'sort_order': 'asc',
'grouping': 'split_map_sections',
'has_titles': True,
'ordering_preserved': True
},
**bundle_metadata
}
# Store master manifest
manifest_path = f"{master_dir}/master_manifest.json"
manifest_json = json.dumps(master_manifest, ensure_ascii=False, indent=2)
self.storage.upload_file(bucket, manifest_path, manifest_json.encode('utf-8'), 'application/json', upsert=True)
# Create master bundle artefact
self.client.supabase.table('document_artefacts').insert({
'id': master_bundle_id,
'file_id': file_id,
'type': 'docling_bundle_split_sections',
'rel_path': master_dir,
'extra': {
'manifest': manifest_path,
'split_mode': 'split_by_sections',
'total_sections': len(entries),
'successful_sections': len(section_bundles),
'group_pack_type': 'split_sections', # Add proper pack type for split section bundles
**bundle_metadata
},
'status': 'completed'
}).execute()
logger.info(f"Created section-based split bundle for file {file_id}: {len(section_bundles)} sections")
return {
'master_bundle_id': master_bundle_id,
'sections_processed': len(section_bundles),
'total_sections': len(entries)
}
def _process_split_by_chunks(self, task: QueueTask, processing_data: dict,
config: dict, bundle_metadata: dict) -> Dict[str, Any]:
"""Process document by chunks and create chunk bundles."""
# Very similar to _process_split_by_sections but with chunk-specific labeling
file_id = task.file_id
payload = task.payload
bucket = payload['bucket']
cabinet_id = payload['cabinet_id']
chunks = processing_data.get('entries', [])
logger.info(f"Processing {len(chunks)} chunks for file {file_id}")
# Create master bundle directory
master_bundle_id = str(uuid.uuid4())
master_dir = f"{cabinet_id}/{file_id}/{master_bundle_id}"
chunk_bundles = []
# Process each chunk as a separate bundle
for i, chunk in enumerate(chunks, 1):
try:
start_page = chunk['start']
end_page = chunk['end']
# Enhanced chunk title handling
raw_title = chunk.get('title', f'Chunk {i}')
chunk_title = raw_title.strip() if raw_title else f'Chunk {i}'
# Create enhanced display names for chunks
page_range_str = f"p{start_page}" if start_page == end_page else f"p{start_page}-{end_page}"
display_name = f"{i:02d}. {chunk_title} ({page_range_str})"
bundle_label = f"{chunk_title} Bundle"
chunk_config = {
**config,
'page_range': [start_page, end_page],
'target_type': 'zip',
'to_formats': ['json', 'html', 'text', 'md', 'doctags']
}
# Create chunk task with enhanced labeling
chunk_task = QueueTask(
id=f"{task.id}_chunk_{i}",
file_id=file_id,
service=task.service,
task_type='canonical_docling_json',
payload={
**payload,
'config': chunk_config,
'artefact_extra': {
'chunk_number': i,
'chunk_title': chunk_title,
'display_name': display_name,
'bundle_label': bundle_label,
'start_page': start_page,
'end_page': end_page,
'page_range': [start_page, end_page],
'page_count': end_page - start_page + 1,
'split_order': i,
'split_total': len(chunks),
'split_heading': chunk_title,
'is_chunk_bundle': True,
'master_bundle_id': master_bundle_id,
**bundle_metadata
}
},
priority=task.priority,
timeout=3600,
created_at=task.created_at
)
# Process chunk bundle
chunk_result = self._process_docling_task(chunk_task)
chunk_bundles.append({
'chunk_number': i,
'chunk_title': chunk_title,
'display_name': display_name,
'bundle_label': bundle_label,
'page_range': [start_page, end_page],
'page_count': end_page - start_page + 1,
'split_order': i,
'artefact_id': chunk_result.get('artefact_id'),
'rel_path': chunk_result.get('rel_path')
})
except Exception as e:
logger.warning(f"Failed to process chunk {i} for file {file_id}: {e}")
continue
# Create master manifest
master_manifest = {
'file_id': file_id,
'bundle_type': 'docling_bundle_split',
'split_mode': 'split_by_chunks',
'total_chunks': len(chunks),
'successful_chunks': len(chunk_bundles),
'chunk_bundles': chunk_bundles,
'created_at': 'now()',
**bundle_metadata
}
# Store master manifest
manifest_path = f"{master_dir}/master_manifest.json"
manifest_json = json.dumps(master_manifest, ensure_ascii=False, indent=2)
self.storage.upload_file(bucket, manifest_path, manifest_json.encode('utf-8'), 'application/json', upsert=True)
# Create master bundle artefact
self.client.supabase.table('document_artefacts').insert({
'id': master_bundle_id,
'file_id': file_id,
'type': 'docling_bundle_split_chunks',
'rel_path': master_dir,
'extra': {
'manifest': manifest_path,
'split_mode': 'split_by_chunks',
'total_chunks': len(chunks),
'successful_chunks': len(chunk_bundles),
'group_pack_type': 'split_chunks', # Add proper pack type for split chunk bundles
**bundle_metadata
},
'status': 'completed'
}).execute()
logger.info(f"Created chunk-based split bundle for file {file_id}: {len(chunk_bundles)} chunks")
return {
'master_bundle_id': master_bundle_id,
'chunks_processed': len(chunk_bundles),
'total_chunks': len(chunks)
}
# process_phase2_coordinator_task method removed - pipelines now enqueued directly from split_map task
# _check_pipeline_group_completion method removed - task dependencies now handle sequential execution
# Global processor instance (module-private singleton; created lazily)
_processor_instance = None

def get_processor() -> DocumentTaskProcessor:
    """Return the shared DocumentTaskProcessor, constructing it on first call."""
    global _processor_instance
    if _processor_instance is not None:
        return _processor_instance
    _processor_instance = DocumentTaskProcessor()
    return _processor_instance