"""
Enhanced Document Analysis Module

This module provides comprehensive document structure analysis beyond basic split maps,
including multi-level hierarchies, numbering system detection, and content type analysis.
"""
|
|
|
|
import datetime
import json
import os
import re
import uuid
from dataclasses import asdict, dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import fitz  # PyMuPDF

from modules.logger_tool import initialise_logger
|
|
|
|
# Module-wide logger; level and output path are taken from the environment.
logger = initialise_logger(__name__, os.getenv("LOG_LEVEL"), os.getenv("LOG_PATH"), 'default', True)
|
|
|
|
@dataclass
class DocumentSection:
    """Represents a document section at any hierarchy level.

    A node in the document outline tree: ``level`` is the 1-based heading
    depth, ``start_page``/``end_page`` are inclusive 1-based page numbers,
    and ``parent_id``/``children`` link sections into a hierarchy by id.
    """
    id: str
    title: str
    level: int
    start_page: int
    end_page: int
    numbering: Optional[str] = None  # detected numbering prefix, e.g. "1.2" or "IV"
    parent_id: Optional[str] = None
    # default_factory gives each instance its own list/dict, avoiding the
    # shared-mutable-default pitfall the previous `= None` pattern worked around.
    children: List[str] = field(default_factory=list)
    content_types: Dict[str, int] = field(default_factory=dict)
    confidence: float = 0.8

    def __post_init__(self):
        # Callers may still pass None explicitly; coerce for backward compat.
        if self.children is None:
            self.children = []
        if self.content_types is None:
            self.content_types = {}
|
|
|
@dataclass
class NumberingSystem:
    """Represents a detected numbering or coding system"""
    system_id: str  # stable identifier, e.g. 'decimal_numbering'
    pattern: str  # regex that matched the section numbering
    description: str  # human-readable description of the scheme
    examples: List[str]  # sample numbering strings observed (caller caps the count)
    applies_to_levels: List[int]  # hierarchy levels the system was seen at
    confidence: float  # detection confidence in [0, 1]
|
|
@dataclass
class ContentBlock:
    """Represents a content block with type information."""
    block_id: str
    page: int  # 1-based page number
    bbox: Tuple[float, float, float, float]  # x0, y0, x1, y1
    content_type: str  # 'text', 'image', 'table', 'formula', 'diagram'
    text_content: Optional[str] = None
    # default_factory keeps each instance's metadata independent; the old
    # `= None` default had no __post_init__ coercion (unlike DocumentSection),
    # forcing every consumer to None-check first.
    metadata: Dict[str, Any] = field(default_factory=dict)
