"""
Uniform Bundle Metadata Architecture

This module defines standardized metadata structures for all document processing pipelines
to ensure consistency and interoperability across OCR, No-OCR, and VLM bundles.

Features:
- Uniform metadata schema for all pipeline types
- Consistent grouping and ordering mechanisms
- Pipeline-agnostic bundle identification
- Enhanced metadata for frontend display
- Backward compatibility with existing bundles
"""
import hashlib
import json
import uuid
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Dict, List, Literal, Optional, Union
class PipelineType(Enum):
    """Supported pipeline types.

    Values are persisted in ``document_artefacts.extra['pipeline']`` and must
    stay stable once written.
    """
    # Docling standard pipeline; covers both OCR and No-OCR runs
    # (see create_standard_metadata, which maps both names here).
    STANDARD = "standard"
    # Vision-language-model pipeline.
    VLM = "vlm"
    # Automatic-speech-recognition pipeline.
    ASR = "asr"
class ProcessingMode(Enum):
    """How content was processed.

    Values are persisted in ``document_artefacts.extra['processing_mode']``.
    """
    # The document was processed as one unit.
    WHOLE_DOCUMENT = "whole_document"
    # The document was split into sections (also used for chunk-based splits).
    SPLIT_SECTIONS = "split_sections"
    # Each page was processed on its own.
    INDIVIDUAL_PAGES = "individual_pages"
    # A bundle covering a group of pages.
    PAGE_BUNDLE = "page_bundle"
class BundleType(Enum):
    """Type of bundle created.

    Values are persisted in ``document_artefacts.extra['bundle_type']``.
    The legacy values are kept for backward compatibility with bundles
    written before the unified docling-bundle architecture.
    """
    # Legacy bundle types.
    SINGLE_ARTEFACT = "single_artefact"
    SPLIT_PACK = "split_pack"
    PAGE_BUNDLE = "page_bundle"
    VLM_SECTION_BUNDLE = "vlm_section_bundle"
    # New unified bundle types
    DOCLING_BUNDLE = "docling_bundle"  # Single coherent processing unit
    DOCLING_BUNDLE_SPLIT = "docling_bundle_split"  # Container for multi-unit processing
