"""
Uniform Bundle Metadata Architecture

This module defines standardized metadata structures for all document
processing pipelines to ensure consistency and interoperability across
OCR, No-OCR, and VLM bundles.

Features:
- Uniform metadata schema for all pipeline types
- Consistent grouping and ordering mechanisms
- Pipeline-agnostic bundle identification
- Enhanced metadata for frontend display
- Backward compatibility with existing bundles
"""

import uuid
import hashlib
import json
from datetime import datetime, timezone
from typing import Dict, Any, Optional, List, Union, Literal
from dataclasses import dataclass, asdict
from enum import Enum


class PipelineType(Enum):
    """Supported pipeline types"""
    STANDARD = "standard"
    VLM = "vlm"
    ASR = "asr"


class ProcessingMode(Enum):
    """How content was processed"""
    WHOLE_DOCUMENT = "whole_document"
    SPLIT_SECTIONS = "split_sections"
    INDIVIDUAL_PAGES = "individual_pages"
    PAGE_BUNDLE = "page_bundle"


class BundleType(Enum):
    """Type of bundle created"""
    SINGLE_ARTEFACT = "single_artefact"
    SPLIT_PACK = "split_pack"
    PAGE_BUNDLE = "page_bundle"
    VLM_SECTION_BUNDLE = "vlm_section_bundle"
    # New unified bundle types
    DOCLING_BUNDLE = "docling_bundle"  # Single coherent processing unit
    DOCLING_BUNDLE_SPLIT = "docling_bundle_split"  # Container for multi-unit processing


def _coerce_enum(enum_cls, raw_value, default):
    """Return ``enum_cls(raw_value)``, or ``default`` when the value is not a member."""
    try:
        return enum_cls(raw_value)
    except ValueError:
        return default


