api/modules/bundle_metadata.py
2025-11-14 14:47:19 +00:00

572 lines
20 KiB
Python

"""
Uniform Bundle Metadata Architecture
This module defines standardized metadata structures for all document processing pipelines
to ensure consistency and interoperability across OCR, No-OCR, and VLM bundles.
Features:
- Uniform metadata schema for all pipeline types
- Consistent grouping and ordering mechanisms
- Pipeline-agnostic bundle identification
- Enhanced metadata for frontend display
- Backward compatibility with existing bundles
"""
import uuid
import hashlib
import json
from datetime import datetime
from typing import Dict, Any, Optional, List, Union, Literal
from dataclasses import dataclass, asdict
from enum import Enum
class PipelineType(Enum):
    """Supported pipeline types.

    STANDARD covers both OCR and No-OCR runs; the distinction is carried in
    the bundle config (``do_ocr`` flag), not in the pipeline type itself.
    """
    STANDARD = "standard"  # classic document conversion (OCR / No-OCR via config)
    VLM = "vlm"            # vision-language-model pipeline
    ASR = "asr"            # speech-recognition pipeline (per name; no usage visible here)
class ProcessingMode(Enum):
    """How content was processed into bundle(s)."""
    WHOLE_DOCUMENT = "whole_document"      # entire document handled as a single unit
    SPLIT_SECTIONS = "split_sections"      # split on section boundaries (chunk splits map here too)
    INDIVIDUAL_PAGES = "individual_pages"  # one processing unit per page
    PAGE_BUNDLE = "page_bundle"            # page-bundle mode; semantics defined by consumers — not used in this module
class BundleType(Enum):
    """Type of bundle created.

    The first four values are legacy types kept for backward compatibility;
    new code produces the unified DOCLING_* types below.
    """
    SINGLE_ARTEFACT = "single_artefact"
    SPLIT_PACK = "split_pack"
    PAGE_BUNDLE = "page_bundle"
    VLM_SECTION_BUNDLE = "vlm_section_bundle"
    # New unified bundle types
    DOCLING_BUNDLE = "docling_bundle"  # Single coherent processing unit
    DOCLING_BUNDLE_SPLIT = "docling_bundle_split"  # Container for multi-unit processing