@dataclass
class BundleMetadata:
    """
    Standardized metadata structure for all document processing bundles.

    This ensures consistency across OCR, No-OCR, and VLM pipelines while
    maintaining backward compatibility. Instances round-trip through
    ``to_artefact_extra`` / ``from_artefact_extra``.
    """

    # Core identification
    bundle_id: Optional[str]  # auto-generated in __post_init__ when passed as None
    file_id: str
    pipeline: PipelineType
    processing_mode: ProcessingMode
    bundle_type: BundleType

    # Grouping and ordering (for split documents)
    group_id: Optional[str] = None
    split_order: Optional[int] = None
    split_total: Optional[int] = None
    split_heading: Optional[str] = None

    # Content information
    page_range: Optional[List[int]] = None  # [start_page, end_page]
    page_count: Optional[int] = None
    section_title: Optional[str] = None
    section_level: Optional[int] = None

    # Processing details
    config: Optional[Dict[str, Any]] = None
    settings_fingerprint: Optional[str] = None
    processing_time: Optional[float] = None

    # Pipeline-specific metadata
    pipeline_metadata: Optional[Dict[str, Any]] = None

    # Producer information
    producer: str = "manual"  # manual, auto_phase2, auto_split, etc.
    created_at: Optional[str] = None

    # Status and quality
    status: str = "completed"
    quality_score: Optional[float] = None

    def __post_init__(self):
        """Set defaults and compute derived fields."""
        if self.created_at is None:
            # BUGFIX: datetime.utcnow() is deprecated and returned a naive
            # timestamp; use an explicit timezone-aware UTC timestamp instead.
            self.created_at = datetime.now(timezone.utc).isoformat()

        if self.bundle_id is None:
            self.bundle_id = str(uuid.uuid4())

        # Compute settings fingerprint if config provided
        if self.config and not self.settings_fingerprint:
            self.settings_fingerprint = self._compute_settings_fingerprint(self.config)

    def _compute_settings_fingerprint(self, config: Dict[str, Any]) -> str:
        """Compute a stable fingerprint for configuration settings.

        ``page_range`` is excluded so that split artefacts produced from the
        same settings share one fingerprint. Falls back to a random token if
        the config is not JSON-serializable.
        """
        try:
            config_for_hash = dict(config)
            config_for_hash.pop('page_range', None)  # Exclude page ranges from fingerprint
            config_str = json.dumps(config_for_hash, sort_keys=True, ensure_ascii=False)
            # SHA-1 is fine here: the fingerprint is an identity key, not a
            # security boundary. Truncated for compactness.
            return hashlib.sha1(config_str.encode('utf-8')).hexdigest()[:16]
        except Exception:
            # Non-serializable config: return a random (non-stable) token.
            return str(uuid.uuid4())[:16]

    def to_artefact_extra(self) -> Dict[str, Any]:
        """Convert to a dict suitable for the ``document_artefacts.extra`` field.

        Only non-empty optional fields are emitted; readers must use
        ``dict.get`` with defaults (as ``from_artefact_extra`` does).
        """
        extra: Dict[str, Any] = {}

        # Core fields (always present)
        extra['bundle_metadata_version'] = '1.0'
        extra['pipeline'] = self.pipeline.value
        extra['processing_mode'] = self.processing_mode.value
        extra['bundle_type'] = self.bundle_type.value

        # Store original pipeline type for UI differentiation (set dynamically
        # by create_standard_metadata; not a declared dataclass field).
        if hasattr(self, 'original_pipeline_type'):
            extra['original_pipeline'] = self.original_pipeline_type

        # Grouping fields
        if self.group_id:
            extra['group_id'] = self.group_id
        if self.split_order is not None:
            extra['split_order'] = self.split_order
        if self.split_total is not None:
            extra['split_total'] = self.split_total
        if self.split_heading:
            extra['split_heading'] = self.split_heading

        # Content fields
        if self.page_range:
            extra['page_range'] = self.page_range
        if self.page_count is not None:
            extra['page_count'] = self.page_count
        if self.section_title:
            extra['section_title'] = self.section_title
        if self.section_level is not None:
            extra['section_level'] = self.section_level

        # Processing fields
        if self.config:
            extra['config'] = self.config
        if self.settings_fingerprint:
            extra['settings_fingerprint'] = self.settings_fingerprint
        if self.processing_time is not None:
            extra['processing_time'] = self.processing_time

        # Pipeline-specific metadata
        if self.pipeline_metadata:
            extra['pipeline_metadata'] = self.pipeline_metadata

        # Producer and quality
        extra['producer'] = self.producer
        if self.quality_score is not None:
            extra['quality_score'] = self.quality_score

        # BUGFIX: created_at and status were never written, so a
        # from_artefact_extra round trip silently regenerated the timestamp
        # and reset the status.
        if self.created_at:
            extra['created_at'] = self.created_at
        extra['status'] = self.status

        return extra

    @classmethod
    def from_artefact_extra(cls, file_id: str, artefact_id: str, extra: Dict[str, Any]) -> 'BundleMetadata':
        """Create BundleMetadata from a ``document_artefacts.extra`` dict.

        Unknown enum values fall back to their defaults for backward
        compatibility with bundles written by older code.
        """

        pipeline_str = extra.get('pipeline', 'standard')
        try:
            pipeline = PipelineType(pipeline_str)
        except ValueError:
            pipeline = PipelineType.STANDARD

        processing_mode_str = extra.get('processing_mode', 'whole_document')
        try:
            processing_mode = ProcessingMode(processing_mode_str)
        except ValueError:
            processing_mode = ProcessingMode.WHOLE_DOCUMENT

        bundle_type_str = extra.get('bundle_type', 'single_artefact')
        try:
            bundle_type = BundleType(bundle_type_str)
        except ValueError:
            bundle_type = BundleType.SINGLE_ARTEFACT

        return cls(
            bundle_id=artefact_id,
            file_id=file_id,
            pipeline=pipeline,
            processing_mode=processing_mode,
            bundle_type=bundle_type,
            group_id=extra.get('group_id'),
            split_order=extra.get('split_order'),
            split_total=extra.get('split_total'),
            split_heading=extra.get('split_heading'),
            page_range=extra.get('page_range'),
            page_count=extra.get('page_count'),
            section_title=extra.get('section_title'),
            section_level=extra.get('section_level'),
            config=extra.get('config'),
            settings_fingerprint=extra.get('settings_fingerprint'),
            processing_time=extra.get('processing_time'),
            pipeline_metadata=extra.get('pipeline_metadata'),
            producer=extra.get('producer', 'manual'),
            created_at=extra.get('created_at'),
            # Restore status symmetrically with to_artefact_extra.
            status=extra.get('status', 'completed'),
            quality_score=extra.get('quality_score')
        )