@dataclass
class BundleMetadata:
    """
    Standardized metadata structure for all document processing bundles.

    This ensures consistency across OCR, No-OCR, and VLM pipelines while
    maintaining backward compatibility.
    """
    # Core identification
    bundle_id: str
    file_id: str
    pipeline: PipelineType
    processing_mode: ProcessingMode
    bundle_type: BundleType

    # Grouping and ordering
    group_id: Optional[str] = None
    split_order: Optional[int] = None
    split_total: Optional[int] = None
    split_heading: Optional[str] = None

    # Content information
    page_range: Optional[List[int]] = None  # [start_page, end_page]
    page_count: Optional[int] = None
    section_title: Optional[str] = None
    section_level: Optional[int] = None

    # Processing details
    config: Optional[Dict[str, Any]] = None
    settings_fingerprint: Optional[str] = None
    processing_time: Optional[float] = None

    # Pipeline-specific metadata
    pipeline_metadata: Optional[Dict[str, Any]] = None

    # Producer information
    producer: str = "manual"  # manual, auto_phase2, auto_split, etc.
    created_at: Optional[str] = None

    # Status and quality
    status: str = "completed"
    quality_score: Optional[float] = None

    # Caller-facing pipeline label (e.g. "ocr", "no_ocr", "vlm") kept for UI
    # differentiation. Declared last, with a default, so existing positional
    # and keyword construction keeps working.
    original_pipeline_type: Optional[str] = None

    def __post_init__(self):
        """Set defaults and compute derived fields"""
        if self.created_at is None:
            # Naive-UTC ISO string, the same format the deprecated
            # datetime.utcnow().isoformat() produced.
            self.created_at = (
                datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
            )

        if self.bundle_id is None:
            self.bundle_id = str(uuid.uuid4())

        # Compute settings fingerprint if config provided
        if self.config and not self.settings_fingerprint:
            self.settings_fingerprint = self._compute_settings_fingerprint(self.config)

    def _compute_settings_fingerprint(self, config: Dict[str, Any]) -> str:
        """
        Compute a fingerprint for configuration settings (excluding page_range).

        Returns the first 16 hex characters of the SHA-1 of the key-sorted JSON
        form of ``config``. If the config cannot be serialized, a random
        16-character token is returned instead, so such a fingerprint never
        matches any other.
        """
        try:
            config_for_hash = dict(config)
            config_for_hash.pop('page_range', None)  # Exclude page ranges from fingerprint
            config_str = json.dumps(config_for_hash, sort_keys=True, ensure_ascii=False)
            return hashlib.sha1(config_str.encode('utf-8')).hexdigest()[:16]
        except Exception:
            # Deliberate best-effort: fingerprinting must never block bundle creation.
            return str(uuid.uuid4())[:16]

    def to_artefact_extra(self) -> Dict[str, Any]:
        """
        Convert to format suitable for document_artefacts.extra field.

        Optional fields are emitted only when set. ``bundle_id`` and ``file_id``
        are not included; ``from_artefact_extra`` receives them as arguments.
        """
        extra: Dict[str, Any] = {
            # Core fields
            'bundle_metadata_version': '1.0',
            'pipeline': self.pipeline.value,
            'processing_mode': self.processing_mode.value,
            'bundle_type': self.bundle_type.value,
        }

        # Store original pipeline type for UI differentiation
        if self.original_pipeline_type is not None:
            extra['original_pipeline'] = self.original_pipeline_type

        # Grouping fields
        if self.group_id:
            extra['group_id'] = self.group_id
        if self.split_order is not None:
            extra['split_order'] = self.split_order
        if self.split_total is not None:
            extra['split_total'] = self.split_total
        if self.split_heading:
            extra['split_heading'] = self.split_heading

        # Content fields
        if self.page_range:
            extra['page_range'] = self.page_range
        if self.page_count is not None:
            extra['page_count'] = self.page_count
        if self.section_title:
            extra['section_title'] = self.section_title
        if self.section_level is not None:
            extra['section_level'] = self.section_level

        # Processing fields
        if self.config:
            extra['config'] = self.config
        if self.settings_fingerprint:
            extra['settings_fingerprint'] = self.settings_fingerprint
        if self.processing_time is not None:
            extra['processing_time'] = self.processing_time

        # Pipeline-specific metadata
        if self.pipeline_metadata:
            extra['pipeline_metadata'] = self.pipeline_metadata

        # Producer and quality
        extra['producer'] = self.producer
        if self.created_at is not None:
            # from_artefact_extra reads this key; without it the creation time
            # would be reset on every reload.
            extra['created_at'] = self.created_at
        if self.quality_score is not None:
            extra['quality_score'] = self.quality_score

        return extra

    @classmethod
    def from_artefact_extra(cls, file_id: str, artefact_id: str,
                            extra: Dict[str, Any]) -> 'BundleMetadata':
        """
        Create BundleMetadata from document_artefacts.extra field.

        Missing or unrecognized ``pipeline``, ``processing_mode`` and
        ``bundle_type`` values fall back to STANDARD, WHOLE_DOCUMENT and
        SINGLE_ARTEFACT for backward compatibility. ``artefact_id`` is used as
        the bundle id. A ``None`` ``extra`` is treated as empty.
        """
        extra = extra or {}

        # Extract core fields with fallbacks for backward compatibility
        pipeline = _coerce_enum(
            PipelineType, extra.get('pipeline', 'standard'), PipelineType.STANDARD
        )
        processing_mode = _coerce_enum(
            ProcessingMode,
            extra.get('processing_mode', 'whole_document'),
            ProcessingMode.WHOLE_DOCUMENT,
        )
        bundle_type = _coerce_enum(
            BundleType,
            extra.get('bundle_type', 'single_artefact'),
            BundleType.SINGLE_ARTEFACT,
        )

        return cls(
            bundle_id=artefact_id,
            file_id=file_id,
            pipeline=pipeline,
            processing_mode=processing_mode,
            bundle_type=bundle_type,
            group_id=extra.get('group_id'),
            split_order=extra.get('split_order'),
            split_total=extra.get('split_total'),
            split_heading=extra.get('split_heading'),
            page_range=extra.get('page_range'),
            page_count=extra.get('page_count'),
            section_title=extra.get('section_title'),
            section_level=extra.get('section_level'),
            config=extra.get('config'),
            settings_fingerprint=extra.get('settings_fingerprint'),
            processing_time=extra.get('processing_time'),
            pipeline_metadata=extra.get('pipeline_metadata'),
            producer=extra.get('producer', 'manual'),
            created_at=extra.get('created_at'),
            quality_score=extra.get('quality_score'),
            original_pipeline_type=extra.get('original_pipeline'),
        )