@dataclass
class BundleMetadata:
    """
    Standardized metadata structure for all document processing bundles.

    This ensures consistency across OCR, No-OCR, and VLM pipelines while
    maintaining backward compatibility. Instances round-trip through the
    ``document_artefacts.extra`` dict via ``to_artefact_extra()`` /
    ``from_artefact_extra()``.
    """
    # Core identification
    bundle_id: str  # unique id; auto-generated in __post_init__ when passed as None
    file_id: str
    pipeline: PipelineType
    processing_mode: ProcessingMode
    bundle_type: BundleType
    # Grouping and ordering (populated for split documents)
    group_id: Optional[str] = None
    split_order: Optional[int] = None
    split_total: Optional[int] = None
    split_heading: Optional[str] = None
    # Content information
    page_range: Optional[List[int]] = None  # [start_page, end_page]
    page_count: Optional[int] = None
    section_title: Optional[str] = None
    section_level: Optional[int] = None
    # Processing details
    config: Optional[Dict[str, Any]] = None
    settings_fingerprint: Optional[str] = None  # derived from config in __post_init__ if absent
    processing_time: Optional[float] = None
    # Pipeline-specific metadata
    pipeline_metadata: Optional[Dict[str, Any]] = None
    # Producer information
    producer: str = "manual"  # manual, auto_phase2, auto_split, etc.
    created_at: Optional[str] = None  # ISO-8601 timestamp; defaulted in __post_init__
    # Status and quality
    status: str = "completed"
    quality_score: Optional[float] = None

    def __post_init__(self):
        """Set defaults and compute derived fields."""
        if self.created_at is None:
            # NOTE: naive UTC timestamp (no tz offset) kept for format stability
            # with already-stored values; datetime.utcnow() is deprecated in 3.12+.
            self.created_at = datetime.utcnow().isoformat()
        if self.bundle_id is None:
            self.bundle_id = str(uuid.uuid4())
        # Compute settings fingerprint if config provided
        if self.config and not self.settings_fingerprint:
            self.settings_fingerprint = self._compute_settings_fingerprint(self.config)

    def _compute_settings_fingerprint(self, config: Dict[str, Any]) -> str:
        """Compute a 16-hex-char fingerprint for configuration settings.

        ``page_range`` is excluded so bundles that differ only in page span
        share a fingerprint. Falls back to a random value for unserializable
        configs so callers always receive a fingerprint.
        """
        try:
            config_for_hash = dict(config)
            config_for_hash.pop('page_range', None)  # Exclude page ranges from fingerprint
            config_str = json.dumps(config_for_hash, sort_keys=True, ensure_ascii=False)
            return hashlib.sha1(config_str.encode('utf-8')).hexdigest()[:16]
        except Exception:
            return str(uuid.uuid4())[:16]

    def to_artefact_extra(self) -> Dict[str, Any]:
        """Convert to a dict suitable for the document_artefacts.extra field.

        Optional fields are emitted only when set so the stored dict stays
        compact; core fields and ``producer``/``status`` are always written.
        """
        extra = {}
        # Core fields
        extra['bundle_metadata_version'] = '1.0'
        extra['pipeline'] = self.pipeline.value
        extra['processing_mode'] = self.processing_mode.value
        extra['bundle_type'] = self.bundle_type.value
        # Store original pipeline type for UI differentiation.
        # (Attribute is attached dynamically, e.g. by create_standard_metadata,
        # hence the hasattr check instead of a declared field.)
        if hasattr(self, 'original_pipeline_type'):
            extra['original_pipeline'] = self.original_pipeline_type
        # Grouping fields
        if self.group_id:
            extra['group_id'] = self.group_id
        if self.split_order is not None:
            extra['split_order'] = self.split_order
        if self.split_total is not None:
            extra['split_total'] = self.split_total
        if self.split_heading:
            extra['split_heading'] = self.split_heading
        # Content fields
        if self.page_range:
            extra['page_range'] = self.page_range
        if self.page_count is not None:
            extra['page_count'] = self.page_count
        if self.section_title:
            extra['section_title'] = self.section_title
        if self.section_level is not None:
            extra['section_level'] = self.section_level
        # Processing fields
        if self.config:
            extra['config'] = self.config
        if self.settings_fingerprint:
            extra['settings_fingerprint'] = self.settings_fingerprint
        if self.processing_time is not None:
            extra['processing_time'] = self.processing_time
        # Pipeline-specific metadata
        if self.pipeline_metadata:
            extra['pipeline_metadata'] = self.pipeline_metadata
        # Producer and quality
        extra['producer'] = self.producer
        # FIX: persist created_at and status so they survive a round-trip;
        # previously from_artefact_extra read 'created_at' but it was never
        # written here, so reloaded bundles lost their creation timestamp.
        if self.created_at:
            extra['created_at'] = self.created_at
        extra['status'] = self.status
        if self.quality_score is not None:
            extra['quality_score'] = self.quality_score
        return extra

    @classmethod
    def from_artefact_extra(cls, file_id: str, artefact_id: str, extra: Dict[str, Any]) -> 'BundleMetadata':
        """Create BundleMetadata from a document_artefacts.extra dict.

        Unknown enum values fall back to safe defaults (STANDARD /
        WHOLE_DOCUMENT / SINGLE_ARTEFACT) for backward compatibility with
        bundles written by older code.
        """
        pipeline_str = extra.get('pipeline', 'standard')
        try:
            pipeline = PipelineType(pipeline_str)
        except ValueError:
            pipeline = PipelineType.STANDARD
        processing_mode_str = extra.get('processing_mode', 'whole_document')
        try:
            processing_mode = ProcessingMode(processing_mode_str)
        except ValueError:
            processing_mode = ProcessingMode.WHOLE_DOCUMENT
        bundle_type_str = extra.get('bundle_type', 'single_artefact')
        try:
            bundle_type = BundleType(bundle_type_str)
        except ValueError:
            bundle_type = BundleType.SINGLE_ARTEFACT
        return cls(
            bundle_id=artefact_id,
            file_id=file_id,
            pipeline=pipeline,
            processing_mode=processing_mode,
            bundle_type=bundle_type,
            group_id=extra.get('group_id'),
            split_order=extra.get('split_order'),
            split_total=extra.get('split_total'),
            split_heading=extra.get('split_heading'),
            page_range=extra.get('page_range'),
            page_count=extra.get('page_count'),
            section_title=extra.get('section_title'),
            section_level=extra.get('section_level'),
            config=extra.get('config'),
            settings_fingerprint=extra.get('settings_fingerprint'),
            processing_time=extra.get('processing_time'),
            pipeline_metadata=extra.get('pipeline_metadata'),
            producer=extra.get('producer', 'manual'),
            created_at=extra.get('created_at'),
            # FIX: restore status (mirrors the new write in to_artefact_extra);
            # missing key keeps the historical default.
            status=extra.get('status', 'completed'),
            quality_score=extra.get('quality_score')
        )
