# api/modules/document_analysis.py
# Source snapshot: 2025-11-14 14:47:19 +00:00 (545 lines, 21 KiB, Python)
"""
Enhanced Document Analysis Module
This module provides comprehensive document structure analysis beyond basic split maps,
including multi-level hierarchies, numbering system detection, and content type analysis.
"""
import datetime
import json
import os
import re
import uuid
from dataclasses import asdict, dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import fitz  # PyMuPDF

from modules.logger_tool import initialise_logger
logger = initialise_logger(__name__, os.getenv("LOG_LEVEL"), os.getenv("LOG_PATH"), 'default', True)
@dataclass
class DocumentSection:
    """Represents a document section at any hierarchy level.

    Pages are 1-based and the [start_page, end_page] range is inclusive.
    `children` holds ids of nested sections; `parent_id` points back up.
    """
    id: str
    title: str
    level: int
    start_page: int
    end_page: int
    numbering: Optional[str] = None
    parent_id: Optional[str] = None
    # Mutable defaults via field(default_factory=...): each instance gets
    # its own fresh list/dict (avoids the shared-mutable-default pitfall
    # and fixes the previously wrong `List[str] = None` annotations).
    children: List[str] = field(default_factory=list)
    content_types: Dict[str, int] = field(default_factory=dict)
    confidence: float = 0.8

    def __post_init__(self):
        # Backward compatibility: callers that explicitly pass None still
        # get fresh empty containers, as before.
        if self.children is None:
            self.children = []
        if self.content_types is None:
            self.content_types = {}
@dataclass
class NumberingSystem:
    """Represents a detected numbering or coding system."""
    # Stable identifier of the scheme (e.g. 'decimal_numbering').
    system_id: str
    # Regex that recognised the scheme.
    pattern: str
    # Human-readable description of the scheme.
    description: str
    # Sample numbering tokens observed in the document.
    examples: List[str]
    # Hierarchy levels (1 = top) the scheme was seen on.
    applies_to_levels: List[int]
    # Detection confidence in [0, 1].
    confidence: float
@dataclass
class ContentBlock:
    """Represents a content block with type information."""
    block_id: str
    # 1-based page number the block appears on — TODO confirm base with producer.
    page: int
    bbox: Tuple[float, float, float, float]  # x0, y0, x1, y1
    content_type: str  # 'text', 'image', 'table', 'formula', 'diagram'
    # Extracted text, if any (None for purely graphical blocks).
    text_content: Optional[str] = None
    # Free-form extra attributes; None when the producer supplies none.
    metadata: Optional[Dict[str, Any]] = None
class DocumentAnalyzer:
    """Enhanced document analyzer for comprehensive structure detection"""

    def __init__(self):
        # (regex, system_id, human description) triples, tried in order —
        # the first pattern that matches wins, so more specific patterns
        # should come before more general ones.
        self.numbering_patterns = [
            # Roman numerals
            (r'^([IVX]+)\.?\s+(.+)', 'roman_numerals', 'Roman numeral chapters (I, II, III, ...)'),
            # Decimal numbering
            (r'^(\d+(?:\.\d+)*)\.?\s+(.+)', 'decimal_numbering', 'Decimal numbering (1.1, 1.2.1, ...)'),
            # Letter numbering
            (r'^([A-Z])\.?\s+(.+)', 'letter_chapters', 'Letter chapters (A, B, C, ...)'),
            (r'^([a-z])\.?\s+(.+)', 'letter_sections', 'Letter sections (a, b, c, ...)'),
            # Bracketed numbering
            (r'^\((\d+)\)\s+(.+)', 'bracketed_numbers', 'Bracketed numbers ((1), (2), ...)'),
            # Legal numbering — NOTE(review): regex contains no '§' and only
            # requires "<number> <text>" after optional whitespace; confirm
            # this is the intended pattern for legal sections.
            (r'\s*(\d+(?:\.\d+)*)\s+(.+)', 'legal_sections', 'Legal sections (§1, §1.1, ...)'),
            # Article numbering
            (r'^(?:Article|Art\.?)\s+(\d+(?:\.\d+)*)\s+(.+)', 'articles', 'Article numbering'),
        ]