class BundleMetadataBuilder:
    """
    Helper class to build standardized bundle metadata.

    Every setter returns the builder so calls can be chained. ``build()``
    returns the single ``BundleMetadata`` instance the builder has been
    mutating, not a copy.
    """

    def __init__(self, file_id: str, pipeline: PipelineType):
        self.file_id = file_id
        self.pipeline = pipeline
        self.metadata = BundleMetadata(
            bundle_id=str(uuid.uuid4()),
            file_id=file_id,
            pipeline=pipeline,
            processing_mode=ProcessingMode.WHOLE_DOCUMENT,
            bundle_type=BundleType.SINGLE_ARTEFACT
        )

    def set_processing_mode(self, mode: ProcessingMode) -> 'BundleMetadataBuilder':
        """Set processing mode"""
        self.metadata.processing_mode = mode
        return self

    def set_bundle_type(self, bundle_type: BundleType) -> 'BundleMetadataBuilder':
        """Set bundle type"""
        self.metadata.bundle_type = bundle_type
        return self

    def set_group_info(self, group_id: str, split_order: Optional[int] = None,
                       split_total: Optional[int] = None,
                       split_heading: Optional[str] = None) -> 'BundleMetadataBuilder':
        """Set grouping information for split documents"""
        self.metadata.group_id = group_id
        self.metadata.split_order = split_order
        self.metadata.split_total = split_total
        self.metadata.split_heading = split_heading
        return self

    def set_page_info(self, page_range: Optional[List[int]] = None,
                      page_count: Optional[int] = None) -> 'BundleMetadataBuilder':
        """Set page information"""
        self.metadata.page_range = page_range
        self.metadata.page_count = page_count
        return self

    def set_section_info(self, title: Optional[str] = None,
                         level: Optional[int] = None) -> 'BundleMetadataBuilder':
        """Set section information"""
        self.metadata.section_title = title
        self.metadata.section_level = level
        return self

    def set_config(self, config: Optional[Dict[str, Any]]) -> 'BundleMetadataBuilder':
        """Set processing configuration and refresh the settings fingerprint"""
        self.metadata.config = config
        if config is None:
            # No config means no fingerprint. Hashing None would otherwise fall
            # into the random-token fallback and yield a meaningless value.
            self.metadata.settings_fingerprint = None
        else:
            self.metadata.settings_fingerprint = (
                self.metadata._compute_settings_fingerprint(config)
            )
        return self

    def set_producer(self, producer: str) -> 'BundleMetadataBuilder':
        """Set producer information"""
        self.metadata.producer = producer
        return self

    def set_pipeline_metadata(self, metadata: Dict[str, Any]) -> 'BundleMetadataBuilder':
        """Set pipeline-specific metadata"""
        self.metadata.pipeline_metadata = metadata
        return self

    def set_quality_score(self, score: float) -> 'BundleMetadataBuilder':
        """Set quality score"""
        self.metadata.quality_score = score
        return self

    def build(self) -> BundleMetadata:
        """Build the final metadata"""
        return self.metadata


# Map pipeline strings to enums ("ocr" and "no_ocr" both run the standard pipeline)
_PIPELINE_MAP = {
    "ocr": PipelineType.STANDARD,
    "no_ocr": PipelineType.STANDARD,
    "vlm": PipelineType.VLM,
}