class BundleMetadataBuilder:
    """Helper class to build standardized bundle metadata.

    Fluent builder: every setter returns ``self`` so calls can be chained,
    and ``build()`` returns the accumulated :class:`BundleMetadata`.
    """

    def __init__(self, file_id: str, pipeline: PipelineType):
        # Start from a minimal whole-document / single-artefact bundle;
        # setters refine it afterwards.
        self.file_id = file_id
        self.pipeline = pipeline
        self.metadata = BundleMetadata(
            bundle_id=str(uuid.uuid4()),
            file_id=file_id,
            pipeline=pipeline,
            processing_mode=ProcessingMode.WHOLE_DOCUMENT,
            bundle_type=BundleType.SINGLE_ARTEFACT
        )

    def set_processing_mode(self, mode: ProcessingMode) -> 'BundleMetadataBuilder':
        """Set processing mode"""
        self.metadata.processing_mode = mode
        return self

    def set_bundle_type(self, bundle_type: BundleType) -> 'BundleMetadataBuilder':
        """Set bundle type"""
        self.metadata.bundle_type = bundle_type
        return self

    def set_group_info(self, group_id: str, split_order: Optional[int] = None,
                       split_total: Optional[int] = None, split_heading: Optional[str] = None) -> 'BundleMetadataBuilder':
        """Set grouping information for split documents.

        Note: fields are overwritten unconditionally, so calling this with
        the defaults clears previously set ordering information.
        """
        self.metadata.group_id = group_id
        self.metadata.split_order = split_order
        self.metadata.split_total = split_total
        self.metadata.split_heading = split_heading
        return self

    def set_page_info(self, page_range: Optional[List[int]] = None,
                      page_count: Optional[int] = None) -> 'BundleMetadataBuilder':
        """Set page information (``page_range`` is ``[start_page, end_page]``)."""
        self.metadata.page_range = page_range
        self.metadata.page_count = page_count
        return self

    def set_section_info(self, title: Optional[str] = None, level: Optional[int] = None) -> 'BundleMetadataBuilder':
        """Set section information"""
        self.metadata.section_title = title
        self.metadata.section_level = level
        return self

    def set_config(self, config: Dict[str, Any]) -> 'BundleMetadataBuilder':
        """Set processing configuration and recompute its fingerprint."""
        self.metadata.config = config
        self.metadata.settings_fingerprint = self.metadata._compute_settings_fingerprint(config)
        return self

    def set_producer(self, producer: str) -> 'BundleMetadataBuilder':
        """Set producer information (e.g. ``manual``, ``auto_phase2``)."""
        self.metadata.producer = producer
        return self

    def set_pipeline_metadata(self, metadata: Dict[str, Any]) -> 'BundleMetadataBuilder':
        """Set pipeline-specific metadata"""
        self.metadata.pipeline_metadata = metadata
        return self

    def set_quality_score(self, score: float) -> 'BundleMetadataBuilder':
        """Set quality score"""
        self.metadata.quality_score = score
        return self

    def build(self) -> BundleMetadata:
        """Build the final metadata"""
        return self.metadata
