diff --git a/api/__init__.py b/api/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/api/services/__init__.py b/api/services/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/api/services/docling/__init__.py b/api/services/docling/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/api/services/docling/regions.py b/api/services/docling/regions.py new file mode 100644 index 0000000..090b0a5 --- /dev/null +++ b/api/services/docling/regions.py @@ -0,0 +1,425 @@ +"""OpenCV response-region detector for exam template auto-map. + +This module is intentionally a best-effort spike. It detects visual writing +areas (ruled answer lines and rectangular answer boxes) from rendered exam PDF +pages and returns mapper-friendly candidate dictionaries. The caller may ignore +this output entirely; manual drawing remains the fallback. + +Candidate schema (``detect_response_regions_from_pdf`` return item):: + + { + "kind": "response", + "source": "ai", + "confirmed": False, + "confidence": 0.0..1.0, + "page_index": 0, # zero-based PDF page index + "bbox": { # rendered-page pixel coordinates + "x": 72.0, "y": 210.0, + "w": 420.0, "h": 86.0, + "coord_origin": "TOPLEFT", + "unit": "px", + }, + "region_type": "answer_lines" | "answer_box" | "working_space", + "detection_method": "opencv_horizontal_lines" | "opencv_contour_box", + "line_count": 3, # answer_lines only + "meta": {...}, + } + +The mapper can persist these as ``exam_response_areas`` with +``kind='response'``, ``source='ai'``, ``confirmed=false`` after converting the +rendered-page pixel bbox into the app's canvas coordinate system if needed. +""" + +from __future__ import annotations + +from dataclasses import dataclass +from pathlib import Path +from typing import Any, Iterable + +import fitz # PyMuPDF +import numpy as np +from PIL import Image + +try: # OpenCV is an optional runtime dependency until S5 wires regions in. 
@dataclass(frozen=True)
class RegionCandidate:
    """Internal typed candidate before dict serialization.

    Geometry is expressed in rendered-page pixels with a top-left origin,
    which is what ``to_mapper_dict`` advertises through ``coord_origin`` and
    ``unit``.

    Attributes:
        page_index: Zero-based page the candidate was found on.
        x: Left edge of the bounding box.
        y: Top edge of the bounding box.
        w: Bounding-box width.
        h: Bounding-box height.
        region_type: Label chosen by the detector (for example
            ``"answer_lines"``, ``"answer_box"`` or ``"working_space"``).
        confidence: Heuristic score; serialized rounded to three decimals.
        detection_method: Name of the detector that produced the candidate.
        line_count: Number of ruled lines in the region, when the detector
            reports one; omitted from the serialized dict when ``None``.
        meta: Optional detector-specific extras; omitted when empty.
    """

    page_index: int
    x: float
    y: float
    w: float
    h: float
    region_type: str
    confidence: float
    detection_method: str
    line_count: int | None = None
    meta: dict[str, Any] | None = None

    def to_mapper_dict(self) -> dict[str, Any]:
        """Serialize the candidate into the mapper-facing dictionary.

        Returns:
            A new dict with the fixed ``kind``/``source``/``confirmed``
            markers, the rounded confidence and bbox, and the optional
            ``line_count`` / ``meta`` keys when they carry a value.
        """
        candidate: dict[str, Any] = {
            "kind": "response",
            "source": "ai",
            "confirmed": False,
            "confidence": round(float(self.confidence), 3),
            "page_index": int(self.page_index),
            "bbox": {
                "x": round(float(self.x), 2),
                "y": round(float(self.y), 2),
                "w": round(float(self.w), 2),
                "h": round(float(self.h), 2),
                "coord_origin": "TOPLEFT",
                "unit": "px",
            },
            "region_type": self.region_type,
            "detection_method": self.detection_method,
        }
        if self.line_count is not None:
            candidate["line_count"] = int(self.line_count)
        if self.meta:
            # Hand out a shallow copy: the dataclass is frozen, so a caller
            # adding or removing keys on the serialized dict must not mutate
            # this candidate. Nested values are still shared.
            candidate["meta"] = dict(self.meta)
        return candidate


@dataclass(frozen=True)
class _LineSegment:
    """Integer bounding box (x, y, w, h) of one detected horizontal stroke."""

    x: int
    y: int
    w: int
    h: int

    @property
    def right(self) -> int:
        """Return the x coordinate just past the segment's right edge."""
        return self.x + self.w

    @property
    def center_y(self) -> float:
        """Return the vertical midpoint of the segment."""
        return self.y + self.h / 2