# Enhanced processing mode mapping with new bundle architecture
_PROCESSING_MODE_MAP = {
    "whole_document": ProcessingMode.WHOLE_DOCUMENT,
    "split_sections": ProcessingMode.SPLIT_SECTIONS,
    "individual_pages": ProcessingMode.INDIVIDUAL_PAGES,
    "split_by_pages": ProcessingMode.INDIVIDUAL_PAGES,  # Split by pages processing
    "split_by_sections": ProcessingMode.SPLIT_SECTIONS,  # Split by sections processing
    "split_by_chunks": ProcessingMode.SPLIT_SECTIONS,  # Split by chunks processing
    "pages": ProcessingMode.INDIVIDUAL_PAGES,  # Alias for page-based processing
    "sections": ProcessingMode.SPLIT_SECTIONS,  # Alias for section-based processing
    "chunks": ProcessingMode.SPLIT_SECTIONS,  # Chunks treated as sections
}


def create_standard_metadata(
    file_id: str,
    pipeline: Literal["ocr", "no_ocr", "vlm"] = "no_ocr",
    processing_mode: Literal[
        "whole_document", "split_sections", "individual_pages",
        "split_by_pages", "split_by_sections", "split_by_chunks",
        "pages", "sections", "chunks"
    ] = "split_sections",
    config: Optional[Dict[str, Any]] = None,
    group_id: Optional[str] = None,
    split_order: Optional[int] = None,
    split_total: Optional[int] = None,
    split_heading: Optional[str] = None,
    page_range: Optional[List[int]] = None,
    producer: str = "auto_phase2"
) -> BundleMetadata:
    """
    Convenience function to create standardized metadata for common use cases.

    Args:
        file_id: Identifier of the source file.
        pipeline: Caller-facing pipeline label; also stored as
            ``original_pipeline_type``.
        processing_mode: Processing mode name or alias.
        config: Processing configuration. When non-empty, a copy is stored with
            a pipeline marker added (``do_ocr`` for "ocr"/"no_ocr",
            ``pipeline`` for "vlm"); the caller's dict is not modified.
        group_id: Group identifier. Grouping info (including split_order,
            split_total and split_heading) is only applied when this is set.
        split_order: Position of this bundle within its group.
        split_total: Number of bundles in the group.
        split_heading: Heading of the split; also used as the section title.
        page_range: ``[start_page, end_page]``.
        producer: Producer label.

    Returns:
        The populated BundleMetadata. Its bundle type is DOCLING_BUNDLE for
        "whole_document" and DOCLING_BUNDLE_SPLIT for every other mode.

    Raises:
        KeyError: If ``pipeline`` or ``processing_mode`` is not a known name.
    """
    # Determine bundle type based on processing mode and grouping
    if processing_mode == "whole_document":
        bundle_type = BundleType.DOCLING_BUNDLE
    else:
        bundle_type = BundleType.DOCLING_BUNDLE_SPLIT

    builder = BundleMetadataBuilder(file_id, _PIPELINE_MAP[pipeline])
    builder.set_processing_mode(_PROCESSING_MODE_MAP[processing_mode])
    builder.set_bundle_type(bundle_type)
    builder.set_producer(producer)

    # Store original pipeline type for UI differentiation
    builder.metadata.original_pipeline_type = pipeline

    if config:
        # Add pipeline-specific config markers
        enhanced_config = dict(config)
        if pipeline == "ocr":
            enhanced_config["do_ocr"] = True
        elif pipeline == "no_ocr":
            enhanced_config["do_ocr"] = False
        elif pipeline == "vlm":
            enhanced_config["pipeline"] = "vlm"
        builder.set_config(enhanced_config)

    if group_id:
        builder.set_group_info(group_id, split_order, split_total, split_heading)

    if page_range:
        builder.set_page_info(page_range)

    # Set section info if we have a heading
    if split_heading:
        builder.set_section_info(split_heading)

    return builder.build()


