# api/routers/database/files/split_map.py
# Snapshot metadata: 2025-11-14 14:47:19 +00:00 — 589 lines, 21 KiB, Python
# api/routers/database/files/split_map.py
"""
Automatic split_map.json generator for uploaded documents.
This module creates chapter/section boundaries for documents using existing artefacts
(Tika JSON, Docling frontmatter OCR) and optional PDF outline extraction.
Strategy (waterfall, stop at confidence ≥ 0.7):
1. PDF Outline/Bookmarks (best): confidence ≈ 0.95
2. Headings from Docling JSON: confidence ≈ 0.8
3. TOC from Tika text: confidence ≈ 0.7-0.8
4. Fixed windows: confidence ≈ 0.2
Hard constraints:
- For any fallback Docling "no-OCR" call: limit page_range to [1, min(30, page_count)]
- Never process more than 30 pages in one Docling request
- Use existing artefacts whenever possible
"""
import re
import json
import uuid
import datetime
import os
import requests
from typing import List, Dict, Any, Optional, Tuple
from modules.database.supabase.utils.client import SupabaseServiceRoleClient
from modules.database.supabase.utils.storage import StorageAdmin
from modules.logger_tool import initialise_logger
logger = initialise_logger(__name__, os.getenv("LOG_LEVEL"), os.getenv("LOG_PATH"), 'default', True)
# ---------- Utilities
def _now_iso():
"""Return current UTC timestamp in ISO format."""
return datetime.datetime.utcnow().replace(microsecond=0).isoformat() + "Z"
def _load_artefact_json(storage: StorageAdmin, bucket: str, rel_path: str) -> Optional[Dict[str, Any]]:
    """Fetch a JSON artefact from storage and parse it.

    Returns the parsed dict, or None when the download or parse fails.
    Failures are logged at debug level only, because callers treat a missing
    artefact as simply "not available".
    """
    try:
        payload = storage.download_file(bucket, rel_path)
        parsed = json.loads(payload.decode("utf-8"))
    except Exception as e:
        logger.debug(f"Failed to load artefact {rel_path}: {e}")
        return None
    return parsed
def _page_count_from_tika(tika_json: Dict[str, Any]) -> Optional[int]:
"""Extract page count from Tika JSON metadata."""
for k in ("xmpTPg:NPages", "Page-Count", "pdf:PageCount", "pdf:pagecount"):
v = tika_json.get(k) or tika_json.get(k.lower())
try:
if v is not None:
return int(v)
except Exception:
pass
return None
# ---------- A) Outline via PyMuPDF (optional but recommended)
def _try_outline(pdf_bytes: bytes) -> Optional[List[Tuple[str, int]]]:
    """
    Extract the PDF outline/bookmarks using PyMuPDF, if installed.

    Returns [(title, start_page)] for level-1 bookmarks only (pages are
    1-based), or None when PyMuPDF is unavailable, extraction fails, or
    fewer than two usable entries exist (a single bookmark is not a useful
    split basis).
    """
    try:
        import fitz  # PyMuPDF — optional dependency, absence is handled below
        doc = fitz.open(stream=pdf_bytes, filetype="pdf")
        try:
            toc = doc.get_toc(simple=True)  # list of [level, title, page]
        finally:
            # Fix: close the document even if get_toc raises, so the
            # underlying handle is never leaked.
            doc.close()
        # Keep level-1 only, ensure valid pages and non-trivial titles
        out = []
        for level, title, page in toc:
            if level == 1 and page >= 1:
                clean_title = title.strip()
                if clean_title and len(clean_title) > 1:
                    out.append((clean_title, page))
        return out if len(out) >= 2 else None  # Need at least 2 chapters
    except ImportError:
        logger.debug("PyMuPDF not available, skipping outline extraction")
        return None
    except Exception as e:
        logger.debug(f"Outline extraction failed: {e}")
        return None