def detect_response_regions_from_pdf(
    pdf_path: str | Path,
    *,
    dpi: int = 144,
    max_pages: int | None = None,
    page_indices: Iterable[int] | None = None,
    min_confidence: float = 0.35,
) -> list[dict[str, Any]]:
    """Render a PDF and emit response-area candidate dictionaries.

    Args:
        pdf_path: Local PDF path.
        dpi: Render resolution. 144 dpi gives 2 px per PDF point and is a good
            speed/geometry compromise for the API fast path.
        max_pages: Optional first-N-pages cap for smoke tests/spikes. A value
            of zero or less selects no pages.
        page_indices: Optional explicit zero-based page indices. When supplied,
            ``max_pages`` is ignored. Indices outside the document are skipped
            silently rather than raising.
        min_confidence: Drop candidates below this confidence.

    Returns:
        List of mapper-friendly dictionaries (``RegionCandidate.to_mapper_dict``
        output), in the order the pages were processed.

    Raises:
        RuntimeError: If OpenCV could not be imported.
        ValueError: If ``dpi`` is not positive or ``min_confidence`` is outside
            ``[0, 1]``.
        FileNotFoundError: If ``pdf_path`` does not exist.
    """

    if cv2 is None:
        raise RuntimeError(
            "OpenCV is required for answer-region detection; install "
            "opencv-python-headless."
        ) from _CV2_IMPORT_ERROR

    if dpi <= 0:
        raise ValueError("dpi must be positive")
    if not 0 <= min_confidence <= 1:
        raise ValueError("min_confidence must be between 0 and 1")

    path = Path(pdf_path)
    if not path.exists():
        raise FileNotFoundError(path)

    # PDF user space is 72 units per inch, so this scales points to pixels.
    zoom = dpi / 72.0
    matrix = fitz.Matrix(zoom, zoom)

    candidates: list[dict[str, Any]] = []
    # The context manager closes the document even if rendering or detection
    # raises part-way through.
    with fitz.open(path) as doc:
        page_count = len(doc)
        if page_indices is not None:
            pages: Iterable[int] = list(page_indices)
        elif max_pages is None:
            pages = range(page_count)
        else:
            pages = range(min(page_count, max_pages))

        for page_index in pages:
            if not 0 <= page_index < page_count:
                continue
            pix = doc[page_index].get_pixmap(matrix=matrix, alpha=False)
            image = Image.frombytes("RGB", (pix.width, pix.height), pix.samples)
            page_candidates = detect_response_regions_from_image(
                image,
                page_index=page_index,
                min_confidence=min_confidence,
            )
            candidates.extend(candidate.to_mapper_dict() for candidate in page_candidates)
    return candidates
def detect_response_regions_from_image(
    image: Image.Image | np.ndarray,
    *,
    page_index: int = 0,
    min_confidence: float = 0.35,
) -> list[RegionCandidate]:
    """Detect response-area candidates on one rendered page image.

    Args:
        image: Page image as a PIL image or an array (grayscale, RGB or RGBA).
        page_index: Zero-based page index stamped onto every candidate.
        min_confidence: Drop candidates below this confidence.

    Returns:
        De-duplicated candidates from the line and box detectors whose
        confidence is at least ``min_confidence``.

    Raises:
        RuntimeError: If OpenCV could not be imported.
        ValueError: If ``min_confidence`` is outside ``[0, 1]``.
    """

    if cv2 is None:
        raise RuntimeError(
            "OpenCV is required for answer-region detection; install "
            "opencv-python-headless."
        ) from _CV2_IMPORT_ERROR
    if not 0 <= min_confidence <= 1:
        raise ValueError("min_confidence must be between 0 and 1")

    page = _as_rgb_array(image)
    gray = cv2.cvtColor(page, cv2.COLOR_RGB2GRAY)
    binary = _ink_mask(gray)

    height, width = gray.shape[:2]
    line_candidates = _detect_answer_lines(binary, page_index=page_index, width=width, height=height)
    box_candidates = _detect_answer_boxes(binary, page_index=page_index, width=width, height=height)
    candidates = _dedupe_candidates(line_candidates + box_candidates)
    return [c for c in candidates if c.confidence >= min_confidence]


