""" Enhanced Document Analysis Module This module provides comprehensive document structure analysis beyond basic split maps, including multi-level hierarchies, numbering system detection, and content type analysis. """ import re import json import uuid import datetime from typing import Dict, List, Any, Optional, Tuple from dataclasses import dataclass, asdict from pathlib import Path import fitz # PyMuPDF from modules.logger_tool import initialise_logger import os logger = initialise_logger(__name__, os.getenv("LOG_LEVEL"), os.getenv("LOG_PATH"), 'default', True) @dataclass class DocumentSection: """Represents a document section at any hierarchy level""" id: str title: str level: int start_page: int end_page: int numbering: Optional[str] = None parent_id: Optional[str] = None children: List[str] = None content_types: Dict[str, int] = None confidence: float = 0.8 def __post_init__(self): if self.children is None: self.children = [] if self.content_types is None: self.content_types = {} @dataclass class NumberingSystem: """Represents a detected numbering or coding system""" system_id: str pattern: str description: str examples: List[str] applies_to_levels: List[int] confidence: float @dataclass class ContentBlock: """Represents a content block with type information""" block_id: str page: int bbox: Tuple[float, float, float, float] # x0, y0, x1, y1 content_type: str # 'text', 'image', 'table', 'formula', 'diagram' text_content: Optional[str] = None metadata: Dict[str, Any] = None class DocumentAnalyzer: """Enhanced document analyzer for comprehensive structure detection""" def __init__(self): self.numbering_patterns = [ # Roman numerals (r'^([IVX]+)\.?\s+(.+)', 'roman_numerals', 'Roman numeral chapters (I, II, III, ...)'), # Decimal numbering (r'^(\d+(?:\.\d+)*)\.?\s+(.+)', 'decimal_numbering', 'Decimal numbering (1.1, 1.2.1, ...)'), # Letter numbering (r'^([A-Z])\.?\s+(.+)', 'letter_chapters', 'Letter chapters (A, B, C, ...)'), (r'^([a-z])\.?\s+(.+)', 'letter_sections', 'Letter sections (a, b, c, ...)'), # Bracketed numbering (r'^\((\d+)\)\s+(.+)', 'bracketed_numbers', 'Bracketed numbers ((1), (2), ...)'), # Legal numbering (r'^§\s*(\d+(?:\.\d+)*)\s+(.+)', 'legal_sections', 'Legal sections (§1, §1.1, ...)'), # Article numbering (r'^(?:Article|Art\.?)\s+(\d+(?:\.\d+)*)\s+(.+)', 'articles', 'Article numbering'), ] def analyze_document_structure(self, pdf_bytes: bytes, tika_json: Dict = None, docling_json: Dict = None) -> Dict[str, Any]: """ Fast, header-only structure analysis. - Prefer PDF outline/bookmarks - Otherwise, use Docling heading roles from existing artefact JSON - No full-text scans; no per-page content analysis """ logger.info("Starting FAST document structure analysis (headings only)") doc = fitz.open(stream=pdf_bytes, filetype="pdf") page_count = len(doc) # Try PDF outline first sections: List[DocumentSection] = self._extract_from_pdf_outline(doc) # Fallback to Docling headings if outline inadequate if (not sections) and docling_json: sections = self._extract_from_docling(docling_json) # Final fallback: coarse windows if not sections: sections = [] step = max(10, min(30, page_count // 5 or 1)) i = 1 idx = 1 while i <= page_count: end = min(page_count, i + step - 1) sections.append(DocumentSection( id=f"sec{idx:02d}", title=f"Pages {i}-{end}", level=1, start_page=i, end_page=end, confidence=0.2 )) i = end + 1 idx += 1 # Build hierarchy relationships and adjust parent end-pages using sibling boundaries sections = self._build_section_hierarchy(sections) # Normalize and finalize sections (clamp, front matter, last-page coverage) sections = self._normalize_and_cover(sections, page_count) doc.close() return { "version": 2, "analysis_timestamp": datetime.datetime.utcnow().isoformat() + "Z", "page_count": page_count, "sections": [asdict(section) for section in sections], "metadata": { "analyzer_version": "2.1-fast", "analysis_methods": ["pdf_outline", "docling_headings"], } } def _extract_hierarchical_structure(self, doc: fitz.Document, tika_json: Dict = None, docling_json: Dict = None) -> List[DocumentSection]: # Kept for backward compat; delegate to outline + docling only sections = self._extract_from_pdf_outline(doc) if (not sections) and docling_json: sections = self._extract_from_docling(docling_json) return sections def _extract_from_pdf_outline(self, doc: fitz.Document) -> List[DocumentSection]: """Extract sections from PDF outline/bookmarks""" sections = [] toc = doc.get_toc(simple=False) for i, (level, title, page, dest) in enumerate(toc): if page < 1: continue section_id = f"outline_{i:03d}" # Calculate end page (next section's start - 1, or last page) end_page = page for j in range(i + 1, len(toc)): if toc[j][2] > 0: # Valid page number end_page = toc[j][2] - 1 break else: end_page = len(doc) section = DocumentSection( id=section_id, title=title.strip(), level=level, start_page=page, end_page=end_page, confidence=0.95 ) sections.append(section) return sections def _extract_from_docling(self, docling_json: Dict) -> List[DocumentSection]: """Extract sections from Docling analysis""" sections: List[DocumentSection] = [] blocks = docling_json.get("blocks", []) or docling_json.get("elements", []) # 1) Collect headings with page and level, preserving order heading_items: List[Tuple[int, str, int, str]] = [] # (page, level, order_index, title) order_index = 0 for block in blocks: role_raw = (block.get("role") or block.get("type") or "").lower() if not ("heading" in role_raw or role_raw in ("h1", "h2", "h3", "h4", "h5", "h6", "title")): continue text = (block.get("text") or block.get("content") or "").strip() if not text: continue page_val = block.get("page", None) if page_val is None: page_val = block.get("page_no", None) if page_val is None: page_val = block.get("pageIndex", None) try: page_int = int(page_val) if page_val is not None else 1 except Exception: page_int = 1 # Normalize to 1-based page_int = page_int + 1 if page_int == 0 else page_int # Determine heading level level = 1 if "1" in role_raw or role_raw == "title": level = 1 elif "2" in role_raw: level = 2 elif "3" in role_raw: level = 3 elif "4" in role_raw: level = 4 elif "5" in role_raw: level = 5 elif "6" in role_raw: level = 6 heading_items.append((page_int, level, order_index, text)) order_index += 1 if not heading_items: return sections # 2) Sort by page then order_index to preserve within-page order heading_items.sort(key=lambda h: (h[0], h[2])) # 3) Build sections with hierarchical end-page computation stack: List[DocumentSection] = [] idx_counter = 0 def close_until(level_threshold: int, next_page: int): nonlocal sections, stack while stack and stack[-1].level >= level_threshold: cur = stack.pop() # If next heading on same page, close at same page; else previous page if next_page <= cur.start_page: cur.end_page = cur.start_page else: cur.end_page = next_page - 1 if cur.end_page < cur.start_page: cur.end_page = cur.start_page sections.append(cur) for page_int, level, _, text in heading_items: # Close siblings and deeper levels close_until(level, page_int) # Open new heading section_id = f"docling_{idx_counter:03d}" idx_counter += 1 new_sec = DocumentSection( id=section_id, title=text, level=level, start_page=page_int, end_page=page_int, # temporary; will finalize when closing confidence=0.8 ) stack.append(new_sec) # Close any remaining open sections at document end later in normalization while stack: cur = stack.pop() sections.append(cur) return sections def _normalize_and_cover(self, sections: List[DocumentSection], page_count: int) -> List[DocumentSection]: """Harden outline sections while preserving hierarchy: - clamp each section to [1, page_count] - fix inverted ranges (but DO NOT remove hierarchical overlaps) - ensure coverage from page 1 with synthetic front-matter if needed - ensure last top-level section extends to page_count - compute numbering hint if missing """ if not sections: return sections # Clamp values per section; do not modify overlap relationships for s in sections: s.start_page = max(1, min(s.start_page or 1, page_count)) s.end_page = max(1, min(s.end_page or s.start_page, page_count)) if s.end_page < s.start_page: s.end_page = s.start_page # Maintain original order (as produced by extractor and hierarchy builder) # Insert synthetic front matter if needed if sections and sections[0].start_page > 1: # Generate a unique synthetic id that won't collide with existing ids existing_ids = {s.id for s in sections} base_id = "outline_front_matter" syn_id = base_id idx = 1 while syn_id in existing_ids: syn_id = f"{base_id}_{idx}" idx += 1 front = DocumentSection( id=syn_id, title="Front matter", level=1, start_page=1, end_page=sections[0].start_page - 1, confidence=0.6 ) sections.insert(0, front) # Ensure last top-level section covers to page_count top_levels = [s for s in sections if s.parent_id is None] if top_levels: last_top = top_levels[-1] if last_top.end_page < page_count: last_top.end_page = page_count # Light numbering extraction based on heading text prefix for s in sections: m = re.match(r"^\s*([A-Za-z]+|[IVXLCM]+|\d+(?:\.\d+)*)\.?\s+", s.title) if m: s.numbering = s.numbering or m.group(1) return sections def _extract_from_text_patterns(self, doc: fitz.Document) -> List[DocumentSection]: # Disabled in fast mode return [] def _estimate_section_level(self, numbering: str, system_type: str) -> int: """Estimate section level based on numbering pattern""" if system_type == 'roman_numerals': return 1 # Typically chapter level elif system_type == 'decimal_numbering': dots = numbering.count('.') return min(dots + 1, 6) # 1.1.1 = level 3 elif system_type == 'letter_chapters': return 1 elif system_type == 'letter_sections': return 2 else: return 2 # Default def _detect_numbering_systems(self, sections: List[DocumentSection]) -> List[NumberingSystem]: """Detect numbering systems used in the document""" systems = [] # Group sections by their numbering patterns pattern_groups = {} for section in sections: if section.numbering: for pattern, system_type, description in self.numbering_patterns: if re.match(pattern.replace(r'^(.+)', section.numbering), section.numbering): if system_type not in pattern_groups: pattern_groups[system_type] = { 'pattern': pattern, 'description': description, 'examples': [], 'levels': set(), 'count': 0 } pattern_groups[system_type]['examples'].append(section.numbering) pattern_groups[system_type]['levels'].add(section.level) pattern_groups[system_type]['count'] += 1 break # Create NumberingSystem objects for system_type, data in pattern_groups.items(): if data['count'] >= 2: # At least 2 examples to be confident system = NumberingSystem( system_id=system_type, pattern=data['pattern'], description=data['description'], examples=data['examples'][:5], # First 5 examples applies_to_levels=list(data['levels']), confidence=min(0.9, 0.5 + (data['count'] * 0.1)) ) systems.append(system) return systems def _analyze_content_types(self, doc: fitz.Document, sections: List[DocumentSection]) -> Dict[str, Any]: # Disabled in fast mode return {"total_blocks": 0, "content_types": {}, "sections": {}} def _detect_tables_in_page(self, page) -> int: return 0 def _has_complex_formatting(self, page) -> bool: return False def _merge_and_deduplicate_sections(self, sections: List[DocumentSection]) -> List[DocumentSection]: """Merge overlapping sections and remove duplicates""" if not sections: return [] # Sort by start page, then by level sections.sort(key=lambda s: (s.start_page, s.level)) merged = [] for section in sections: # Check if this section overlaps significantly with existing ones is_duplicate = False for existing in merged: if (existing.start_page == section.start_page and abs(existing.level - section.level) <= 1 and self._text_similarity(existing.title, section.title) > 0.8): # This is likely a duplicate, merge information if section.confidence > existing.confidence: existing.title = section.title existing.numbering = section.numbering or existing.numbering existing.confidence = section.confidence is_duplicate = True break if not is_duplicate: merged.append(section) return merged def _text_similarity(self, text1: str, text2: str) -> float: """Calculate text similarity (simple implementation)""" if not text1 or not text2: return 0.0 # Simple word-based similarity words1 = set(text1.lower().split()) words2 = set(text2.lower().split()) if not words1 and not words2: return 1.0 intersection = words1.intersection(words2) union = words1.union(words2) return len(intersection) / len(union) if union else 0.0 def _build_section_hierarchy(self, sections: List[DocumentSection]) -> List[DocumentSection]: """Build parent-child relationships between sections""" if not sections: return [] # Sort by start page and level sections.sort(key=lambda s: (s.start_page, s.level)) # Build hierarchy for i, section in enumerate(sections): # Find parent (previous section with lower level) for j in range(i - 1, -1, -1): potential_parent = sections[j] if (potential_parent.level < section.level and potential_parent.start_page <= section.start_page and potential_parent.end_page >= section.start_page): section.parent_id = potential_parent.id potential_parent.children.append(section.id) break # Update end pages based on children for j in range(i + 1, len(sections)): next_section = sections[j] if (next_section.level <= section.level and next_section.start_page > section.start_page): section.end_page = min(section.end_page, next_section.start_page - 1) break return sections def _generate_processing_recommendations(self, sections: List[DocumentSection], content_analysis: Dict) -> Dict[str, Any]: """Generate intelligent processing recommendations""" recommendations = { "document_length_category": "short", # short, medium, long "suggested_processing_approach": "full_document", # full_document, section_by_section "high_priority_sections": [], "ocr_recommended_sections": [], "image_analysis_sections": [], "table_extraction_sections": [], "estimated_total_time": 0 } total_pages = max(s.end_page for s in sections) if sections else 0 # Categorize document length if total_pages <= 10: recommendations["document_length_category"] = "short" recommendations["suggested_processing_approach"] = "full_document" elif total_pages <= 50: recommendations["document_length_category"] = "medium" recommendations["suggested_processing_approach"] = "section_by_section" else: recommendations["document_length_category"] = "long" recommendations["suggested_processing_approach"] = "section_by_section" # Analyze each section for recommendations total_estimated_time = 0 for section in sections: section_data = content_analysis["sections"].get(section.id, {}) # High priority sections (first few sections, or sections with important titles) if (section.start_page <= 5 or any(keyword in section.title.lower() for keyword in ['abstract', 'summary', 'introduction', 'conclusion', 'executive'])): recommendations["high_priority_sections"].append(section.id) # OCR recommendations if section_data.get("images", 0) > 0 or section_data.get("has_complex_formatting", False): recommendations["ocr_recommended_sections"].append(section.id) # Image analysis recommendations if section_data.get("images", 0) > 2: # Sections with multiple images recommendations["image_analysis_sections"].append(section.id) # Table extraction recommendations if section_data.get("tables", 0) > 0: recommendations["table_extraction_sections"].append(section.id) total_estimated_time += section_data.get("estimated_processing_time", 0) recommendations["estimated_total_time"] = total_estimated_time return recommendations def create_document_outline_hierarchy_artefact(file_id: str, pdf_bytes: bytes, tika_json: Dict = None, docling_json: Dict = None) -> Dict[str, Any]: """ Create a comprehensive document outline hierarchy artefact Args: file_id: File ID pdf_bytes: PDF file content tika_json: Optional Tika analysis results docling_json: Optional Docling analysis results Returns: Document outline hierarchy artefact """ analyzer = DocumentAnalyzer() analysis = analyzer.analyze_document_structure(pdf_bytes, tika_json, docling_json) # Add file metadata analysis["file_id"] = file_id analysis["artefact_id"] = str(uuid.uuid4()) analysis["artefact_type"] = "document_outline_hierarchy" return analysis