# api/routers/database/files/split_map.py
"""
Automatic split_map.json generator for uploaded documents.

This module creates chapter/section boundaries for documents using existing
artefacts (Tika JSON, Docling frontmatter OCR) and optional PDF outline
extraction.

Strategy (waterfall, stop at confidence >= 0.7):
1. PDF Outline/Bookmarks (best): confidence ~ 0.95
2. Headings from Docling JSON: confidence ~ 0.8
3. TOC from Tika text: confidence ~ 0.7-0.8
4. Fixed windows: confidence ~ 0.2

Hard constraints:
- For any fallback Docling "no-OCR" call: limit page_range to [1, min(30, page_count)]
- Never process more than 30 pages in one Docling request
- Use existing artefacts whenever possible
"""

import re
import json
import uuid
import datetime
import os
import requests
from typing import List, Dict, Any, Optional, Tuple

from modules.database.supabase.utils.client import SupabaseServiceRoleClient
from modules.database.supabase.utils.storage import StorageAdmin
from modules.logger_tool import initialise_logger

logger = initialise_logger(__name__, os.getenv("LOG_LEVEL"), os.getenv("LOG_PATH"), 'default', True)


# ---------- Utilities

def _now_iso() -> str:
    """Return the current UTC timestamp as an ISO-8601 string with a 'Z' suffix.

    Uses a timezone-aware ``now`` (``datetime.utcnow`` is deprecated since
    Python 3.12); the tzinfo is stripped before ``isoformat`` so the output
    stays exactly ``YYYY-MM-DDTHH:MM:SSZ`` as before.
    """
    now = datetime.datetime.now(datetime.timezone.utc)
    return now.replace(microsecond=0, tzinfo=None).isoformat() + "Z"


def _load_artefact_json(storage: StorageAdmin, bucket: str, rel_path: str) -> Optional[Dict[str, Any]]:
    """Load and parse a JSON artefact from storage.

    Returns the decoded dict, or None on any failure (missing object,
    bad encoding, invalid JSON) — callers treat a missing artefact as
    "this strategy is unavailable", so failures are logged at debug only.
    """
    try:
        raw = storage.download_file(bucket, rel_path)
        return json.loads(raw.decode("utf-8"))
    except Exception as e:
        logger.debug(f"Failed to load artefact {rel_path}: {e}")
        return None


def _page_count_from_tika(tika_json: Dict[str, Any]) -> Optional[int]:
    """Extract the page count from Tika JSON metadata.

    Tika emits the page count under several possible keys depending on the
    parser used; try each known key (and its lowercase variant) in turn.
    Returns None when no key yields a parseable integer.
    """
    for k in ("xmpTPg:NPages", "Page-Count", "pdf:PageCount", "pdf:pagecount"):
        v = tika_json.get(k) or tika_json.get(k.lower())
        try:
            if v is not None:
                return int(v)
        except Exception:
            # Non-numeric metadata value; keep trying the remaining keys.
            pass
    return None


# ---------- A) Outline via PyMuPDF (optional but recommended)

def _try_outline(pdf_bytes: bytes) -> Optional[List[Tuple[str, int]]]:
    """
    Extract PDF outline/bookmarks using PyMuPDF.
    Returns [(title, start_page)] for level-1 bookmarks only, or None when
    PyMuPDF is unavailable, extraction fails, or fewer than 2 usable
    chapters are found (a single bookmark is not a useful split map).
    """
    try:
        import fitz  # PyMuPDF — optional dependency, imported lazily

        doc = fitz.open(stream=pdf_bytes, filetype="pdf")
        toc = doc.get_toc(simple=True)  # list of [level, title, page]
        doc.close()

        # Keep level-1 only, ensure valid pages
        out = []
        for level, title, page in toc:
            if level == 1 and page >= 1:
                clean_title = title.strip()
                if clean_title and len(clean_title) > 1:
                    out.append((clean_title, page))
        return out if len(out) >= 2 else None  # Need at least 2 chapters
    except ImportError:
        logger.debug("PyMuPDF not available, skipping outline extraction")
        return None
    except Exception as e:
        logger.debug(f"Outline extraction failed: {e}")
        return None


# ---------- B) Headings from Docling JSON