# ---------- B) Headings from Docling JSON
def _try_headings(docling_json: Dict[str, Any]) -> Optional[List[Tuple[str, int, int]]]:
"""
Extract headings from Docling JSON.
Returns [(title, start_page, level)] — we only return starts; end pages are computed later.
"""
if not docling_json:
return None
# Handle different Docling JSON structures
blocks = (docling_json.get("blocks") or
docling_json.get("elements") or
docling_json.get("body", {}).get("blocks") or [])
candidates: List[Tuple[str, int, int]] = []
for b in blocks:
# Check if this is a heading block
role = (b.get("role") or b.get("type") or "").lower()
if not ("heading" in role or role in ("h1", "h2", "title", "section-header")):
continue
# Extract text content
text = (b.get("text") or b.get("content") or "").strip()
if not text or len(text) < 3:
continue
# Extract page number with robust handling of 0-based pageIndex
p = None
if b.get("pageIndex") is not None:
try:
p = int(b.get("pageIndex")) + 1
except Exception:
p = None
if p is None:
for key in ("page", "page_no", "page_number"):
if b.get(key) is not None:
try:
p = int(b.get(key))
except Exception:
p = None
break
if p is None or p < 1:
continue
# Determine heading level
level = 1 # default
if "1" in role or "h1" in role:
level = 1
elif "2" in role or "h2" in role:
level = 2
# Chapter regex boosts to level 1
if re.match(r"^\s*(chapter|ch\.?|section|part)\s+\d+", text, re.I):
level = 1
candidates.append((text, p, level))
if not candidates:
return None
# Prefer level 1; if none, promote level 2 to level 1
l1 = [(t, p, l) for (t, p, l) in candidates if l == 1]
if not l1:
l1 = [(t, p, 1) for (t, p, _) in candidates]
# Sort by page and keep strictly increasing pages only
l1_sorted = []
seen = set()
for (t, p, l) in sorted(l1, key=lambda x: x[1]):
if p not in seen and p >= 1:
l1_sorted.append((t, p, l))
seen.add(p)
return l1_sorted if len(l1_sorted) >= 2 else None
def _try_headings_fallback(file_id: str, cabinet_id: str, bucket: str,
                           processing_bytes: bytes, processing_mime: str,
                           page_count: int) -> Optional[List[Tuple[str, int, int]]]:
    """
    Run a limited no-OCR Docling conversion (first 30 pages at most) and mine
    the response for headings via _try_headings.

    Used only when the stored artefacts lack heading data. Returns the same
    shape as _try_headings, or None on any failure (no Docling URL configured,
    HTTP error, timeout, unparsable response).
    """
    try:
        docling_url = os.getenv('DOCLING_URL') or os.getenv('NEOFS_DOCLING_URL')
        if not docling_url:
            logger.debug("No Docling URL configured for headings fallback")
            return None

        # Hard constraint: never request more than the first 30 pages.
        max_pages = min(30, page_count)
        logger.info(f"Headings fallback: limited Docling call for file_id={file_id}, pages=1-{max_pages}")

        headers = {'Accept': 'application/json'}
        api_key = os.getenv('DOCLING_API_KEY')
        if api_key:
            headers['X-Api-Key'] = api_key

        # No-OCR conversion request limited to [1, max_pages]; the two
        # 'page_range' fields carry the start and end of the range.
        form_data = [
            ('target_type', 'inbody'),
            ('to_formats', 'json'),
            ('do_ocr', 'false'),
            ('force_ocr', 'false'),
            ('image_export_mode', 'embedded'),
            ('pdf_backend', 'dlparse_v4'),
            ('table_mode', 'fast'),
            ('page_range', '1'),
            ('page_range', str(max_pages)),
        ]
        upload = [('files', ('file', processing_bytes, processing_mime))]

        timeout = int(os.getenv('DOCLING_HEADINGS_TIMEOUT', '1800'))  # seconds; 30 minutes default
        response = requests.post(
            f"{docling_url.rstrip('/')}/v1/convert/file",
            files=upload,
            data=form_data,
            headers=headers,
            timeout=timeout,
        )
        response.raise_for_status()
        docling_json = response.json()
        logger.debug(f"Headings fallback: received Docling response for file_id={file_id}")
        return _try_headings(docling_json)
    except Exception as e:
        logger.error(f"Headings fallback failed for file_id={file_id}: {e}")
        return None