def create_standard_metadata(
    file_id: str,
    pipeline: Literal["ocr", "no_ocr", "vlm"] = "no_ocr",
    processing_mode: Literal["whole_document", "split_sections", "individual_pages", "pages", "sections", "chunks"] = "split_sections",
    config: Dict[str, Any] = None,
    group_id: str = None,
    split_order: int = None,
    split_total: int = None,
    split_heading: str = None,
    page_range: List[int] = None,
    producer: str = "auto_phase2"
) -> BundleMetadata:
    """
    Convenience function to create standardized metadata for common use cases.
    """

    # String-name -> enum lookup tables.
    pipeline_by_name = {
        "ocr": PipelineType.STANDARD,
        "no_ocr": PipelineType.STANDARD,
        "vlm": PipelineType.VLM,
    }

    # Accepts several aliases for the same underlying processing modes so
    # callers from the new bundle architecture keep working.
    mode_by_name = {
        "whole_document": ProcessingMode.WHOLE_DOCUMENT,
        "split_sections": ProcessingMode.SPLIT_SECTIONS,
        "individual_pages": ProcessingMode.INDIVIDUAL_PAGES,
        "split_by_pages": ProcessingMode.INDIVIDUAL_PAGES,
        "split_by_sections": ProcessingMode.SPLIT_SECTIONS,
        "split_by_chunks": ProcessingMode.SPLIT_SECTIONS,
        "pages": ProcessingMode.INDIVIDUAL_PAGES,
        "sections": ProcessingMode.SPLIT_SECTIONS,
        "chunks": ProcessingMode.SPLIT_SECTIONS,  # chunks are treated as sections
    }

    # Whole-document runs are single coherent bundles; everything else is a
    # split container.
    chosen_bundle_type = (
        BundleType.DOCLING_BUNDLE
        if processing_mode == "whole_document"
        else BundleType.DOCLING_BUNDLE_SPLIT
    )

    builder = (
        BundleMetadataBuilder(file_id, pipeline_by_name[pipeline])
        .set_processing_mode(mode_by_name[processing_mode])
        .set_bundle_type(chosen_bundle_type)
        .set_producer(producer)
    )

    # Remember the caller-facing pipeline name so the UI can distinguish
    # OCR from No-OCR even though both map to PipelineType.STANDARD.
    builder.metadata.original_pipeline_type = pipeline

    if config:
        # Stamp the config with a pipeline-specific marker before hashing.
        marker_by_pipeline = {
            "ocr": {"do_ocr": True},
            "no_ocr": {"do_ocr": False},
            "vlm": {"pipeline": "vlm"},
        }
        builder.set_config({**config, **marker_by_pipeline.get(pipeline, {})})

    if group_id:
        builder.set_group_info(group_id, split_order, split_total, split_heading)

    if page_range:
        builder.set_page_info(page_range)

    # A split heading doubles as the section title.
    if split_heading:
        builder.set_section_info(split_heading)

    return builder.build()
def create_bundle_split_metadata(
    file_id: str,
    pipeline: Literal["ocr", "no_ocr", "vlm"] = "no_ocr",
    split_mode: Literal["split_by_pages", "split_by_sections", "split_by_chunks"] = "split_by_sections",
    config: Dict[str, Any] = None,
    group_id: str = None,
    producer: str = "auto_phase2",
    processing_data: Dict[str, Any] = None
) -> BundleMetadata:
    """
    Create metadata specifically for split bundle processing.

    Used by the new ``docling_bundle_split`` task type: translates the split
    mode into a processing mode, delegates to :func:`create_standard_metadata`,
    and attaches split-specific details to ``pipeline_metadata``.
    """

    # Translate split modes into the processing-mode aliases understood by
    # create_standard_metadata (chunks reuse the section layout).
    split_to_processing = {
        "split_by_pages": "pages",
        "split_by_sections": "sections",
        "split_by_chunks": "sections",
    }

    metadata = create_standard_metadata(
        file_id=file_id,
        pipeline=pipeline,
        processing_mode=split_to_processing[split_mode],
        config=config,
        group_id=group_id,
        producer=producer
    )

    # Record the split mode and any processing payload alongside other
    # pipeline-specific metadata, merging rather than replacing.
    if processing_data:
        split_details = {
            'split_mode': split_mode,
            'processing_data': processing_data,
        }
        existing = metadata.pipeline_metadata
        if existing:
            existing.update(split_details)
        else:
            metadata.pipeline_metadata = split_details

    return metadata