def _as_rgb_array(image: Image.Image | np.ndarray) -> np.ndarray:
    """Coerce a PIL image or array into an ``(H, W, 3)`` array.

    Two-dimensional and ``(H, W, 1)`` inputs are replicated across three
    channels, a fourth channel is dropped, and three-channel arrays are
    returned unchanged.

    Raises:
        ValueError: If an array input is not 2-D or 3-D with 1, 3 or 4
            channels. Such shapes would otherwise only fail later, inside the
            colour conversion.
    """

    if isinstance(image, np.ndarray):
        array = image
    elif isinstance(image, Image.Image):
        return np.asarray(image.convert("RGB"))
    else:
        array = np.asarray(image)

    if array.ndim == 2:
        return np.stack([array, array, array], axis=-1)
    if array.ndim != 3 or array.shape[-1] not in (1, 3, 4):
        raise ValueError(
            f"expected an (H, W), (H, W, 1), (H, W, 3) or (H, W, 4) image, got shape {array.shape}"
        )
    if array.shape[-1] == 1:
        return np.repeat(array, 3, axis=-1)
    if array.shape[-1] == 4:
        return array[:, :, :3]
    return array


def _ink_mask(gray: np.ndarray) -> np.ndarray:
    """Return a binary mask where printed dark ink is 255.

    The image is lightly blurred and then thresholded against the local mean
    (31 px neighbourhood, offset 12) with the result inverted.
    """

    blurred = cv2.GaussianBlur(gray, (3, 3), 0)
    return cv2.adaptiveThreshold(
        blurred,
        255,
        cv2.ADAPTIVE_THRESH_MEAN_C,
        cv2.THRESH_BINARY_INV,
        31,
        12,
    )