# ---------- C) TOC from Tika text (dot leaders & page num)
TOC_LINE = re.compile(r"^\s*(.+?)\s?(\.{2,}|\s{3,})\s*(\d{1,4})\s*$")
def _try_toc_text(tika_text: str) -> Optional[List[Tuple[str, int]]]:
"""
Parse TOC from Tika text using dot leaders and page numbers.
Returns [(title, start_page)] if successful.
"""
if not tika_text:
return None
# Heuristic: only scan first ~1500 lines (roughly first 15 pages)
head = "\n".join(tika_text.splitlines()[:1500])
pairs = []
for line in head.splitlines():
m = TOC_LINE.match(line)
if not m:
continue
title = m.group(1).strip()
try:
page = int(m.group(3))
except Exception:
continue
# Reject obvious junk
if len(title) < 3 or page < 1 or page > 9999:
continue
# Skip common false positives
if any(skip in title.lower() for skip in ['copyright', 'isbn', 'published', 'printed']):
continue
pairs.append((title, page))
# Require at least 5 entries and monotonic pages
if len(pairs) >= 5:
pages = [p for _, p in pairs]
if pages == sorted(pages):
logger.debug(f"TOC extraction found {len(pairs)} entries")
return pairs
return None
# ---------- Build entries with ends, apply smoothing
def _entries_from_starts(starts: List[Tuple[str, int, int]], page_count: int, source: str = "headings") -> List[Dict[str, Any]]:
"""
Build entries from start points with computed end pages.
starts: [(title, page, level)]
"""
entries = []
base_confidence = 0.8 if source == "headings" else 0.75
for i, (title, start, level) in enumerate(starts):
end = (starts[i + 1][1] - 1) if i + 1 < len(starts) else page_count
entries.append({
"id": f"sec{i + 1:02d}",
"title": title,
"level": level,
"start_page": int(start),
"end_page": int(end),
"source": source,
"confidence": base_confidence
})
# Merge tiny sections (< 3 pages) into previous
merged = []
for e in entries:
section_size = e["end_page"] - e["start_page"] + 1
if merged and section_size < 3:
# Merge into previous section
merged[-1]["end_page"] = e["end_page"]
merged[-1]["title"] += " / " + e["title"]
merged[-1]["confidence"] *= 0.95 # Slight confidence penalty for merging
else:
merged.append(e)
return merged
def _entries_from_pairs(pairs: List[Tuple[str, int]], page_count: int, source: str = "outline") -> List[Dict[str, Any]]:
"""
Build entries from (title, start_page) pairs.
"""
entries = []
base_confidence = 0.95 if source == "outline" else (0.8 if source == "toc" else 0.75)
for i, (title, start) in enumerate(pairs):
end = (pairs[i + 1][1] - 1) if i + 1 < len(pairs) else page_count
entries.append({
"id": f"sec{i + 1:02d}",
"title": title,
"level": 1,
"start_page": int(start),
"end_page": int(end),
"source": source,
"confidence": base_confidence
})
# Apply same merging logic for tiny sections
merged = []
for e in entries:
section_size = e["end_page"] - e["start_page"] + 1
if merged and section_size < 3:
merged[-1]["end_page"] = e["end_page"]
merged[-1]["title"] += " / " + e["title"]
merged[-1]["confidence"] *= 0.95
else:
merged.append(e)
return merged
# ---------- Post-processing normalization
def _normalize_entries(entries: List[Dict[str, Any]], page_count: int) -> List[Dict[str, Any]]:
"""Normalize entries to ensure:
- coverage from page 1
- 1 <= start_page <= end_page <= page_count
- strictly increasing, non-overlapping ranges
- fill initial gap with a synthetic front matter section if needed
"""
if not entries:
return entries
# Sanitize and sort by start_page
safe: List[Dict[str, Any]] = []
for e in entries:
try:
s = int(e.get("start_page", 1))
t = int(e.get("end_page", s))
except Exception:
continue
s = max(1, min(s, page_count))
t = max(1, min(t, page_count))
if t < s:
t = s
ne = dict(e)
ne["start_page"], ne["end_page"] = s, t
safe.append(ne)
safe.sort(key=lambda x: (x["start_page"], x.get("level", 1)))
# De-overlap by adjusting starts; ensure monotonic ranges
normalized: List[Dict[str, Any]] = []
for e in safe:
if not normalized:
normalized.append(e)
continue
prev = normalized[-1]
if e["start_page"] <= prev["end_page"]:
e["start_page"] = prev["end_page"] + 1
if e["start_page"] > page_count:
continue
if e["end_page"] < e["start_page"]:
e["end_page"] = e["start_page"]
e["end_page"] = min(e["end_page"], page_count)
normalized.append(e)
# Insert synthetic front matter if first start > 1
if normalized and normalized[0]["start_page"] > 1:
front = {
"id": "sec00",
"title": "Front matter",
"level": 1,
"start_page": 1,
"end_page": normalized[0]["start_page"] - 1,
"source": "synthetic",
"confidence": 0.6,
}
normalized.insert(0, front)
# Ensure last section ends at page_count
if normalized and normalized[-1]["end_page"] < page_count:
normalized[-1]["end_page"] = page_count
# Renumber ids sequentially
out: List[Dict[str, Any]] = []
for idx, e in enumerate(normalized, start=1):
ne = dict(e)
ne["id"] = f"sec{idx:02d}"
out.append(ne)
return out
# ---------- Main entry point
def create_split_map_for_file(file_id: str) -> Dict[str, Any]:
    """
    Create split_map.json for a file using waterfall strategy:
    1. PDF outline (best)
    2. Docling headings (from existing or limited fallback)
    3. Tika TOC parsing
    4. Fixed windows (fallback)

    The winning method's entries are normalized (clamped, de-overlapped,
    front-matter filled, renumbered), then the JSON is uploaded to storage
    and registered as a 'split_map_json' row in document_artefacts.

    Returns the split_map dict that was stored.

    NOTE(review): if the 'files' row is missing, fr.data is None so bucket
    ends up None and the first storage call will fail — confirm callers only
    pass existing file ids.
    """
    logger.info(f"Creating split_map for file_id={file_id}")
    client = SupabaseServiceRoleClient()
    storage = StorageAdmin()
    # 1) Lookup file row & bucket
    fr = client.supabase.table('files').select('id,bucket,cabinet_id,name,path,mime_type').eq('id', file_id).single().execute()
    file_row = fr.data or {}
    bucket = file_row.get('bucket')
    cabinet_id = file_row.get('cabinet_id')
    # 2) Find artefacts — ordered newest-first so find_art returns the most
    # recent artefact of each type
    arts = client.supabase.table('document_artefacts') \
        .select('*').eq('file_id', file_id).order('created_at', desc=True).execute().data or []
    def find_art(t: str):
        # Return the newest artefact row of type `t`, or None if absent.
        for a in arts:
            if a.get('type') == t:
                return a
        return None
    a_pdf = find_art('document_pdf')  # if converted to PDF
    a_tika = find_art('tika_json')
    a_noocr = find_art('docling_noocr_json')
    a_fm = find_art('docling_frontmatter_json')
    # 3) Load JSON/text — each may be None; the loader swallows errors and
    # returns None for missing/corrupt artefacts
    tika_json = _load_artefact_json(storage, bucket, a_tika['rel_path']) if a_tika else None
    docling_noocr = _load_artefact_json(storage, bucket, a_noocr['rel_path']) if a_noocr else None
    docling_fm = _load_artefact_json(storage, bucket, a_fm['rel_path']) if a_fm else None
    # Get page count from Tika metadata when available.
    # NOTE(review): the 100-page default can misplace section ends for longer
    # documents — confirm downstream consumers tolerate this.
    page_count = _page_count_from_tika(tika_json or {}) or 100  # reasonable default
    # Get PDF bytes for outline extraction (prefer the converted-PDF artefact)
    pdf_bytes = None
    processing_bytes = None
    processing_mime = None
    if a_pdf:
        # Use converted PDF
        pdf_bytes = storage.download_file(bucket, a_pdf['rel_path'])
        processing_bytes = pdf_bytes
        processing_mime = 'application/pdf'
    else:
        # Check if original file is PDF
        if file_row.get('mime_type') == 'application/pdf':
            pdf_bytes = storage.download_file(bucket, file_row['path'])
            processing_bytes = pdf_bytes
            processing_mime = 'application/pdf'
    # 4) Try methods in waterfall order; the first method producing entries wins
    method = "fixed"
    confidence = 0.2
    entries: List[Dict[str, Any]] = []
    # A) PDF Outline/Bookmarks (best)
    if pdf_bytes and not entries:
        logger.debug(f"Trying outline extraction for file_id={file_id}")
        pairs = _try_outline(pdf_bytes)
        if pairs:
            entries = _entries_from_pairs(pairs, page_count, source="outline")
            method, confidence = "outline", 0.95
            logger.info(f"Split map: outline method found {len(entries)} sections")
    # B) Headings from existing Docling JSON
    if not entries:
        logger.debug(f"Trying headings from existing Docling JSON for file_id={file_id}")
        # Try no-OCR first, then frontmatter
        for docling_json, source_name in [(docling_noocr, "noocr"), (docling_fm, "frontmatter")]:
            if docling_json:
                starts = _try_headings(docling_json)
                if starts:
                    entries = _entries_from_starts(starts, page_count, source="headings")
                    method, confidence = "headings", 0.8
                    logger.info(f"Split map: headings method ({source_name}) found {len(entries)} sections")
                    break
    # B2) Headings fallback with limited Docling call (if we have processing bytes)
    if not entries and processing_bytes and processing_mime:
        logger.debug(f"Trying headings fallback with limited Docling call for file_id={file_id}")
        starts = _try_headings_fallback(file_id, cabinet_id, bucket, processing_bytes, processing_mime, page_count)
        if starts:
            entries = _entries_from_starts(starts, page_count, source="headings")
            method, confidence = "headings", 0.75  # Slightly lower confidence for fallback
            logger.info(f"Split map: headings fallback found {len(entries)} sections")
    # C) TOC from Tika text
    if not entries and tika_json:
        logger.debug(f"Trying TOC extraction from Tika text for file_id={file_id}")
        # Try common Tika text keys
        text = tika_json.get("X-TIKA:content") or tika_json.get("content") or ""
        pairs = _try_toc_text(text)
        if pairs:
            entries = _entries_from_pairs(pairs, page_count, source="toc")
            method, confidence = "toc", 0.75
            logger.info(f"Split map: TOC method found {len(entries)} sections")
    # D) Fixed windows (fallback) — always produces entries, so the waterfall
    # cannot end empty
    if not entries:
        logger.info(f"Using fixed window fallback for file_id={file_id}")
        step = max(10, min(20, page_count // 10))  # Adaptive step size, clamped to 10-20 pages
        pairs = []
        for i in range(1, page_count + 1, step):
            end_page = min(i + step - 1, page_count)
            title = f"Pages {i}-{end_page}" if i != end_page else f"Page {i}"
            pairs.append((title, i))
        entries = _entries_from_pairs(pairs, page_count, source="fixed")
        method, confidence = "fixed", 0.2
        logger.info(f"Split map: fixed method created {len(entries)} sections")
    # 5) Normalize entries and build split_map.json
    entries = _normalize_entries(entries, page_count)
    split_map = {
        "version": 1,
        "file_id": file_id,
        "source_pdf_artefact_id": a_pdf['id'] if a_pdf else None,
        "sources": {
            "docling_noocr_json": a_noocr['id'] if a_noocr else None,
            "docling_frontmatter_json": a_fm['id'] if a_fm else None,
            "tika_json": a_tika['id'] if a_tika else None
        },
        "method": method,
        "confidence": confidence,
        "page_count": page_count,
        "entries": entries,
        "created_at": _now_iso(),
        "notes": f"auto-generated using {method} method; user can edit in Split Marker UI"
    }
    # 6) Store as artefact (path layout: <cabinet>/<file>/<artefact>/split_map.json)
    artefact_id = str(uuid.uuid4())
    rel_path = f"{cabinet_id}/{file_id}/{artefact_id}/split_map.json"
    storage.upload_file(
        bucket,
        rel_path,
        json.dumps(split_map, ensure_ascii=False, indent=2).encode("utf-8"),
        "application/json",
        upsert=True
    )
    # Enhanced metadata for UI display
    enhanced_extra = {
        "method": method,
        "confidence": confidence,
        "entries_count": len(entries),
        "display_name": "Document Structure Map",
        "bundle_label": "Split Map",
        "section_title": "Document Structure Map",
        "page_count": page_count,
        "bundle_type": "split_map_json",
        "processing_mode": "document_analysis",
        "pipeline": "structure_analysis",
        "is_structure_map": True,
        "ui_category": "document_analysis",
        "ui_order": 2,
        "description": f"Document section boundaries identified using {method} method with {confidence:.1%} confidence ({len(entries)} sections)",
        "viewer_type": "json"
    }
    client.supabase.table('document_artefacts').insert({
        "id": artefact_id,
        "file_id": file_id,
        "type": "split_map_json",
        "rel_path": rel_path,
        "extra": enhanced_extra,
        "status": "completed"
    }).execute()
    logger.info(f"Split map stored: file_id={file_id}, method={method}, confidence={confidence:.2f}, entries={len(entries)}")
    return split_map