def _try_headings(docling_json: Dict[str, Any]) -> Optional[List[Tuple[str, int, int]]]:
    """
    Extract headings from Docling JSON.
    Returns [(title, start_page, level)] — we only return starts;
    end pages are computed later.
    """
    if not docling_json:
        return None

    # Handle different Docling JSON structures
    blocks = (docling_json.get("blocks")
              or docling_json.get("elements")
              or docling_json.get("body", {}).get("blocks")
              or [])

    candidates: List[Tuple[str, int, int]] = []
    for b in blocks:
        # Check if this is a heading block
        role = (b.get("role") or b.get("type") or "").lower()
        if not ("heading" in role or role in ("h1", "h2", "title", "section-header")):
            continue

        # Extract text content
        text = (b.get("text") or b.get("content") or "").strip()
        if not text or len(text) < 3:
            continue

        # Extract page number with robust handling of 0-based pageIndex
        p = None
        if b.get("pageIndex") is not None:
            try:
                p = int(b.get("pageIndex")) + 1  # pageIndex is 0-based
            except Exception:
                p = None
        if p is None:
            for key in ("page", "page_no", "page_number"):
                if b.get(key) is not None:
                    try:
                        p = int(b.get(key))
                    except Exception:
                        p = None
                    break
        if p is None or p < 1:
            continue

        # Determine heading level
        level = 1  # default
        if "1" in role or "h1" in role:
            level = 1
        elif "2" in role or "h2" in role:
            level = 2
        # Chapter regex boosts to level 1
        if re.match(r"^\s*(chapter|ch\.?|section|part)\s+\d+", text, re.I):
            level = 1

        candidates.append((text, p, level))

    if not candidates:
        return None

    # Prefer level 1; if none, promote level 2 to level 1
    l1 = [(t, p, l) for (t, p, l) in candidates if l == 1]
    if not l1:
        l1 = [(t, p, 1) for (t, p, _) in candidates]

    # Sort by page and keep strictly increasing pages only
    l1_sorted = []
    seen = set()
    for (t, p, l) in sorted(l1, key=lambda x: x[1]):
        if p not in seen and p >= 1:
            l1_sorted.append((t, p, l))
            seen.add(p)

    return l1_sorted if len(l1_sorted) >= 2 else None


def _try_headings_fallback(file_id: str, cabinet_id: str, bucket: str,
                           processing_bytes: bytes, processing_mime: str,
                           page_count: int) -> Optional[List[Tuple[str, int, int]]]:
    """
    Make a limited Docling no-OCR call (max 30 pages) to extract headings.
    This is used only when existing artefacts don't have sufficient heading data.

    Note: ``cabinet_id`` and ``bucket`` are currently unused but kept for
    interface stability with callers.
    """
    try:
        docling_url = os.getenv('DOCLING_URL') or os.getenv('NEOFS_DOCLING_URL')
        if not docling_url:
            logger.debug("No Docling URL configured for headings fallback")
            return None

        # Strictly limit to first 30 pages (hard constraint, see module docstring)
        max_pages = min(30, page_count)
        logger.info(f"Headings fallback: limited Docling call for file_id={file_id}, pages=1-{max_pages}")

        # Build Docling request (no-OCR, limited pages)
        docling_api_key = os.getenv('DOCLING_API_KEY')
        headers = {'Accept': 'application/json'}
        if docling_api_key:
            headers['X-Api-Key'] = docling_api_key

        # page_range is sent as two repeated fields: (start, end)
        form_data = [
            ('target_type', 'inbody'),
            ('to_formats', 'json'),
            ('do_ocr', 'false'),
            ('force_ocr', 'false'),
            ('image_export_mode', 'embedded'),
            ('pdf_backend', 'dlparse_v4'),
            ('table_mode', 'fast'),
            ('page_range', '1'),
            ('page_range', str(max_pages))
        ]
        files = [('files', ('file', processing_bytes, processing_mime))]

        # Make the request with timeout
        timeout = int(os.getenv('DOCLING_HEADINGS_TIMEOUT', '1800'))  # 30 minutes default
        resp = requests.post(
            f"{docling_url.rstrip('/')}/v1/convert/file",
            files=files,
            data=form_data,
            headers=headers,
            timeout=timeout
        )
        resp.raise_for_status()

        docling_json = resp.json()
        logger.debug(f"Headings fallback: received Docling response for file_id={file_id}")
        return _try_headings(docling_json)

    except Exception as e:
        logger.error(f"Headings fallback failed for file_id={file_id}: {e}")
        return None


# ---------- C) TOC from Tika text (dot leaders & page num)

TOC_LINE = re.compile(r"^\s*(.+?)\s?(\.{2,}|\s{3,})\s*(\d{1,4})\s*$")