class BundleMetadataBuilder:
    """Fluent helper to build standardized BundleMetadata instances.

    Starts from a SINGLE_ARTEFACT / WHOLE_DOCUMENT default and lets callers
    layer on grouping, page, section, config, producer and quality details
    via chained ``set_*`` calls, finishing with ``build()``.
    """
    def __init__(self, file_id: str, pipeline: PipelineType):
        self.file_id = file_id
        self.pipeline = pipeline
        # Defaults; refined by the set_* methods below.
        self.metadata = BundleMetadata(
            bundle_id=str(uuid.uuid4()),
            file_id=file_id,
            pipeline=pipeline,
            processing_mode=ProcessingMode.WHOLE_DOCUMENT,
            bundle_type=BundleType.SINGLE_ARTEFACT
        )
    def set_processing_mode(self, mode: ProcessingMode) -> 'BundleMetadataBuilder':
        """Set processing mode."""
        self.metadata.processing_mode = mode
        return self
    def set_bundle_type(self, bundle_type: BundleType) -> 'BundleMetadataBuilder':
        """Set bundle type."""
        self.metadata.bundle_type = bundle_type
        return self
    # FIX: parameters defaulting to None are annotated Optional[...] (PEP 484
    # implicit-Optional is deprecated); runtime behavior is unchanged.
    def set_group_info(self, group_id: str, split_order: Optional[int] = None,
                       split_total: Optional[int] = None,
                       split_heading: Optional[str] = None) -> 'BundleMetadataBuilder':
        """Set grouping information for split documents."""
        self.metadata.group_id = group_id
        self.metadata.split_order = split_order
        self.metadata.split_total = split_total
        self.metadata.split_heading = split_heading
        return self
    def set_page_info(self, page_range: Optional[List[int]] = None,
                      page_count: Optional[int] = None) -> 'BundleMetadataBuilder':
        """Set page information (page_range is [start_page, end_page])."""
        self.metadata.page_range = page_range
        self.metadata.page_count = page_count
        return self
    def set_section_info(self, title: Optional[str] = None,
                         level: Optional[int] = None) -> 'BundleMetadataBuilder':
        """Set section title/level information."""
        self.metadata.section_title = title
        self.metadata.section_level = level
        return self
    def set_config(self, config: Dict[str, Any]) -> 'BundleMetadataBuilder':
        """Set processing configuration and (re)compute its fingerprint."""
        self.metadata.config = config
        self.metadata.settings_fingerprint = self.metadata._compute_settings_fingerprint(config)
        return self
    def set_producer(self, producer: str) -> 'BundleMetadataBuilder':
        """Set producer information (e.g. 'manual', 'auto_phase2')."""
        self.metadata.producer = producer
        return self
    def set_pipeline_metadata(self, metadata: Dict[str, Any]) -> 'BundleMetadataBuilder':
        """Set pipeline-specific metadata."""
        self.metadata.pipeline_metadata = metadata
        return self
    def set_quality_score(self, score: float) -> 'BundleMetadataBuilder':
        """Set quality score."""
        self.metadata.quality_score = score
        return self
    def build(self) -> BundleMetadata:
        """Return the accumulated metadata."""
        return self.metadata