def get_bundle_display_name(metadata: BundleMetadata) -> str:
    """Generate a user-friendly display name for a bundle.

    Preference order: an explicit ``display_name`` attribute (set dynamically;
    not a declared field), then a name derived from bundle type and processing
    mode, then a generic fallback based on the bundle id.
    """

    # Use explicit display name if available (getattr: display_name is a
    # dynamic attribute, not a dataclass field).
    explicit_name = getattr(metadata, 'display_name', None)
    if explicit_name:
        return explicit_name

    # Generate based on bundle type and processing mode
    if metadata.bundle_type == BundleType.DOCLING_BUNDLE:
        return "Complete Document"

    elif metadata.bundle_type == BundleType.DOCLING_BUNDLE_SPLIT:
        if metadata.processing_mode == ProcessingMode.INDIVIDUAL_PAGES:
            if metadata.page_range:
                return f"Page {metadata.page_range[0]}"
            return "Page Bundle"
        elif metadata.processing_mode == ProcessingMode.SPLIT_SECTIONS:
            if metadata.section_title:
                # BUGFIX: test against None instead of truthiness so a
                # legitimate split_order of 0 still gets its order prefix
                # (matches the `is not None` convention in to_artefact_extra).
                order_prefix = f"{metadata.split_order:02d}. " if metadata.split_order is not None else ""
                page_suffix = ""
                if metadata.page_range and len(metadata.page_range) >= 2:
                    page_suffix = f" (p{metadata.page_range[0]}-{metadata.page_range[1]})"
                return f"{order_prefix}{metadata.section_title}{page_suffix}"
            return f"Section {metadata.split_order or 1}"
        else:
            return "Document Bundle"

    # Fallback for legacy bundle types.
    return metadata.section_title or metadata.split_heading or f"Bundle {metadata.bundle_id[:8]}"
def create_organized_bundle_manifest(bundles: list, split_mode: str, pipeline: str) -> dict:
    """
    Create an organized master manifest for split bundles with proper labeling and ordering.

    Args:
        bundles: List of individual bundle data
        split_mode: The splitting mode used (pages, sections, chunks)
        pipeline: The pipeline type (no_ocr, ocr, vlm)

    Returns:
        Enhanced manifest with organization metadata
    """

    total = len(bundles)
    pipeline_label = pipeline.upper()

    # Pick ordering, label, and organization metadata per split mode.
    if split_mode == 'split_by_pages':
        ordered = sorted(bundles, key=lambda item: item.get('page_number', 0))
        title = f"{pipeline_label} Document Pages ({total} pages)"
        organization = {
            'type': 'pages',
            'sort_field': 'page_number',
            'sort_order': 'asc',
            'grouping': 'individual_pages'
        }
    elif split_mode == 'split_by_sections':
        ordered = sorted(bundles, key=lambda item: item.get('split_order', 0))
        title = f"{pipeline_label} Document Sections ({total} sections)"
        organization = {
            'type': 'sections',
            'sort_field': 'split_order',
            'sort_order': 'asc',
            'grouping': 'split_map_sections',
            'has_titles': True,
            'ordering_preserved': True
        }
    elif split_mode == 'split_by_chunks':
        ordered = sorted(bundles, key=lambda item: item.get('split_order', 0))
        title = f"{pipeline_label} Document Chunks ({total} chunks)"
        organization = {
            'type': 'chunks',
            'sort_field': 'split_order',
            'sort_order': 'asc',
            'grouping': 'fallback_chunks'
        }
    else:
        # Unknown split mode: keep caller ordering.
        ordered = bundles
        title = f"{pipeline_label} Document Bundles"
        organization = {
            'type': 'unknown',
            'sort_field': 'split_order',
            'sort_order': 'asc'
        }

    return {
        'bundles': ordered,
        'display_name': title,
        'organization': organization,
        'total_bundles': total,
        'pipeline': pipeline,
        'split_mode': split_mode
    }