def _try_toc_text(tika_text: str) -> Optional[List[Tuple[str, int]]]:
    """
    Parse TOC from Tika text using dot leaders and page numbers.
    Returns [(title, start_page)] if successful, i.e. at least 5 entries
    with monotonically non-decreasing page numbers; None otherwise.
    """
    if not tika_text:
        return None

    # Heuristic: only scan first ~1500 lines (roughly first 15 pages)
    head = "\n".join(tika_text.splitlines()[:1500])
    pairs = []
    for line in head.splitlines():
        m = TOC_LINE.match(line)
        if not m:
            continue
        title = m.group(1).strip()
        try:
            page = int(m.group(3))
        except Exception:
            continue
        # Reject obvious junk
        if len(title) < 3 or page < 1 or page > 9999:
            continue
        # Skip common false positives
        if any(skip in title.lower() for skip in ['copyright', 'isbn', 'published', 'printed']):
            continue
        pairs.append((title, page))

    # Require at least 5 entries and monotonic pages
    if len(pairs) >= 5:
        pages = [p for _, p in pairs]
        if pages == sorted(pages):
            logger.debug(f"TOC extraction found {len(pairs)} entries")
            return pairs
    return None


# ---------- Build entries with ends, apply smoothing

def _merge_tiny_sections(entries: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Merge sections shorter than 3 pages into the previous section.

    Shared smoothing step used by both entry builders. The merged section
    keeps the earlier start page, absorbs the tiny section's end page and
    title, and takes a slight confidence penalty per merge.
    """
    merged: List[Dict[str, Any]] = []
    for e in entries:
        section_size = e["end_page"] - e["start_page"] + 1
        if merged and section_size < 3:
            # Merge into previous section
            merged[-1]["end_page"] = e["end_page"]
            merged[-1]["title"] += " / " + e["title"]
            merged[-1]["confidence"] *= 0.95  # Slight confidence penalty for merging
        else:
            merged.append(e)
    return merged


def _entries_from_starts(starts: List[Tuple[str, int, int]], page_count: int,
                         source: str = "headings") -> List[Dict[str, Any]]:
    """
    Build entries from start points with computed end pages.
    starts: [(title, page, level)]

    Each section ends one page before the next section starts; the final
    section runs to ``page_count``. Tiny sections (< 3 pages) are merged
    into their predecessor.
    """
    entries = []
    base_confidence = 0.8 if source == "headings" else 0.75
    for i, (title, start, level) in enumerate(starts):
        end = (starts[i + 1][1] - 1) if i + 1 < len(starts) else page_count
        entries.append({
            "id": f"sec{i + 1:02d}",
            "title": title,
            "level": level,
            "start_page": int(start),
            "end_page": int(end),
            "source": source,
            "confidence": base_confidence
        })
    return _merge_tiny_sections(entries)


def _entries_from_pairs(pairs: List[Tuple[str, int]], page_count: int,
                        source: str = "outline") -> List[Dict[str, Any]]:
    """
    Build entries from (title, start_page) pairs; all entries are level 1.

    End pages are computed the same way as in ``_entries_from_starts`` and
    the same tiny-section merging is applied.
    """
    entries = []
    base_confidence = 0.95 if source == "outline" else (0.8 if source == "toc" else 0.75)
    for i, (title, start) in enumerate(pairs):
        end = (pairs[i + 1][1] - 1) if i + 1 < len(pairs) else page_count
        entries.append({
            "id": f"sec{i + 1:02d}",
            "title": title,
            "level": 1,
            "start_page": int(start),
            "end_page": int(end),
            "source": source,
            "confidence": base_confidence
        })
    return _merge_tiny_sections(entries)


# ---------- Post-processing normalization

def _normalize_entries(entries: List[Dict[str, Any]], page_count: int) -> List[Dict[str, Any]]:
    """Normalize entries to ensure:
    - coverage from page 1
    - 1 <= start_page <= end_page <= page_count
    - strictly increasing, non-overlapping ranges
    - fill initial gap with a synthetic front matter section if needed
    """
    if not entries:
        return entries

    # Sanitize and sort by start_page
    safe: List[Dict[str, Any]] = []
    for e in entries:
        try:
            s = int(e.get("start_page", 1))
            t = int(e.get("end_page", s))
        except Exception:
            # Non-numeric page values: drop the entry entirely
            continue
        s = max(1, min(s, page_count))
        t = max(1, min(t, page_count))
        if t < s:
            t = s
        ne = dict(e)
        ne["start_page"], ne["end_page"] = s, t
        safe.append(ne)
    safe.sort(key=lambda x: (x["start_page"], x.get("level", 1)))

    # De-overlap by adjusting starts; ensure monotonic ranges
    normalized: List[Dict[str, Any]] = []
    for e in safe:
        if not normalized:
            normalized.append(e)
            continue
        prev = normalized[-1]
        if e["start_page"] <= prev["end_page"]:
            # Overlap with the previous section: push this start forward
            e["start_page"] = prev["end_page"] + 1
            if e["start_page"] > page_count:
                continue  # section pushed entirely off the end; drop it
            if e["end_page"] < e["start_page"]:
                e["end_page"] = e["start_page"]
        e["end_page"] = min(e["end_page"], page_count)
        normalized.append(e)

    # Insert synthetic front matter if first start > 1
    if normalized and normalized[0]["start_page"] > 1:
        front = {
            "id": "sec00",
            "title": "Front matter",
            "level": 1,
            "start_page": 1,
            "end_page": normalized[0]["start_page"] - 1,
            "source": "synthetic",
            "confidence": 0.6,
        }
        normalized.insert(0, front)

    # Ensure last section ends at page_count
    if normalized and normalized[-1]["end_page"] < page_count:
        normalized[-1]["end_page"] = page_count

    # Renumber ids sequentially
    out: List[Dict[str, Any]] = []
    for idx, e in enumerate(normalized, start=1):
        ne = dict(e)
        ne["id"] = f"sec{idx:02d}"
        out.append(ne)
    return out


# ---------- Main entry point

def create_split_map_for_file(file_id: str) -> Dict[str, Any]:
    """
    Create split_map.json for a file using waterfall strategy:
    1. PDF outline (best)
    2. Docling headings (from existing or limited fallback)
    3. Tika TOC parsing
    4. Fixed windows (fallback)

    The resulting split map is uploaded to storage and registered as a
    ``split_map_json`` artefact row; the dict is also returned to the caller.
    """
    logger.info(f"Creating split_map for file_id={file_id}")

    client = SupabaseServiceRoleClient()
    storage = StorageAdmin()

    # 1) Lookup file row & bucket
    fr = client.supabase.table('files').select('id,bucket,cabinet_id,name,path,mime_type').eq('id', file_id).single().execute()
    file_row = fr.data or {}
    bucket = file_row.get('bucket')
    cabinet_id = file_row.get('cabinet_id')

    # 2) Find artefacts (newest first, so find_art returns the latest of each type)
    arts = client.supabase.table('document_artefacts') \
        .select('*').eq('file_id', file_id).order('created_at', desc=True).execute().data or []

    def find_art(t):
        # First match wins — arts is ordered newest-first.
        for a in arts:
            if a.get('type') == t:
                return a
        return None

    a_pdf = find_art('document_pdf')  # if converted to PDF
    a_tika = find_art('tika_json')
    a_noocr = find_art('docling_noocr_json')
    a_fm = find_art('docling_frontmatter_json')

    # 3) Load JSON/text
    tika_json = _load_artefact_json(storage, bucket, a_tika['rel_path']) if a_tika else None
    docling_noocr = _load_artefact_json(storage, bucket, a_noocr['rel_path']) if a_noocr else None
    docling_fm = _load_artefact_json(storage, bucket, a_fm['rel_path']) if a_fm else None

    # Get page count; fall back to 100 pages when Tika metadata is unavailable
    page_count = _page_count_from_tika(tika_json or {}) or 100  # reasonable default

    # Get PDF bytes for outline extraction
    pdf_bytes = None
    processing_bytes = None
    processing_mime = None
    if a_pdf:
        # Use converted PDF
        pdf_bytes = storage.download_file(bucket, a_pdf['rel_path'])
        processing_bytes = pdf_bytes
        processing_mime = 'application/pdf'
    else:
        # Check if original file is PDF
        if file_row.get('mime_type') == 'application/pdf':
            pdf_bytes = storage.download_file(bucket, file_row['path'])
            processing_bytes = pdf_bytes
            processing_mime = 'application/pdf'

    # 4) Try methods in waterfall order
    method = "fixed"
    confidence = 0.2
    entries: List[Dict[str, Any]] = []

    # A) PDF Outline/Bookmarks (best)
    if pdf_bytes and not entries:
        logger.debug(f"Trying outline extraction for file_id={file_id}")
        pairs = _try_outline(pdf_bytes)
        if pairs:
            entries = _entries_from_pairs(pairs, page_count, source="outline")
            method, confidence = "outline", 0.95
            logger.info(f"Split map: outline method found {len(entries)} sections")

    # B) Headings from existing Docling JSON
    if not entries:
        logger.debug(f"Trying headings from existing Docling JSON for file_id={file_id}")
        # Try no-OCR first, then frontmatter
        for docling_json, source_name in [(docling_noocr, "noocr"), (docling_fm, "frontmatter")]:
            if docling_json:
                starts = _try_headings(docling_json)
                if starts:
                    entries = _entries_from_starts(starts, page_count, source="headings")
                    method, confidence = "headings", 0.8
                    logger.info(f"Split map: headings method ({source_name}) found {len(entries)} sections")
                    break

    # B2) Headings fallback with limited Docling call (if we have processing bytes)
    if not entries and processing_bytes and processing_mime:
        logger.debug(f"Trying headings fallback with limited Docling call for file_id={file_id}")
        starts = _try_headings_fallback(file_id, cabinet_id, bucket, processing_bytes, processing_mime, page_count)
        if starts:
            entries = _entries_from_starts(starts, page_count, source="headings")
            method, confidence = "headings", 0.75  # Slightly lower confidence for fallback
            logger.info(f"Split map: headings fallback found {len(entries)} sections")

    # C) TOC from Tika text
    if not entries and tika_json:
        logger.debug(f"Trying TOC extraction from Tika text for file_id={file_id}")
        # Try common Tika text keys
        text = tika_json.get("X-TIKA:content") or tika_json.get("content") or ""
        pairs = _try_toc_text(text)
        if pairs:
            entries = _entries_from_pairs(pairs, page_count, source="toc")
            method, confidence = "toc", 0.75
            logger.info(f"Split map: TOC method found {len(entries)} sections")

    # D) Fixed windows (fallback)
    if not entries:
        logger.info(f"Using fixed window fallback for file_id={file_id}")
        step = max(10, min(20, page_count // 10))  # Adaptive step size
        pairs = []
        for i in range(1, page_count + 1, step):
            end_page = min(i + step - 1, page_count)
            title = f"Pages {i}-{end_page}" if i != end_page else f"Page {i}"
            pairs.append((title, i))
        entries = _entries_from_pairs(pairs, page_count, source="fixed")
        method, confidence = "fixed", 0.2
        logger.info(f"Split map: fixed method created {len(entries)} sections")

    # 5) Normalize entries and build split_map.json
    entries = _normalize_entries(entries, page_count)
    split_map = {
        "version": 1,
        "file_id": file_id,
        "source_pdf_artefact_id": a_pdf['id'] if a_pdf else None,
        "sources": {
            "docling_noocr_json": a_noocr['id'] if a_noocr else None,
            "docling_frontmatter_json": a_fm['id'] if a_fm else None,
            "tika_json": a_tika['id'] if a_tika else None
        },
        "method": method,
        "confidence": confidence,
        "page_count": page_count,
        "entries": entries,
        "created_at": _now_iso(),
        "notes": f"auto-generated using {method} method; user can edit in Split Marker UI"
    }

    # 6) Store as artefact
    artefact_id = str(uuid.uuid4())
    rel_path = f"{cabinet_id}/{file_id}/{artefact_id}/split_map.json"
    storage.upload_file(
        bucket,
        rel_path,
        json.dumps(split_map, ensure_ascii=False, indent=2).encode("utf-8"),
        "application/json",
        upsert=True
    )

    # Enhanced metadata for UI display
    enhanced_extra = {
        "method": method,
        "confidence": confidence,
        "entries_count": len(entries),
        "display_name": "Document Structure Map",
        "bundle_label": "Split Map",
        "section_title": "Document Structure Map",
        "page_count": page_count,
        "bundle_type": "split_map_json",
        "processing_mode": "document_analysis",
        "pipeline": "structure_analysis",
        "is_structure_map": True,
        "ui_category": "document_analysis",
        "ui_order": 2,
        "description": f"Document section boundaries identified using {method} method with {confidence:.1%} confidence ({len(entries)} sections)",
        "viewer_type": "json"
    }

    client.supabase.table('document_artefacts').insert({
        "id": artefact_id,
        "file_id": file_id,
        "type": "split_map_json",
        "rel_path": rel_path,
        "extra": enhanced_extra,
        "status": "completed"
    }).execute()

    logger.info(f"Split map stored: file_id={file_id}, method={method}, confidence={confidence:.2f}, entries={len(entries)}")
    return split_map