def create_standard_metadata(
    file_id: str,
    pipeline: Literal["ocr", "no_ocr", "vlm"] = "no_ocr",
    processing_mode: Literal["whole_document", "split_sections", "individual_pages", "pages", "sections", "chunks"] = "split_sections",
    config: Optional[Dict[str, Any]] = None,
    group_id: Optional[str] = None,
    split_order: Optional[int] = None,
    split_total: Optional[int] = None,
    split_heading: Optional[str] = None,
    page_range: Optional[List[int]] = None,
    producer: str = "auto_phase2"
) -> BundleMetadata:
    """
    Convenience function to create standardized metadata for common use cases.

    Args:
        file_id: Owning file id.
        pipeline: Pipeline label; 'ocr'/'no_ocr' both map to STANDARD (the
            distinction is recorded via the ``do_ocr`` config flag).
        processing_mode: Processing-mode label; several aliases are accepted
            (see the mapping below, which also handles 'split_by_*' values).
        config: Processing configuration; pipeline markers are merged in.
        group_id / split_order / split_total / split_heading: Grouping info
            for split documents.
        page_range: [start_page, end_page] for this bundle.
        producer: Producer tag stored on the metadata.

    Returns:
        A fully populated BundleMetadata.
    """
    # Map pipeline strings to enums
    pipeline_map = {
        "ocr": PipelineType.STANDARD,
        "no_ocr": PipelineType.STANDARD,
        "vlm": PipelineType.VLM
    }
    # Enhanced processing mode mapping with new bundle architecture
    processing_mode_map = {
        "whole_document": ProcessingMode.WHOLE_DOCUMENT,
        "split_sections": ProcessingMode.SPLIT_SECTIONS,
        "individual_pages": ProcessingMode.INDIVIDUAL_PAGES,
        "split_by_pages": ProcessingMode.INDIVIDUAL_PAGES,  # Split by pages processing
        "split_by_sections": ProcessingMode.SPLIT_SECTIONS,  # Split by sections processing
        "split_by_chunks": ProcessingMode.SPLIT_SECTIONS,  # Split by chunks processing
        "pages": ProcessingMode.INDIVIDUAL_PAGES,  # Alias for page-based processing
        "sections": ProcessingMode.SPLIT_SECTIONS,  # Alias for section-based processing
        "chunks": ProcessingMode.SPLIT_SECTIONS,  # Chunks treated as sections
    }
    # Determine bundle type: whole documents become a single coherent bundle,
    # everything else a split-bundle container.
    if processing_mode == "whole_document":
        bundle_type = BundleType.DOCLING_BUNDLE
    else:
        bundle_type = BundleType.DOCLING_BUNDLE_SPLIT
    # FIX: unknown labels previously raised KeyError; fall back to the same
    # defaults BundleMetadata.from_artefact_extra uses, for consistency.
    builder = BundleMetadataBuilder(file_id, pipeline_map.get(pipeline, PipelineType.STANDARD))
    builder.set_processing_mode(processing_mode_map.get(processing_mode, ProcessingMode.SPLIT_SECTIONS))
    builder.set_bundle_type(bundle_type)
    builder.set_producer(producer)
    # Store original pipeline type for UI differentiation
    # (dynamic attribute; picked up by BundleMetadata.to_artefact_extra)
    builder.metadata.original_pipeline_type = pipeline
    if config:
        # Add pipeline-specific config markers so the fingerprint reflects
        # the actual pipeline variant (OCR vs No-OCR vs VLM).
        enhanced_config = dict(config)
        if pipeline == "ocr":
            enhanced_config["do_ocr"] = True
        elif pipeline == "no_ocr":
            enhanced_config["do_ocr"] = False
        elif pipeline == "vlm":
            enhanced_config["pipeline"] = "vlm"
        builder.set_config(enhanced_config)
    if group_id:
        builder.set_group_info(group_id, split_order, split_total, split_heading)
    if page_range:
        builder.set_page_info(page_range)
    # Set section info if we have a heading
    if split_heading:
        builder.set_section_info(split_heading)
    return builder.build()