def _format_bundle_display_summary(metadata: BundleMetadata) -> str:
    """Build a one-line human-readable summary of a bundle.

    Format: ``"<pipeline> <mode><content><producer>"``, e.g.
    ``"Standard (OCR) Sections - Intro (Auto)"``.

    NOTE(review): this logic previously sat at module level referencing an
    undefined name ``metadata``, which raised NameError on import; it appears
    to be an orphaned alternative body for get_bundle_display_name. Wrapping
    it in a private function preserves the logic while making the module
    importable again.
    """

    # Pipeline display names
    pipeline_names = {
        PipelineType.STANDARD: "Standard",
        PipelineType.VLM: "VLM",
        PipelineType.ASR: "ASR"
    }
    pipeline_name = pipeline_names.get(metadata.pipeline, metadata.pipeline.value)

    # OCR indication for standard pipeline (both OCR and No-OCR runs map to
    # PipelineType.STANDARD; the config's do_ocr flag disambiguates).
    if metadata.pipeline == PipelineType.STANDARD and metadata.config:
        ocr_enabled = metadata.config.get('do_ocr', False)
        pipeline_name = f"{pipeline_name} ({'OCR' if ocr_enabled else 'No-OCR'})"

    # Processing mode indication
    if metadata.processing_mode == ProcessingMode.INDIVIDUAL_PAGES:
        mode = "Page-by-page"
    elif metadata.processing_mode == ProcessingMode.SPLIT_SECTIONS:
        mode = "Sections"
    else:
        mode = "Whole doc"

    # Section or page info
    content_info = ""
    if metadata.split_heading:
        content_info = f" - {metadata.split_heading}"
    elif metadata.page_range and len(metadata.page_range) == 2:
        if metadata.page_range[0] == metadata.page_range[1]:
            content_info = f" - Page {metadata.page_range[0]}"
        else:
            content_info = f" - Pages {metadata.page_range[0]}-{metadata.page_range[1]}"

    # Producer info: the original had separate but identical branches for
    # "auto_phase2" and other "auto*" producers; merged into one check.
    producer_info = ""
    if metadata.producer.startswith("auto"):
        producer_info = " (Auto)"

    return f"{pipeline_name} {mode}{content_info}{producer_info}"
def group_bundles_by_metadata(bundles: List[Dict[str, Any]]) -> Dict[str, List[Dict[str, Any]]]:
    """
    Group bundles by their metadata for display purposes.

    Bundles carrying the new metadata format and a group_id are collected
    under ``"group:<group_id>"`` keys; everything else (legacy bundles and
    ungrouped ones) becomes its own ``"single:<id>"`` entry.

    Returns a dictionary mapping group keys to lists of bundles.
    """

    grouped: Dict[str, List[Dict[str, Any]]] = {}
    standalone: List[Dict[str, Any]] = []

    for item in bundles:
        item_extra = item.get('extra', {})

        # Legacy bundles (no new metadata format) are treated as ungrouped.
        if not item_extra.get('bundle_metadata_version'):
            standalone.append(item)
            continue

        parsed = BundleMetadata.from_artefact_extra(
            item['file_id'],
            item['id'],
            item_extra
        )

        if parsed.group_id:
            grouped.setdefault(f"group:{parsed.group_id}", []).append(item)
        else:
            standalone.append(item)

    # Each ungrouped bundle becomes its own single-entry group.
    for item in standalone:
        grouped[f"single:{item['id']}"] = [item]

    return grouped