def analyze_document_structure(self, pdf_bytes: bytes, tika_json: Dict = None,
                               docling_json: Dict = None) -> Dict[str, Any]:
    """
    Fast, header-only structure analysis.
    - Prefer PDF outline/bookmarks
    - Otherwise, use Docling heading roles from existing artefact JSON
    - No full-text scans; no per-page content analysis

    Args:
        pdf_bytes: Raw PDF content.
        tika_json: Unused in fast mode; kept for interface compatibility.
        docling_json: Optional Docling artefact used as a heading fallback.
    Returns:
        Dict with version, timestamp, page_count, serialized sections and metadata.
    """
    logger.info("Starting FAST document structure analysis (headings only)")
    doc = fitz.open(stream=pdf_bytes, filetype="pdf")
    try:  # ensure the PyMuPDF document is closed even if analysis raises
        page_count = len(doc)
        # Try PDF outline first
        sections: List[DocumentSection] = self._extract_from_pdf_outline(doc)
        # Fallback to Docling headings if outline inadequate
        if (not sections) and docling_json:
            sections = self._extract_from_docling(docling_json)
        # Final fallback: coarse fixed-size page windows (low confidence)
        if not sections:
            sections = []
            step = max(10, min(30, page_count // 5 or 1))
            start = 1
            idx = 1
            while start <= page_count:
                end = min(page_count, start + step - 1)
                sections.append(DocumentSection(
                    id=f"sec{idx:02d}",
                    title=f"Pages {start}-{end}",
                    level=1,
                    start_page=start,
                    end_page=end,
                    confidence=0.2
                ))
                start = end + 1
                idx += 1
        # Build hierarchy relationships and adjust parent end-pages using sibling boundaries
        sections = self._build_section_hierarchy(sections)
        # Normalize and finalize sections (clamp, front matter, last-page coverage)
        sections = self._normalize_and_cover(sections, page_count)
    finally:
        doc.close()
    return {
        "version": 2,
        # datetime.utcnow() is deprecated; this produces the identical
        # "<naive ISO>Z" string with a timezone-aware clock read.
        "analysis_timestamp": datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None).isoformat() + "Z",
        "page_count": page_count,
        "sections": [asdict(section) for section in sections],
        "metadata": {
            "analyzer_version": "2.1-fast",
            "analysis_methods": ["pdf_outline", "docling_headings"],
        }
    }
def _extract_hierarchical_structure(self, doc: fitz.Document, tika_json: Dict = None,
                                    docling_json: Dict = None) -> List[DocumentSection]:
    """Backward-compatible wrapper around the fast extractors.

    Prefers the PDF outline; falls back to Docling headings only when the
    outline yields nothing and a Docling artefact is available. tika_json
    is accepted for interface compatibility but unused.
    """
    outline_sections = self._extract_from_pdf_outline(doc)
    if outline_sections:
        return outline_sections
    if docling_json:
        return self._extract_from_docling(docling_json)
    return outline_sections
def _extract_from_pdf_outline(self, doc: fitz.Document) -> List[DocumentSection]:
    """Extract sections from the PDF outline (bookmarks).

    Each TOC entry becomes a DocumentSection; the end page is the page
    before the next entry with a valid page, or the last document page.

    Bug fix: when the next outline entry starts on the SAME page, the old
    code computed end_page = page - 1 < start_page (an inverted range);
    the end page is now clamped to be at least the start page.
    """
    sections = []
    toc = doc.get_toc(simple=False)
    for i, (level, title, page, dest) in enumerate(toc):
        if page < 1:
            # Entry has no resolvable destination page; skip it.
            continue
        # Default: section runs to the end of the document unless a later
        # entry with a valid page bounds it.
        end_page = len(doc)
        for j in range(i + 1, len(toc)):
            if toc[j][2] > 0:  # valid page number
                end_page = max(page, toc[j][2] - 1)
                break
        sections.append(DocumentSection(
            id=f"outline_{i:03d}",
            title=title.strip(),
            level=level,
            start_page=page,
            end_page=end_page,
            confidence=0.95
        ))
    return sections
def _extract_from_docling(self, docling_json: Dict) -> List[DocumentSection]:
    """Extract sections from Docling analysis.

    Scans only heading-role blocks, orders them by (page, appearance),
    then closes sections with a stack so that a heading at level N ends
    every open section at level >= N. End pages of sections still open at
    the end are left at their start page; normalization finalizes them.
    """
    sections: List[DocumentSection] = []
    # Docling artefacts have used both "blocks" and "elements" as the key.
    blocks = docling_json.get("blocks", []) or docling_json.get("elements", [])
    # 1) Collect headings with page and level, preserving order
    heading_items: List[Tuple[int, int, int, str]] = []  # (page, level, order_index, title)
    order_index = 0
    for block in blocks:
        role_raw = (block.get("role") or block.get("type") or "").lower()
        if not ("heading" in role_raw or role_raw in ("h1", "h2", "h3", "h4", "h5", "h6", "title")):
            continue
        text = (block.get("text") or block.get("content") or "").strip()
        if not text:
            continue
        # Page may live under several keys depending on producer version.
        page_val = block.get("page", None)
        if page_val is None:
            page_val = block.get("page_no", None)
        if page_val is None:
            page_val = block.get("pageIndex", None)
        try:
            page_int = int(page_val) if page_val is not None else 1
        except Exception:
            # Unparseable page value: fall back to page 1.
            page_int = 1
        # Normalize to 1-based.  NOTE(review): only page 0 is shifted, so a
        # fully 0-based producer would leave pages >= 1 off by one — confirm
        # the page base of the Docling artefact.
        page_int = page_int + 1 if page_int == 0 else page_int
        # Determine heading level from the digit in the role name
        # (e.g. "heading2" / "h2"); "title" counts as level 1.
        level = 1
        if "1" in role_raw or role_raw == "title":
            level = 1
        elif "2" in role_raw:
            level = 2
        elif "3" in role_raw:
            level = 3
        elif "4" in role_raw:
            level = 4
        elif "5" in role_raw:
            level = 5
        elif "6" in role_raw:
            level = 6
        heading_items.append((page_int, level, order_index, text))
        order_index += 1
    if not heading_items:
        return sections
    # 2) Sort by page then order_index to preserve within-page order
    heading_items.sort(key=lambda h: (h[0], h[2]))
    # 3) Build sections with hierarchical end-page computation
    stack: List[DocumentSection] = []
    idx_counter = 0

    def close_until(level_threshold: int, next_page: int):
        # Pop and finalize every open section at or below the new
        # heading's level; its end page is the page before the new
        # heading (or its own start page if the new heading shares it).
        nonlocal sections, stack
        while stack and stack[-1].level >= level_threshold:
            cur = stack.pop()
            # If next heading on same page, close at same page; else previous page
            if next_page <= cur.start_page:
                cur.end_page = cur.start_page
            else:
                cur.end_page = next_page - 1
            if cur.end_page < cur.start_page:
                cur.end_page = cur.start_page
            sections.append(cur)

    for page_int, level, _, text in heading_items:
        # Close siblings and deeper levels
        close_until(level, page_int)
        # Open new heading
        section_id = f"docling_{idx_counter:03d}"
        idx_counter += 1
        new_sec = DocumentSection(
            id=section_id,
            title=text,
            level=level,
            start_page=page_int,
            end_page=page_int,  # temporary; will finalize when closing
            confidence=0.8
        )
        stack.append(new_sec)
    # Close any remaining open sections at document end later in normalization
    while stack:
        cur = stack.pop()
        sections.append(cur)
    return sections
def _normalize_and_cover(self, sections: List[DocumentSection], page_count: int) -> List[DocumentSection]:
    """Harden outline sections while preserving hierarchy:
    - clamp each section to [1, page_count]
    - fix inverted ranges (but DO NOT remove hierarchical overlaps)
    - ensure coverage from page 1 with synthetic front-matter if needed
    - ensure last top-level section extends to page_count
    - compute numbering hint if missing
    """
    if not sections:
        return sections
    # Clamp values per section; do not modify overlap relationships
    for s in sections:
        s.start_page = max(1, min(s.start_page or 1, page_count))
        s.end_page = max(1, min(s.end_page or s.start_page, page_count))
        if s.end_page < s.start_page:
            s.end_page = s.start_page
    # Maintain original order (as produced by extractor and hierarchy builder).
    # Insert synthetic front matter if the first section starts after page 1.
    if sections and sections[0].start_page > 1:
        # Generate a unique synthetic id that won't collide with existing ids
        existing_ids = {s.id for s in sections}
        base_id = "outline_front_matter"
        syn_id = base_id
        idx = 1
        while syn_id in existing_ids:
            syn_id = f"{base_id}_{idx}"
            idx += 1
        sections.insert(0, DocumentSection(
            id=syn_id,
            title="Front matter",
            level=1,
            start_page=1,
            end_page=sections[0].start_page - 1,
            confidence=0.6
        ))
    # Ensure last top-level section covers to page_count
    top_levels = [s for s in sections if s.parent_id is None]
    if top_levels:
        last_top = top_levels[-1]
        if last_top.end_page < page_count:
            last_top.end_page = page_count
    # Light numbering extraction from the heading-text prefix.
    # Bug fix: the previous pattern's [A-Za-z]+ alternative matched ANY
    # leading word ("The", "Chapter", ...), producing bogus numbering
    # hints.  Only accept a decimal number (1, 1.2, ...), an uppercase
    # Roman numeral, or a single letter as a numbering prefix.
    for s in sections:
        m = re.match(r"^\s*(\d+(?:\.\d+)*|[IVXLCM]+|[A-Za-z])\.?\s+", s.title)
        if m:
            s.numbering = s.numbering or m.group(1)
    return sections
def _extract_from_text_patterns(self, doc: fitz.Document) -> List[DocumentSection]:
    """Regex-based heading detection over page text.

    Disabled in fast mode (it would require a full-text scan); always
    returns an empty list.
    """
    # Disabled in fast mode
    return []
def _estimate_section_level(self, numbering: str, system_type: str) -> int:
"""Estimate section level based on numbering pattern"""
if system_type == 'roman_numerals':
return 1 # Typically chapter level
elif system_type == 'decimal_numbering':
dots = numbering.count('.')
return min(dots + 1, 6) # 1.1.1 = level 3
elif system_type == 'letter_chapters':
return 1
elif system_type == 'letter_sections':
return 2
else:
return 2 # Default
def _detect_numbering_systems(self, sections: List[DocumentSection]) -> List[NumberingSystem]:
    """Detect numbering systems used in the document.

    Groups sections by the first heading pattern that recognises their
    numbering token and reports every scheme seen at least twice.

    Bug fix: the old code called pattern.replace(r'^(.+)', ...) — a no-op,
    since no pattern contains that literal — and then matched the full
    heading pattern (which requires a title part after the number) against
    the bare numbering token, so nothing was ever detected. We now append
    a dummy title so the heading patterns can match the token itself.
    """
    systems = []
    pattern_groups = {}
    for section in sections:
        if not section.numbering:
            continue
        # Probe string: numbering token plus dummy title text, so the
        # heading patterns (which all require trailing text) can match.
        probe = f"{section.numbering} x"
        for pattern, system_type, description in self.numbering_patterns:
            if re.match(pattern, probe):
                group = pattern_groups.setdefault(system_type, {
                    'pattern': pattern,
                    'description': description,
                    'examples': [],
                    'levels': set(),
                    'count': 0
                })
                group['examples'].append(section.numbering)
                group['levels'].add(section.level)
                group['count'] += 1
                break  # first matching pattern wins
    # Create NumberingSystem objects for schemes with enough evidence
    for system_type, data in pattern_groups.items():
        if data['count'] >= 2:  # at least 2 examples to be confident
            systems.append(NumberingSystem(
                system_id=system_type,
                pattern=data['pattern'],
                description=data['description'],
                examples=data['examples'][:5],  # first 5 examples
                applies_to_levels=list(data['levels']),
                confidence=min(0.9, 0.5 + (data['count'] * 0.1))
            ))
    return systems
def _analyze_content_types(self, doc: fitz.Document, sections: List[DocumentSection]) -> Dict[str, Any]:
    """Per-page/per-section content-type analysis.

    Disabled in fast mode; returns an empty result with the shape the
    full analyzer would produce.
    """
    # Disabled in fast mode
    return {"total_blocks": 0, "content_types": {}, "sections": {}}

def _detect_tables_in_page(self, page) -> int:
    """Table detection stub; disabled in fast mode, always reports 0 tables."""
    return 0

def _has_complex_formatting(self, page) -> bool:
    """Complex-formatting heuristic stub; disabled in fast mode, always False."""
    return False
def _merge_and_deduplicate_sections(self, sections: List[DocumentSection]) -> List[DocumentSection]:
    """Collapse near-duplicate sections into a single entry.

    Two sections are considered duplicates when they start on the same
    page, their levels differ by at most one, and their titles are very
    similar. The kept section absorbs the duplicate's title/numbering/
    confidence when the duplicate scores higher. Sorts `sections` in
    place, as the original did.
    """
    if not sections:
        return []
    sections.sort(key=lambda s: (s.start_page, s.level))
    merged = []
    for candidate in sections:
        kept = next(
            (existing for existing in merged
             if existing.start_page == candidate.start_page
             and abs(existing.level - candidate.level) <= 1
             and self._text_similarity(existing.title, candidate.title) > 0.8),
            None,
        )
        if kept is None:
            merged.append(candidate)
            continue
        # Near-duplicate found: keep the higher-confidence information
        if candidate.confidence > kept.confidence:
            kept.title = candidate.title
            kept.numbering = candidate.numbering or kept.numbering
            kept.confidence = candidate.confidence
    return merged
def _text_similarity(self, text1: str, text2: str) -> float:
"""Calculate text similarity (simple implementation)"""
if not text1 or not text2:
return 0.0
# Simple word-based similarity
words1 = set(text1.lower().split())
words2 = set(text2.lower().split())
if not words1 and not words2:
return 1.0
intersection = words1.intersection(words2)
union = words1.union(words2)
return len(intersection) / len(union) if union else 0.0
def _build_section_hierarchy(self, sections: List[DocumentSection]) -> List[DocumentSection]:
    """Build parent-child relationships between sections.

    Sorts in place by (start_page, level), links each section to the
    nearest preceding shallower section whose page range contains its
    start page, and trims end pages at the next same-or-shallower
    section. NOTE: sections are mutated while iterating, so results
    depend on this sorted order (a section's end_page may already be
    trimmed by the time later sections look for a parent).
    """
    if not sections:
        return []
    # Sort by start page and level
    sections.sort(key=lambda s: (s.start_page, s.level))
    # Build hierarchy
    for i, section in enumerate(sections):
        # Find parent (previous section with lower level whose range
        # covers this section's start page)
        for j in range(i - 1, -1, -1):
            potential_parent = sections[j]
            if (potential_parent.level < section.level and
                potential_parent.start_page <= section.start_page and
                potential_parent.end_page >= section.start_page):
                section.parent_id = potential_parent.id
                potential_parent.children.append(section.id)
                break
        # Trim this section's end page so it stops before the next
        # section at the same or shallower level
        for j in range(i + 1, len(sections)):
            next_section = sections[j]
            if (next_section.level <= section.level and
                next_section.start_page > section.start_page):
                section.end_page = min(section.end_page, next_section.start_page - 1)
                break
    return sections
def _generate_processing_recommendations(self, sections: List[DocumentSection],
                                         content_analysis: Dict) -> Dict[str, Any]:
    """Derive processing hints from sections and content analysis.

    Buckets the document by page count, flags high-priority sections
    (early pages or key headings), and recommends OCR, image analysis
    and table extraction per section based on the content analysis data.
    """
    recommendations = {
        "document_length_category": "short",  # short, medium, long
        "suggested_processing_approach": "full_document",  # or section_by_section
        "high_priority_sections": [],
        "ocr_recommended_sections": [],
        "image_analysis_sections": [],
        "table_extraction_sections": [],
        "estimated_total_time": 0
    }
    total_pages = max((s.end_page for s in sections), default=0)
    # Categorize document length and pick the processing approach
    if total_pages <= 10:
        length_category, approach = "short", "full_document"
    elif total_pages <= 50:
        length_category, approach = "medium", "section_by_section"
    else:
        length_category, approach = "long", "section_by_section"
    recommendations["document_length_category"] = length_category
    recommendations["suggested_processing_approach"] = approach
    important_keywords = ('abstract', 'summary', 'introduction', 'conclusion', 'executive')
    total_estimated_time = 0
    for section in sections:
        section_data = content_analysis["sections"].get(section.id, {})
        title_lower = section.title.lower()
        # Early pages and key headings get priority treatment
        if section.start_page <= 5 or any(kw in title_lower for kw in important_keywords):
            recommendations["high_priority_sections"].append(section.id)
        # OCR wherever images or complex layout were observed
        if section_data.get("images", 0) > 0 or section_data.get("has_complex_formatting", False):
            recommendations["ocr_recommended_sections"].append(section.id)
        # Deeper image analysis only for image-heavy sections
        if section_data.get("images", 0) > 2:
            recommendations["image_analysis_sections"].append(section.id)
        # Table extraction where tables were observed
        if section_data.get("tables", 0) > 0:
            recommendations["table_extraction_sections"].append(section.id)
        total_estimated_time += section_data.get("estimated_processing_time", 0)
    recommendations["estimated_total_time"] = total_estimated_time
    return recommendations
def create_document_outline_hierarchy_artefact(file_id: str, pdf_bytes: bytes,
                                               tika_json: Dict = None,
                                               docling_json: Dict = None) -> Dict[str, Any]:
    """
    Create a comprehensive document outline hierarchy artefact.

    Args:
        file_id: File ID
        pdf_bytes: PDF file content
        tika_json: Optional Tika analysis results
        docling_json: Optional Docling analysis results
    Returns:
        Document outline hierarchy artefact
    """
    analysis = DocumentAnalyzer().analyze_document_structure(pdf_bytes, tika_json, docling_json)
    # Attach artefact identity on top of the raw analysis
    analysis.update({
        "file_id": file_id,
        "artefact_id": str(uuid.uuid4()),
        "artefact_type": "document_outline_hierarchy",
    })
    return analysis