|
|
|
class DocumentAnalyzer:
|
|
"""Enhanced document analyzer for comprehensive structure detection"""
|
|
|
|
def __init__(self):
|
|
self.numbering_patterns = [
|
|
# Roman numerals
|
|
(r'^([IVX]+)\.?\s+(.+)', 'roman_numerals', 'Roman numeral chapters (I, II, III, ...)'),
|
|
# Decimal numbering
|
|
(r'^(\d+(?:\.\d+)*)\.?\s+(.+)', 'decimal_numbering', 'Decimal numbering (1.1, 1.2.1, ...)'),
|
|
# Letter numbering
|
|
(r'^([A-Z])\.?\s+(.+)', 'letter_chapters', 'Letter chapters (A, B, C, ...)'),
|
|
(r'^([a-z])\.?\s+(.+)', 'letter_sections', 'Letter sections (a, b, c, ...)'),
|
|
# Bracketed numbering
|
|
(r'^\((\d+)\)\s+(.+)', 'bracketed_numbers', 'Bracketed numbers ((1), (2), ...)'),
|
|
# Legal numbering
|
|
(r'^§\s*(\d+(?:\.\d+)*)\s+(.+)', 'legal_sections', 'Legal sections (§1, §1.1, ...)'),
|
|
# Article numbering
|
|
(r'^(?:Article|Art\.?)\s+(\d+(?:\.\d+)*)\s+(.+)', 'articles', 'Article numbering'),
|
|
]
|
|
|
|
def analyze_document_structure(self, pdf_bytes: bytes, tika_json: Dict = None,
|
|
docling_json: Dict = None) -> Dict[str, Any]:
|
|
"""
|
|
Fast, header-only structure analysis.
|
|
- Prefer PDF outline/bookmarks
|
|
- Otherwise, use Docling heading roles from existing artefact JSON
|
|
- No full-text scans; no per-page content analysis
|
|
"""
|
|
logger.info("Starting FAST document structure analysis (headings only)")
|
|
|
|
doc = fitz.open(stream=pdf_bytes, filetype="pdf")
|
|
page_count = len(doc)
|
|
|
|
# Try PDF outline first
|
|
sections: List[DocumentSection] = self._extract_from_pdf_outline(doc)
|
|
|
|
# Fallback to Docling headings if outline inadequate
|
|
if (not sections) and docling_json:
|
|
sections = self._extract_from_docling(docling_json)
|
|
|
|
# Final fallback: coarse windows
|
|
if not sections:
|
|
sections = []
|
|
step = max(10, min(30, page_count // 5 or 1))
|
|
i = 1
|
|
idx = 1
|
|
while i <= page_count:
|
|
end = min(page_count, i + step - 1)
|
|
sections.append(DocumentSection(
|
|
id=f"sec{idx:02d}",
|
|
title=f"Pages {i}-{end}",
|
|
level=1,
|
|
start_page=i,
|
|
end_page=end,
|
|
confidence=0.2
|
|
))
|
|
i = end + 1
|
|
idx += 1
|
|
|
|
# Build hierarchy relationships and adjust parent end-pages using sibling boundaries
|
|
sections = self._build_section_hierarchy(sections)
|
|
# Normalize and finalize sections (clamp, front matter, last-page coverage)
|
|
sections = self._normalize_and_cover(sections, page_count)
|
|
|
|
doc.close()
|
|
|
|
return {
|
|
"version": 2,
|
|
"analysis_timestamp": datetime.datetime.utcnow().isoformat() + "Z",
|
|
"page_count": page_count,
|
|
"sections": [asdict(section) for section in sections],
|
|
"metadata": {
|
|
"analyzer_version": "2.1-fast",
|
|
"analysis_methods": ["pdf_outline", "docling_headings"],
|
|
}
|
|
}
|
|
|
|
def _extract_hierarchical_structure(self, doc: fitz.Document, tika_json: Dict = None,
|
|
docling_json: Dict = None) -> List[DocumentSection]:
|
|
# Kept for backward compat; delegate to outline + docling only
|
|
sections = self._extract_from_pdf_outline(doc)
|
|
if (not sections) and docling_json:
|
|
sections = self._extract_from_docling(docling_json)
|
|
return sections
|
|
|
|
def _extract_from_pdf_outline(self, doc: fitz.Document) -> List[DocumentSection]:
|
|
"""Extract sections from PDF outline/bookmarks"""
|
|
sections = []
|
|
toc = doc.get_toc(simple=False)
|
|
|
|
for i, (level, title, page, dest) in enumerate(toc):
|
|
if page < 1:
|
|
continue
|
|
|
|
section_id = f"outline_{i:03d}"
|
|
|
|
# Calculate end page (next section's start - 1, or last page)
|
|
end_page = page
|
|
for j in range(i + 1, len(toc)):
|
|
if toc[j][2] > 0: # Valid page number
|
|
end_page = toc[j][2] - 1
|
|
break
|
|
else:
|
|
end_page = len(doc)
|
|
|
|
section = DocumentSection(
|
|
id=section_id,
|
|
title=title.strip(),
|
|
level=level,
|
|
start_page=page,
|
|
end_page=end_page,
|
|
confidence=0.95
|
|
)
|
|
sections.append(section)
|
|
|
|
return sections
|
|
|
|
    def _extract_from_docling(self, docling_json: Dict) -> List[DocumentSection]:
        """Extract sections from Docling analysis"""
        # Strategy: collect heading-role blocks (with page + level), then replay
        # them through a stack so each new heading closes every open section at
        # the same or deeper level before opening its own.
        sections: List[DocumentSection] = []
        blocks = docling_json.get("blocks", []) or docling_json.get("elements", [])

        # 1) Collect headings with page and level, preserving order
        heading_items: List[Tuple[int, int, int, str]] = []  # (page, level, order_index, title)
        order_index = 0
        for block in blocks:
            role_raw = (block.get("role") or block.get("type") or "").lower()
            if not ("heading" in role_raw or role_raw in ("h1", "h2", "h3", "h4", "h5", "h6", "title")):
                continue
            text = (block.get("text") or block.get("content") or "").strip()
            if not text:
                continue
            # Docling variants expose the page number under different keys.
            page_val = block.get("page", None)
            if page_val is None:
                page_val = block.get("page_no", None)
            if page_val is None:
                page_val = block.get("pageIndex", None)
            try:
                page_int = int(page_val) if page_val is not None else 1
            except Exception:
                # Unparseable page value: fall back to page 1 (best effort).
                page_int = 1
            # Normalize to 1-based
            page_int = page_int + 1 if page_int == 0 else page_int

            # Determine heading level from the digit embedded in the role name
            # (e.g. 'h2' / 'heading_2'); a bare 'title' counts as level 1.
            level = 1
            if "1" in role_raw or role_raw == "title":
                level = 1
            elif "2" in role_raw:
                level = 2
            elif "3" in role_raw:
                level = 3
            elif "4" in role_raw:
                level = 4
            elif "5" in role_raw:
                level = 5
            elif "6" in role_raw:
                level = 6

            heading_items.append((page_int, level, order_index, text))
            order_index += 1

        if not heading_items:
            return sections

        # 2) Sort by page then order_index to preserve within-page order
        heading_items.sort(key=lambda h: (h[0], h[2]))

        # 3) Build sections with hierarchical end-page computation
        stack: List[DocumentSection] = []
        idx_counter = 0

        def close_until(level_threshold: int, next_page: int):
            # Pop (and finalize) every open section at or below the incoming
            # heading's level; its end page derives from the next heading.
            nonlocal sections, stack
            while stack and stack[-1].level >= level_threshold:
                cur = stack.pop()
                # If next heading on same page, close at same page; else previous page
                if next_page <= cur.start_page:
                    cur.end_page = cur.start_page
                else:
                    cur.end_page = next_page - 1
                if cur.end_page < cur.start_page:
                    cur.end_page = cur.start_page
                sections.append(cur)

        for page_int, level, _, text in heading_items:
            # Close siblings and deeper levels
            close_until(level, page_int)
            # Open new heading
            section_id = f"docling_{idx_counter:03d}"
            idx_counter += 1
            new_sec = DocumentSection(
                id=section_id,
                title=text,
                level=level,
                start_page=page_int,
                end_page=page_int,  # temporary; will finalize when closing
                confidence=0.8
            )
            stack.append(new_sec)

        # Close any remaining open sections at document end later in normalization.
        # NOTE(review): sections left here keep end_page == start_page; only the
        # last top-level one is later extended by _normalize_and_cover — confirm
        # that is the intended behavior for trailing nested headings.
        while stack:
            cur = stack.pop()
            sections.append(cur)

        return sections
|
|
|
|
def _normalize_and_cover(self, sections: List[DocumentSection], page_count: int) -> List[DocumentSection]:
|
|
"""Harden outline sections while preserving hierarchy:
|
|
- clamp each section to [1, page_count]
|
|
- fix inverted ranges (but DO NOT remove hierarchical overlaps)
|
|
- ensure coverage from page 1 with synthetic front-matter if needed
|
|
- ensure last top-level section extends to page_count
|
|
- compute numbering hint if missing
|
|
"""
|
|
if not sections:
|
|
return sections
|
|
|
|
# Clamp values per section; do not modify overlap relationships
|
|
for s in sections:
|
|
s.start_page = max(1, min(s.start_page or 1, page_count))
|
|
s.end_page = max(1, min(s.end_page or s.start_page, page_count))
|
|
if s.end_page < s.start_page:
|
|
s.end_page = s.start_page
|
|
|
|
# Maintain original order (as produced by extractor and hierarchy builder)
|
|
|
|
# Insert synthetic front matter if needed
|
|
if sections and sections[0].start_page > 1:
|
|
# Generate a unique synthetic id that won't collide with existing ids
|
|
existing_ids = {s.id for s in sections}
|
|
base_id = "outline_front_matter"
|
|
syn_id = base_id
|
|
idx = 1
|
|
while syn_id in existing_ids:
|
|
syn_id = f"{base_id}_{idx}"
|
|
idx += 1
|
|
front = DocumentSection(
|
|
id=syn_id,
|
|
title="Front matter",
|
|
level=1,
|
|
start_page=1,
|
|
end_page=sections[0].start_page - 1,
|
|
confidence=0.6
|
|
)
|
|
sections.insert(0, front)
|
|
|
|
# Ensure last top-level section covers to page_count
|
|
top_levels = [s for s in sections if s.parent_id is None]
|
|
if top_levels:
|
|
last_top = top_levels[-1]
|
|
if last_top.end_page < page_count:
|
|
last_top.end_page = page_count
|
|
|
|
# Light numbering extraction based on heading text prefix
|
|
for s in sections:
|
|
m = re.match(r"^\s*([A-Za-z]+|[IVXLCM]+|\d+(?:\.\d+)*)\.?\s+", s.title)
|
|
if m:
|
|
s.numbering = s.numbering or m.group(1)
|
|
|
|
return sections
|
|
|
|
    def _extract_from_text_patterns(self, doc: fitz.Document) -> List[DocumentSection]:
        # Disabled in fast mode: full-text pattern scanning is intentionally
        # skipped so the analysis stays header-only (see analyze_document_structure).
        return []
|
|
|
|
def _estimate_section_level(self, numbering: str, system_type: str) -> int:
|
|
"""Estimate section level based on numbering pattern"""
|
|
if system_type == 'roman_numerals':
|
|
return 1 # Typically chapter level
|
|
elif system_type == 'decimal_numbering':
|
|
dots = numbering.count('.')
|
|
return min(dots + 1, 6) # 1.1.1 = level 3
|
|
elif system_type == 'letter_chapters':
|
|
return 1
|
|
elif system_type == 'letter_sections':
|
|
return 2
|
|
else:
|
|
return 2 # Default
|
|
|
|
def _detect_numbering_systems(self, sections: List[DocumentSection]) -> List[NumberingSystem]:
|
|
"""Detect numbering systems used in the document"""
|
|
systems = []
|
|
|
|
# Group sections by their numbering patterns
|
|
pattern_groups = {}
|
|
for section in sections:
|
|
if section.numbering:
|
|
for pattern, system_type, description in self.numbering_patterns:
|
|
if re.match(pattern.replace(r'^(.+)', section.numbering), section.numbering):
|
|
if system_type not in pattern_groups:
|
|
pattern_groups[system_type] = {
|
|
'pattern': pattern,
|
|
'description': description,
|
|
'examples': [],
|
|
'levels': set(),
|
|
'count': 0
|
|
}
|
|
pattern_groups[system_type]['examples'].append(section.numbering)
|
|
pattern_groups[system_type]['levels'].add(section.level)
|
|
pattern_groups[system_type]['count'] += 1
|
|
break
|
|
|
|
# Create NumberingSystem objects
|
|
for system_type, data in pattern_groups.items():
|
|
if data['count'] >= 2: # At least 2 examples to be confident
|
|
system = NumberingSystem(
|
|
system_id=system_type,
|
|
pattern=data['pattern'],
|
|
description=data['description'],
|
|
examples=data['examples'][:5], # First 5 examples
|
|
applies_to_levels=list(data['levels']),
|
|
confidence=min(0.9, 0.5 + (data['count'] * 0.1))
|
|
)
|
|
systems.append(system)
|
|
|
|
return systems
|
|
|
|
    def _analyze_content_types(self, doc: fitz.Document, sections: List[DocumentSection]) -> Dict[str, Any]:
        # Disabled in fast mode: return an empty-but-well-formed result so
        # downstream consumers (e.g. _generate_processing_recommendations)
        # can still index "sections" safely.
        return {"total_blocks": 0, "content_types": {}, "sections": {}}

    def _detect_tables_in_page(self, page) -> int:
        # Disabled in fast mode; always reports zero tables.
        return 0

    def _has_complex_formatting(self, page) -> bool:
        # Disabled in fast mode; formatting is always treated as simple.
        return False
|
|
|
|
def _merge_and_deduplicate_sections(self, sections: List[DocumentSection]) -> List[DocumentSection]:
|
|
"""Merge overlapping sections and remove duplicates"""
|
|
if not sections:
|
|
return []
|
|
|
|
# Sort by start page, then by level
|
|
sections.sort(key=lambda s: (s.start_page, s.level))
|
|
|
|
merged = []
|
|
for section in sections:
|
|
# Check if this section overlaps significantly with existing ones
|
|
is_duplicate = False
|
|
for existing in merged:
|
|
if (existing.start_page == section.start_page and
|
|
abs(existing.level - section.level) <= 1 and
|
|
self._text_similarity(existing.title, section.title) > 0.8):
|
|
# This is likely a duplicate, merge information
|
|
if section.confidence > existing.confidence:
|
|
existing.title = section.title
|
|
existing.numbering = section.numbering or existing.numbering
|
|
existing.confidence = section.confidence
|
|
is_duplicate = True
|
|
break
|
|
|
|
if not is_duplicate:
|
|
merged.append(section)
|
|
|
|
return merged
|
|
|
|
def _text_similarity(self, text1: str, text2: str) -> float:
|
|
"""Calculate text similarity (simple implementation)"""
|
|
if not text1 or not text2:
|
|
return 0.0
|
|
|
|
# Simple word-based similarity
|
|
words1 = set(text1.lower().split())
|
|
words2 = set(text2.lower().split())
|
|
|
|
if not words1 and not words2:
|
|
return 1.0
|
|
|
|
intersection = words1.intersection(words2)
|
|
union = words1.union(words2)
|
|
|
|
return len(intersection) / len(union) if union else 0.0
|
|
|
|
    def _build_section_hierarchy(self, sections: List[DocumentSection]) -> List[DocumentSection]:
        """Build parent-child relationships between sections"""
        if not sections:
            return []

        # Sort by start page and level (mutates the caller's list in place).
        sections.sort(key=lambda s: (s.start_page, s.level))

        # Build hierarchy
        for i, section in enumerate(sections):
            # Find parent: the nearest preceding section with a strictly lower
            # level whose page range still covers this section's start page.
            for j in range(i - 1, -1, -1):
                potential_parent = sections[j]
                if (potential_parent.level < section.level and
                    potential_parent.start_page <= section.start_page and
                    potential_parent.end_page >= section.start_page):
                    section.parent_id = potential_parent.id
                    potential_parent.children.append(section.id)
                    break

            # Trim this section's end page to just before the next same- or
            # higher-level section that starts on a later page.
            # NOTE(review): this can produce end_page < start_page when the
            # next section starts on the very next page boundary;
            # _normalize_and_cover clamps such ranges afterwards.
            for j in range(i + 1, len(sections)):
                next_section = sections[j]
                if (next_section.level <= section.level and
                    next_section.start_page > section.start_page):
                    section.end_page = min(section.end_page, next_section.start_page - 1)
                    break

        return sections
|
|
|
|
def _generate_processing_recommendations(self, sections: List[DocumentSection],
|
|
content_analysis: Dict) -> Dict[str, Any]:
|
|
"""Generate intelligent processing recommendations"""
|
|
recommendations = {
|
|
"document_length_category": "short", # short, medium, long
|
|
"suggested_processing_approach": "full_document", # full_document, section_by_section
|
|
"high_priority_sections": [],
|
|
"ocr_recommended_sections": [],
|
|
"image_analysis_sections": [],
|
|
"table_extraction_sections": [],
|
|
"estimated_total_time": 0
|
|
}
|
|
|
|
total_pages = max(s.end_page for s in sections) if sections else 0
|
|
|
|
# Categorize document length
|
|
if total_pages <= 10:
|
|
recommendations["document_length_category"] = "short"
|
|
recommendations["suggested_processing_approach"] = "full_document"
|
|
elif total_pages <= 50:
|
|
recommendations["document_length_category"] = "medium"
|
|
recommendations["suggested_processing_approach"] = "section_by_section"
|
|
else:
|
|
recommendations["document_length_category"] = "long"
|
|
recommendations["suggested_processing_approach"] = "section_by_section"
|
|
|
|
# Analyze each section for recommendations
|
|
total_estimated_time = 0
|
|
for section in sections:
|
|
section_data = content_analysis["sections"].get(section.id, {})
|
|
|
|
# High priority sections (first few sections, or sections with important titles)
|
|
if (section.start_page <= 5 or
|
|
any(keyword in section.title.lower() for keyword in
|
|
['abstract', 'summary', 'introduction', 'conclusion', 'executive'])):
|
|
recommendations["high_priority_sections"].append(section.id)
|
|
|
|
# OCR recommendations
|
|
if section_data.get("images", 0) > 0 or section_data.get("has_complex_formatting", False):
|
|
recommendations["ocr_recommended_sections"].append(section.id)
|
|
|
|
# Image analysis recommendations
|
|
if section_data.get("images", 0) > 2: # Sections with multiple images
|
|
recommendations["image_analysis_sections"].append(section.id)
|
|
|
|
# Table extraction recommendations
|
|
if section_data.get("tables", 0) > 0:
|
|
recommendations["table_extraction_sections"].append(section.id)
|
|
|
|
total_estimated_time += section_data.get("estimated_processing_time", 0)
|
|
|
|
recommendations["estimated_total_time"] = total_estimated_time
|
|
|
|
return recommendations
|
|
|
|
def create_document_outline_hierarchy_artefact(file_id: str, pdf_bytes: bytes,
                                               tika_json: Dict = None,
                                               docling_json: Dict = None) -> Dict[str, Any]:
    """
    Create a comprehensive document outline hierarchy artefact.

    Args:
        file_id: File ID
        pdf_bytes: PDF file content
        tika_json: Optional Tika analysis results
        docling_json: Optional Docling analysis results

    Returns:
        Document outline hierarchy artefact
    """
    analysis = DocumentAnalyzer().analyze_document_structure(pdf_bytes, tika_json, docling_json)

    # Stamp file/artefact identity onto the analysis payload.
    analysis.update({
        "file_id": file_id,
        "artefact_id": str(uuid.uuid4()),
        "artefact_type": "document_outline_hierarchy",
    })

    return analysis