def create_bundle_split_metadata(
    file_id: str,
    pipeline: Literal["ocr", "no_ocr", "vlm"] = "no_ocr",
    split_mode: Literal["split_by_pages", "split_by_sections", "split_by_chunks"] = "split_by_sections",
    config: Optional[Dict[str, Any]] = None,
    group_id: Optional[str] = None,
    producer: str = "auto_phase2",
    processing_data: Optional[Dict[str, Any]] = None
) -> BundleMetadata:
    """
    Create metadata specifically for split bundle processing.

    This is used for the new docling_bundle_split task type. Delegates to
    ``create_standard_metadata`` and then records the split mode and any
    extra processing data under ``pipeline_metadata``.

    Args:
        file_id: Owning file id.
        pipeline: Pipeline label ('ocr', 'no_ocr' or 'vlm').
        split_mode: How the document was split.
        config: Processing configuration forwarded as-is.
        group_id: Group id linking the resulting split bundles.
        producer: Producer tag stored on the metadata.
        processing_data: Optional split-specific details to preserve.

    Returns:
        BundleMetadata describing one split bundle.
    """
    # Map split modes to processing-mode aliases understood downstream.
    mode_map = {
        "split_by_pages": "pages",
        "split_by_sections": "sections",
        "split_by_chunks": "sections"  # Chunks treated as sections
    }
    # FIX: unknown split modes previously raised KeyError; default to the
    # section-style processing used by the most common split mode.
    processing_mode = mode_map.get(split_mode, "sections")
    metadata = create_standard_metadata(
        file_id=file_id,
        pipeline=pipeline,
        processing_mode=processing_mode,
        config=config,
        group_id=group_id,
        producer=producer
    )
    # Add split-specific metadata without clobbering anything already present.
    if processing_data:
        split_metadata = {
            'split_mode': split_mode,
            'processing_data': processing_data
        }
        if metadata.pipeline_metadata:
            metadata.pipeline_metadata.update(split_metadata)
        else:
            metadata.pipeline_metadata = split_metadata
    return metadata
def get_bundle_display_name(metadata: BundleMetadata) -> str:
    """Generate a user-friendly display name for a bundle.

    Resolution order: an explicit ``display_name`` attribute (attached
    dynamically, hence the hasattr check), then names derived from the new
    docling bundle types, then a generic fallback.
    """
    # Use explicit display name if available
    if hasattr(metadata, 'display_name') and metadata.display_name:
        return metadata.display_name
    # Generate based on bundle type and processing mode
    if metadata.bundle_type == BundleType.DOCLING_BUNDLE:
        return "Complete Document"
    elif metadata.bundle_type == BundleType.DOCLING_BUNDLE_SPLIT:
        if metadata.processing_mode == ProcessingMode.INDIVIDUAL_PAGES:
            if metadata.page_range:
                # page_range is [start, end]; a page bundle is labeled by its start page
                return f"Page {metadata.page_range[0]}"
            return "Page Bundle"
        elif metadata.processing_mode == ProcessingMode.SPLIT_SECTIONS:
            if metadata.section_title:
                # NOTE: a split_order of 0 is falsy, so it gets no "00. " prefix
                order_prefix = f"{metadata.split_order:02d}. " if metadata.split_order else ""
                page_suffix = ""
                if metadata.page_range and len(metadata.page_range) >= 2:
                    page_suffix = f" (p{metadata.page_range[0]}-{metadata.page_range[1]})"
                return f"{order_prefix}{metadata.section_title}{page_suffix}"
            return f"Section {metadata.split_order or 1}"
        else:
            return "Document Bundle"
    # Fallback for all other (legacy) bundle types.
    # NOTE(review): this return is unconditional — any statements following it
    # inside this function would be unreachable. The legacy display-name
    # fragment that appears later in this module looks displaced; confirm
    # against the original file's indentation.
    return metadata.section_title or metadata.split_heading or f"Bundle {metadata.bundle_id[:8]}"
def create_organized_bundle_manifest(bundles: list, split_mode: str, pipeline: str) -> dict:
    """
    Create an organized master manifest for split bundles with proper
    labeling and ordering.

    Args:
        bundles: List of individual bundle data dicts.
        split_mode: Splitting mode used ('split_by_pages', 'split_by_sections'
            or 'split_by_chunks'; anything else is treated as unknown).
        pipeline: Pipeline label (e.g. 'no_ocr', 'ocr', 'vlm').

    Returns:
        Manifest dict with sorted bundles plus organization metadata.
    """
    label = pipeline.upper()
    count = len(bundles)
    # Dispatch table: split_mode -> (sort key field, display name, organization)
    spec = {
        'split_by_pages': (
            'page_number',
            f"{label} Document Pages ({count} pages)",
            {
                'type': 'pages',
                'sort_field': 'page_number',
                'sort_order': 'asc',
                'grouping': 'individual_pages'
            },
        ),
        'split_by_sections': (
            'split_order',
            f"{label} Document Sections ({count} sections)",
            {
                'type': 'sections',
                'sort_field': 'split_order',
                'sort_order': 'asc',
                'grouping': 'split_map_sections',
                'has_titles': True,
                'ordering_preserved': True
            },
        ),
        'split_by_chunks': (
            'split_order',
            f"{label} Document Chunks ({count} chunks)",
            {
                'type': 'chunks',
                'sort_field': 'split_order',
                'sort_order': 'asc',
                'grouping': 'fallback_chunks'
            },
        ),
    }.get(split_mode)
    if spec is None:
        # Unknown mode: keep the caller's ordering untouched
        ordered = bundles
        title = f"{label} Document Bundles"
        organization = {
            'type': 'unknown',
            'sort_field': 'split_order',
            'sort_order': 'asc'
        }
    else:
        sort_key, title, organization = spec
        ordered = sorted(bundles, key=lambda item: item.get(sort_key, 0))
    return {
        'bundles': ordered,
        'display_name': title,
        'organization': organization,
        'total_bundles': count,
        'pipeline': pipeline,
        'split_mode': split_mode
    }