def _detect_answer_lines(binary: np.ndarray, *, page_index: int, width: int, height: int) -> list[RegionCandidate]:
    """Find ruled answer lines and turn each vertical run into one candidate.

    Args:
        binary: Ink mask (ink = 255) of the page.
        page_index: Zero-based page index stamped onto every candidate.
        width: Page width in pixels.
        height: Page height in pixels.

    Returns:
        One candidate per group of stacked lines: ``"answer_lines"`` when the
        group holds more than one line, ``"working_space"`` for a lone line.
    """

    # Long horizontal strokes are answer lines. A wide kernel removes text while
    # retaining ruled lines; min length scales with the page so it works across
    # A4/letter and DPI values.
    min_line_width = max(80, int(width * 0.22))
    kernel_width = max(30, int(width * 0.08))
    kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (kernel_width, 1))
    horizontal = cv2.morphologyEx(binary, cv2.MORPH_OPEN, kernel, iterations=1)
    contours, _ = cv2.findContours(horizontal, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

    max_stroke_height = max(10, int(height * 0.012))
    segments: list[_LineSegment] = []
    for contour in contours:
        x, y, w, h = cv2.boundingRect(contour)
        if w < min_line_width:
            continue
        if h > max_stroke_height:
            continue
        # Ignore page borders / header separator lines.
        if y < height * 0.05 or y > height * 0.96:
            continue
        segments.append(_LineSegment(x=x, y=y, w=w, h=max(h, 1)))

    if not segments:
        return []

    # Grouping walks the segments top to bottom, so order them first.
    segments.sort(key=lambda seg: (seg.center_y, seg.x))
    grouped = _group_line_segments(segments, width=width, height=height)

    candidates: list[RegionCandidate] = []
    for group in grouped:
        if not group:
            continue
        x0 = min(seg.x for seg in group)
        x1 = max(seg.right for seg in group)
        y0 = min(seg.y for seg in group)
        y1 = max(seg.y + seg.h for seg in group)
        line_count = len(group)

        # Expand vertical bbox so it covers the student-writing band, not just
        # the 1px strokes. Single underline answers get a modest band above the
        # line; multi-line answers cover the lines plus inter-line whitespace.
        if line_count == 1:
            pad_top = max(18, int(height * 0.018))
            pad_bottom = max(8, int(height * 0.008))
        else:
            # Two or more lines always yield at least one gap.
            gaps = [below.center_y - above.center_y for above, below in zip(group, group[1:])]
            median_gap = float(np.median(gaps))
            pad_top = max(10, int(median_gap * 0.45))
            pad_bottom = max(8, int(median_gap * 0.35))

        box_x = max(0, x0 - 4)
        box_y = max(0, y0 - pad_top)
        box_w = min(width, x1 + 4) - box_x
        box_h = min(height, y1 + pad_bottom) - box_y
        if box_w <= 0 or box_h <= 0:
            continue

        # Wider regions and regions with more lines score higher, capped at 0.92.
        span_ratio = box_w / max(width, 1)
        count_bonus = min(0.2, max(0, line_count - 1) * 0.05)
        confidence = min(0.92, 0.42 + span_ratio * 0.35 + count_bonus)
        region_type = "answer_lines" if line_count > 1 else "working_space"
        candidates.append(
            RegionCandidate(
                page_index=page_index,
                x=box_x,
                y=box_y,
                w=box_w,
                h=box_h,
                region_type=region_type,
                confidence=confidence,
                detection_method="opencv_horizontal_lines",
                line_count=line_count,
                meta={"line_segments": [{"x": s.x, "y": s.y, "w": s.w, "h": s.h} for s in group]},
            )
        )
    return candidates


def _group_line_segments(segments: list[_LineSegment], *, width: int, height: int) -> list[list[_LineSegment]]:
    """Split top-to-bottom sorted segments into runs of stacked lines.

    A segment joins the current run when it sits between 2 px and ``max_gap``
    below the previous segment (centre to centre) and is horizontally similar
    to it: either they overlap by at least 35 % of the narrower one, or their
    left edges are within 8 % of the page width. Anything else starts a new
    run. Each segment is compared only with the one immediately before it.

    Args:
        segments: Segments already sorted by vertical centre.
        width: Page width in pixels.
        height: Page height in pixels.

    Returns:
        The runs, in input order; empty when ``segments`` is empty.
    """

    groups: list[list[_LineSegment]] = []
    current: list[_LineSegment] = []
    max_gap = max(28, int(height * 0.045))
    min_x_overlap_ratio = 0.35

    for segment in segments:
        if not current:
            current = [segment]
            continue
        previous = current[-1]
        y_gap = segment.center_y - previous.center_y
        overlap = max(0, min(segment.right, previous.right) - max(segment.x, previous.x))
        narrower = max(1, min(segment.w, previous.w))
        similar_x = overlap / narrower >= min_x_overlap_ratio or abs(segment.x - previous.x) < width * 0.08
        if 2 <= y_gap <= max_gap and similar_x:
            current.append(segment)
        else:
            groups.append(current)
            current = [segment]
    if current:
        groups.append(current)
    return groups
def _detect_answer_boxes(binary: np.ndarray, *, page_index: int, width: int, height: int) -> list[RegionCandidate]:
    """Find outlined rectangles that look like answer or working boxes.

    Args:
        binary: Ink mask (ink = 255) of the page.
        page_index: Zero-based page index stamped onto every candidate.
        width: Page width in pixels.
        height: Page height in pixels.

    Returns:
        One ``"answer_box"`` candidate per external contour that passes the
        size, position, aspect-ratio and fill filters below.
    """

    # Close gaps in ruled rectangles, then contour them. This catches table-like
    # working boxes and explicit answer boxes without trying to understand text.
    closing_kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (5, 5))
    sealed = cv2.morphologyEx(binary, cv2.MORPH_CLOSE, closing_kernel, iterations=1)
    outlines, _ = cv2.findContours(sealed, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

    page_area = width * height
    smallest_area = page_area * 0.003
    largest_area = page_area * 0.55

    found: list[RegionCandidate] = []
    for outline in outlines:
        left, top, box_w, box_h = cv2.boundingRect(outline)
        bbox_area = box_w * box_h

        # Size gates: neither a speck nor most of the page, and wide/tall enough.
        if not (smallest_area <= bbox_area <= largest_area):
            continue
        if box_w < width * 0.16 or box_h < height * 0.025:
            continue
        # Keep clear of the very top and bottom of the page.
        if top < height * 0.04 or top + box_h > height * 0.98:
            continue
        # Only landscape-ish boxes qualify.
        if box_w / max(box_h, 1) < 1.2:
            continue

        # Share of the bounding box covered by the contour itself.
        rectangularity = min(1.0, cv2.contourArea(outline) / max(bbox_area, 1))
        if rectangularity < 0.03:
            continue

        width_term = min(0.24, box_w / width * 0.24)
        height_term = min(0.18, box_h / height * 0.5)
        confidence = min(0.88, 0.46 + width_term + height_term)

        # Grow the box by 2 px per side, clamped to the page.
        out_left = max(0, left - 2)
        out_top = max(0, top - 2)
        out_right = min(width, left + box_w + 2)
        out_bottom = min(height, top + box_h + 2)
        found.append(
            RegionCandidate(
                page_index=page_index,
                x=out_left,
                y=out_top,
                w=out_right - out_left,
                h=out_bottom - out_top,
                region_type="answer_box",
                confidence=confidence,
                detection_method="opencv_contour_box",
                meta={"rectangularity": round(float(rectangularity), 3)},
            )
        )
    return found


def _dedupe_candidates(candidates: list[RegionCandidate]) -> list[RegionCandidate]:
    """Keep the most confident candidate out of each heavily overlapping set.

    Candidates are visited from highest to lowest confidence; one is kept
    only if its IoU with every candidate already kept is below 0.55. The
    survivors come back ordered by page, then top edge, then left edge.
    """

    ranked = sorted(candidates, key=lambda c: c.confidence, reverse=True)
    survivors: list[RegionCandidate] = []
    for contender in ranked:
        clashes = any(_iou(contender, winner) >= 0.55 for winner in survivors)
        if not clashes:
            survivors.append(contender)
    return sorted(survivors, key=lambda c: (c.page_index, c.y, c.x))


def _iou(a: RegionCandidate, b: RegionCandidate) -> float:
    """Return the intersection-over-union of two candidate boxes.

    Boxes on different pages, and pairs whose union has no area, score 0.0.
    """

    if a.page_index != b.page_index:
        return 0.0

    overlap_w = max(0.0, min(a.x + a.w, b.x + b.w) - max(a.x, b.x))
    overlap_h = max(0.0, min(a.y + a.h, b.y + b.h) - max(a.y, b.y))
    intersection = overlap_w * overlap_h
    union = a.w * a.h + b.w * b.h - intersection
    if union > 0:
        return intersection / union
    return 0.0


def main() -> None:
    """Run the detector on one PDF from the command line and print JSON.

    Intended for smoke testing: ``python -m api.services.docling.regions PDF``.
    """

    import argparse
    import json

    cli = argparse.ArgumentParser(description="Detect answer-region candidates in an exam PDF")
    cli.add_argument("pdf", help="PDF path")
    cli.add_argument("--dpi", type=int, default=144)
    cli.add_argument("--max-pages", type=int, default=None)
    cli.add_argument("--min-confidence", type=float, default=0.35)
    options = cli.parse_args()

    regions = detect_response_regions_from_pdf(
        options.pdf,
        dpi=options.dpi,
        max_pages=options.max_pages,
        min_confidence=options.min_confidence,
    )
    print(json.dumps(regions, indent=2))


if __name__ == "__main__":  # pragma: no cover
    main()
def test_detects_grouped_answer_lines() -> None:
    """Three stacked rules on a blank page yield one three-line region."""
    page = Image.new("RGB", (900, 1200), "white")
    pen = ImageDraw.Draw(page)
    for rule_y in (420, 470, 520):
        pen.line((160, rule_y, 760, rule_y), fill="black", width=3)

    detected = detect_response_regions_from_image(page, page_index=2)
    line_regions = [
        region.to_mapper_dict()
        for region in detected
        if region.region_type == "answer_lines"
    ]

    assert line_regions
    best = line_regions[0]

    # Fixed mapper markers.
    assert best["kind"] == "response"
    assert best["source"] == "ai"
    assert best["confirmed"] is False

    # Page and line bookkeeping.
    assert best["page_index"] == 2
    assert best["line_count"] == 3

    # Geometry: spans the 600 px rules and the band around all three.
    bbox = best["bbox"]
    assert bbox["coord_origin"] == "TOPLEFT"
    assert bbox["w"] > 550
    assert bbox["h"] > 80


def test_detects_answer_box() -> None:
    """An outlined rectangle on a blank page is reported as an answer box."""
    page = Image.new("RGB", (900, 1200), "white")
    ImageDraw.Draw(page).rectangle((140, 300, 780, 520), outline="black", width=3)

    detected = detect_response_regions_from_image(page, page_index=0)
    boxes = [
        region.to_mapper_dict()
        for region in detected
        if region.region_type == "answer_box"
    ]

    assert boxes
    first_bbox = boxes[0]["bbox"]
    assert first_bbox["w"] > 600
    assert first_bbox["h"] > 200