def create_bundle_split_metadata(
    file_id: str,
    pipeline: Literal["ocr", "no_ocr", "vlm"] = "no_ocr",
    split_mode: Literal["split_by_pages", "split_by_sections", "split_by_chunks"] = "split_by_sections",
    config: Optional[Dict[str, Any]] = None,
    group_id: Optional[str] = None,
    producer: str = "auto_phase2",
    processing_data: Optional[Dict[str, Any]] = None
) -> BundleMetadata:
    """
    Create metadata specifically for split bundle processing.

    This is used for the new docling_bundle_split task type. When
    ``processing_data`` is non-empty, it is stored in ``pipeline_metadata``
    together with ``split_mode``.

    Raises:
        KeyError: If ``split_mode`` or ``pipeline`` is not a known name.
    """
    # Map split modes to processing modes
    mode_map = {
        "split_by_pages": "pages",
        "split_by_sections": "sections",
        "split_by_chunks": "sections"  # Chunks treated as sections
    }

    metadata = create_standard_metadata(
        file_id=file_id,
        pipeline=pipeline,
        processing_mode=mode_map[split_mode],
        config=config,
        group_id=group_id,
        producer=producer
    )

    # Add split-specific metadata
    if processing_data:
        split_metadata = {
            'split_mode': split_mode,
            'processing_data': processing_data
        }
        if metadata.pipeline_metadata:
            metadata.pipeline_metadata.update(split_metadata)
        else:
            metadata.pipeline_metadata = split_metadata

    return metadata


def get_bundle_display_name(metadata: BundleMetadata) -> str:
    """
    Generate a user-friendly display name for a bundle.

    An explicit, non-empty ``display_name`` attribute on ``metadata`` wins.
    Otherwise the name is derived from the bundle type and processing mode,
    falling back to the section title, the split heading, or the first eight
    characters of the bundle id.
    """
    # Use explicit display name if available
    explicit_name = getattr(metadata, 'display_name', None)
    if explicit_name:
        return explicit_name

    # Generate based on bundle type and processing mode
    if metadata.bundle_type == BundleType.DOCLING_BUNDLE:
        return "Complete Document"

    if metadata.bundle_type == BundleType.DOCLING_BUNDLE_SPLIT:
        if metadata.processing_mode == ProcessingMode.INDIVIDUAL_PAGES:
            if metadata.page_range:
                return f"Page {metadata.page_range[0]}"
            return "Page Bundle"

        if metadata.processing_mode == ProcessingMode.SPLIT_SECTIONS:
            if metadata.section_title:
                order_prefix = f"{metadata.split_order:02d}. " if metadata.split_order else ""
                page_suffix = ""
                if metadata.page_range and len(metadata.page_range) >= 2:
                    page_suffix = f" (p{metadata.page_range[0]}-{metadata.page_range[1]})"
                return f"{order_prefix}{metadata.section_title}{page_suffix}"
            return f"Section {metadata.split_order or 1}"

        return "Document Bundle"

    # Fallback
    return metadata.section_title or metadata.split_heading or f"Bundle {metadata.bundle_id[:8]}"


# Per-split-mode manifest layout: display wording plus the organization block.
# 'sort_field' inside each organization is also the key bundles are sorted by.
_MANIFEST_LAYOUTS: Dict[str, Dict[str, Any]] = {
    'split_by_pages': {
        'label': 'Pages',
        'unit': 'pages',
        'organization': {
            'type': 'pages',
            'sort_field': 'page_number',
            'sort_order': 'asc',
            'grouping': 'individual_pages',
        },
    },
    'split_by_sections': {
        'label': 'Sections',
        'unit': 'sections',
        'organization': {
            'type': 'sections',
            'sort_field': 'split_order',
            'sort_order': 'asc',
            'grouping': 'split_map_sections',
            'has_titles': True,
            'ordering_preserved': True,
        },
    },
    'split_by_chunks': {
        'label': 'Chunks',
        'unit': 'chunks',
        'organization': {
            'type': 'chunks',
            'sort_field': 'split_order',
            'sort_order': 'asc',
            'grouping': 'fallback_chunks',
        },
    },
}