# NOTE(review): this fragment references a local `metadata` variable and ends
# with a bare `return`, so it is function-body code — apparently the legacy
# tail of a bundle display-name routine. Given that get_bundle_display_name
# above ends with an unconditional fallback return, this code looks
# displaced and/or unreachable in the current file; confirm against the
# original file's indentation before relying on it. Kept verbatim below.
# Pipeline display names
pipeline_names = {
    PipelineType.STANDARD: "Standard",
    PipelineType.VLM: "VLM",
    PipelineType.ASR: "ASR"
}
pipeline_name = pipeline_names.get(metadata.pipeline, metadata.pipeline.value)
# OCR indication for standard pipeline
if metadata.pipeline == PipelineType.STANDARD and metadata.config:
    ocr_enabled = metadata.config.get('do_ocr', False)
    pipeline_name = f"{pipeline_name} ({'OCR' if ocr_enabled else 'No-OCR'})"
# Processing mode indication
if metadata.processing_mode == ProcessingMode.INDIVIDUAL_PAGES:
    mode = "Page-by-page"
elif metadata.processing_mode == ProcessingMode.SPLIT_SECTIONS:
    mode = "Sections"
else:
    mode = "Whole doc"
# Section or page info
content_info = ""
if metadata.split_heading:
    content_info = f" - {metadata.split_heading}"
elif metadata.page_range and len(metadata.page_range) == 2:
    if metadata.page_range[0] == metadata.page_range[1]:
        content_info = f" - Page {metadata.page_range[0]}"
    else:
        content_info = f" - Pages {metadata.page_range[0]}-{metadata.page_range[1]}"
# Producer info
producer_info = ""
if metadata.producer == "auto_phase2":
    producer_info = " (Auto)"
elif metadata.producer.startswith("auto"):
    producer_info = " (Auto)"
return f"{pipeline_name} {mode}{content_info}{producer_info}"
def group_bundles_by_metadata(bundles: List[Dict[str, Any]]) -> Dict[str, List[Dict[str, Any]]]:
    """
    Group bundles by their metadata for display purposes.

    Bundles carrying the uniform metadata format (``bundle_metadata_version``
    present in ``extra``) and a ``group_id`` are collected under
    ``"group:<group_id>"``; everything else becomes its own
    ``"single:<artefact_id>"`` entry.

    Args:
        bundles: Artefact rows as dicts with at least ``id`` and ``file_id``
            keys and optionally an ``extra`` dict.

    Returns:
        Mapping of group key -> list of bundle dicts.
    """
    groups: Dict[str, List[Dict[str, Any]]] = {}
    ungrouped: List[Dict[str, Any]] = []
    for bundle in bundles:
        extra = bundle.get('extra', {})
        # Bundles without the new metadata format are never grouped
        if not extra.get('bundle_metadata_version'):
            ungrouped.append(bundle)
            continue
        metadata = BundleMetadata.from_artefact_extra(
            bundle['file_id'],
            bundle['id'],
            extra
        )
        if metadata.group_id:
            # FIX (idiom): setdefault replaces the manual
            # "if key not in groups: groups[key] = []" membership dance.
            groups.setdefault(f"group:{metadata.group_id}", []).append(bundle)
        else:
            ungrouped.append(bundle)
    # Ungrouped bundles become single-member groups keyed by artefact id
    for bundle in ungrouped:
        groups[f"single:{bundle['id']}"] = [bundle]
    return groups