def create_organized_bundle_manifest(bundles: list, split_mode: str, pipeline: str) -> dict:
    """
    Create an organized master manifest for split bundles with proper labeling and ordering.

    Args:
        bundles: List of individual bundle data (dicts)
        split_mode: The splitting mode used (split_by_pages, split_by_sections,
            split_by_chunks); any other value leaves the bundles unsorted
        pipeline: The pipeline type (no_ocr, ocr, vlm)

    Returns:
        Enhanced manifest with organization metadata
    """
    layout = _MANIFEST_LAYOUTS.get(split_mode)

    if layout is None:
        # Unknown mode: keep the caller's order untouched.
        sorted_bundles = bundles
        display_name = f"{pipeline.upper()} Document Bundles"
        organization = {
            'type': 'unknown',
            'sort_field': 'split_order',
            'sort_order': 'asc'
        }
    else:
        # Copy so callers mutating the manifest cannot corrupt the shared template.
        organization = dict(layout['organization'])
        sort_field = organization['sort_field']
        # `or 0` also covers a key that is present but None, which would
        # otherwise make the comparison raise TypeError.
        sorted_bundles = sorted(bundles, key=lambda item: item.get(sort_field) or 0)
        display_name = (
            f"{pipeline.upper()} Document {layout['label']} "
            f"({len(bundles)} {layout['unit']})"
        )

    return {
        'bundles': sorted_bundles,
        'display_name': display_name,
        'organization': organization,
        'total_bundles': len(bundles),
        'pipeline': pipeline,
        'split_mode': split_mode
    }


def get_bundle_pipeline_label(metadata: BundleMetadata) -> str:
    """
    Build a descriptive label of the form "<pipeline> <mode>[ - <content>][ (Auto)]".

    NOTE(review): this logic previously sat, unreachable, after the ``return``
    of ``create_organized_bundle_manifest`` and referenced a ``metadata`` name
    that function never defined. It is preserved here as a callable helper.
    """
    # Pipeline display names
    pipeline_names = {
        PipelineType.STANDARD: "Standard",
        PipelineType.VLM: "VLM",
        PipelineType.ASR: "ASR"
    }
    pipeline_name = pipeline_names.get(metadata.pipeline, metadata.pipeline.value)

    # OCR indication for standard pipeline
    if metadata.pipeline == PipelineType.STANDARD and metadata.config:
        ocr_enabled = metadata.config.get('do_ocr', False)
        pipeline_name = f"{pipeline_name} ({'OCR' if ocr_enabled else 'No-OCR'})"

    # Processing mode indication
    if metadata.processing_mode == ProcessingMode.INDIVIDUAL_PAGES:
        mode = "Page-by-page"
    elif metadata.processing_mode == ProcessingMode.SPLIT_SECTIONS:
        mode = "Sections"
    else:
        mode = "Whole doc"

    # Section or page info
    content_info = ""
    if metadata.split_heading:
        content_info = f" - {metadata.split_heading}"
    elif metadata.page_range and len(metadata.page_range) == 2:
        first_page, last_page = metadata.page_range
        if first_page == last_page:
            content_info = f" - Page {first_page}"
        else:
            content_info = f" - Pages {first_page}-{last_page}"

    # Producer info ("auto_phase2" is covered by the "auto" prefix test)
    producer_info = " (Auto)" if metadata.producer.startswith("auto") else ""

    return f"{pipeline_name} {mode}{content_info}{producer_info}"


def group_bundles_by_metadata(bundles: List[Dict[str, Any]]) -> Dict[str, List[Dict[str, Any]]]:
    """
    Group bundles by their metadata for display purposes.

    Bundles whose ``extra`` carries ``bundle_metadata_version`` and a
    ``group_id`` are collected under ``"group:<group_id>"``. All other bundles
    become their own ``"single:<id>"`` group, added after the real groups.

    Returns a dictionary mapping group keys to lists of bundles.

    Raises:
        KeyError: If an ungrouped bundle has no ``'id'`` key.
    """
    groups: Dict[str, List[Dict[str, Any]]] = {}
    ungrouped = []

    for bundle in bundles:
        # `or {}` also covers a stored null: .get('extra', {}) would return
        # None there and the lookups below would fail.
        extra = bundle.get('extra') or {}

        # Only bundles in the new metadata format that carry a group id are grouped
        group_id = extra.get('group_id') if extra.get('bundle_metadata_version') else None
        if group_id:
            groups.setdefault(f"group:{group_id}", []).append(bundle)
        else:
            ungrouped.append(bundle)

    # Add ungrouped bundles as individual groups
    for bundle in ungrouped:
        groups[f"single:{bundle['id']}"] = [bundle]

    return groups