feat(docling): detect response regions with OpenCV
This commit is contained in:
parent
9cc986a3f1
commit
0b1496fff5
0
api/__init__.py
Normal file
0
api/__init__.py
Normal file
0
api/services/__init__.py
Normal file
0
api/services/__init__.py
Normal file
0
api/services/docling/__init__.py
Normal file
0
api/services/docling/__init__.py
Normal file
425
api/services/docling/regions.py
Normal file
425
api/services/docling/regions.py
Normal file
@ -0,0 +1,425 @@
|
||||
"""OpenCV response-region detector for exam template auto-map.
|
||||
|
||||
This module is intentionally a best-effort spike. It detects visual writing
|
||||
areas (ruled answer lines and rectangular answer boxes) from rendered exam PDF
|
||||
pages and returns mapper-friendly candidate dictionaries. The caller may ignore
|
||||
this output entirely; manual drawing remains the fallback.
|
||||
|
||||
Candidate schema (``detect_response_regions_from_pdf`` return item)::
|
||||
|
||||
{
|
||||
"kind": "response",
|
||||
"source": "ai",
|
||||
"confirmed": False,
|
||||
"confidence": 0.0..1.0,
|
||||
"page_index": 0, # zero-based PDF page index
|
||||
"bbox": { # rendered-page pixel coordinates
|
||||
"x": 72.0, "y": 210.0,
|
||||
"w": 420.0, "h": 86.0,
|
||||
"coord_origin": "TOPLEFT",
|
||||
"unit": "px",
|
||||
},
|
||||
"region_type": "answer_lines" | "answer_box" | "working_space",
|
||||
"detection_method": "opencv_horizontal_lines" | "opencv_contour_box",
|
||||
"line_count": 3, # line-based detections only (answer_lines and single-line working_space)
|
||||
"meta": {...},
|
||||
}
|
||||
|
||||
The mapper can persist these as ``exam_response_areas`` with
|
||||
``kind='response'``, ``source='ai'``, ``confirmed=false`` after converting the
|
||||
rendered-page pixel bbox into the app's canvas coordinate system if needed.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import Any, Iterable
|
||||
|
||||
import fitz # PyMuPDF
|
||||
import numpy as np
|
||||
from PIL import Image
|
||||
|
||||
try: # OpenCV is an optional runtime dependency until S5 wires regions in.
|
||||
import cv2
|
||||
except ImportError as exc: # pragma: no cover - exercised only in underbuilt envs
|
||||
cv2 = None # type: ignore[assignment]
|
||||
_CV2_IMPORT_ERROR = exc
|
||||
else: # pragma: no cover - trivial branch
|
||||
_CV2_IMPORT_ERROR = None
|
||||
|
||||
|
||||
@dataclass(frozen=True)
class RegionCandidate:
    """Internal typed candidate before dict serialization.

    ``x``, ``y``, ``w`` and ``h`` describe the bounding box that
    ``to_mapper_dict`` reports with ``coord_origin="TOPLEFT"`` and
    ``unit="px"``. ``line_count`` and ``meta`` are optional and are left out
    of the serialized dictionary when unset/empty.
    """

    page_index: int
    x: float
    y: float
    w: float
    h: float
    region_type: str
    confidence: float
    detection_method: str
    line_count: int | None = None
    meta: dict[str, Any] | None = None

    def to_mapper_dict(self) -> dict[str, Any]:
        """Serialize the candidate into the mapper-facing dictionary.

        Returns:
            A new dictionary with the fixed ``kind``/``source``/``confirmed``
            markers, the confidence rounded to 3 decimals, the bbox rounded
            to 2 decimals, and ``line_count`` / ``meta`` only when present.
            ``meta`` is deep-copied so that callers mutating the payload
            cannot alter this frozen candidate.
        """
        # Function-scope import: only this method needs it.
        import copy

        candidate: dict[str, Any] = {
            "kind": "response",
            "source": "ai",
            "confirmed": False,
            "confidence": round(float(self.confidence), 3),
            "page_index": int(self.page_index),
            "bbox": {
                "x": round(float(self.x), 2),
                "y": round(float(self.y), 2),
                "w": round(float(self.w), 2),
                "h": round(float(self.h), 2),
                "coord_origin": "TOPLEFT",
                "unit": "px",
            },
            "region_type": self.region_type,
            "detection_method": self.detection_method,
        }
        if self.line_count is not None:
            candidate["line_count"] = int(self.line_count)
        if self.meta:
            # Never hand out the instance's own dict: the dataclass is frozen
            # but the dict (and the lists nested inside it) are still mutable.
            candidate["meta"] = copy.deepcopy(self.meta)
        return candidate
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class _LineSegment:
|
||||
x: int
|
||||
y: int
|
||||
w: int
|
||||
h: int
|
||||
|
||||
@property
|
||||
def right(self) -> int:
|
||||
return self.x + self.w
|
||||
|
||||
@property
|
||||
def center_y(self) -> float:
|
||||
return self.y + self.h / 2
|
||||
|
||||
|
||||
def detect_response_regions_from_pdf(
    pdf_path: str | Path,
    *,
    dpi: int = 144,
    max_pages: int | None = None,
    page_indices: Iterable[int] | None = None,
    min_confidence: float = 0.35,
) -> list[dict[str, Any]]:
    """Render a PDF and emit response-area candidate dictionaries.

    Args:
        pdf_path: Local PDF path.
        dpi: Render resolution. 144 dpi gives 2 px per PDF point and is a good
            speed/geometry compromise for the API fast path.
        max_pages: Optional first-N-pages cap for smoke tests/spikes.
        page_indices: Optional explicit zero-based page indices. When supplied,
            ``max_pages`` is ignored. Indices outside the document are skipped
            silently.
        min_confidence: Drop candidates below this confidence.

    Returns:
        List of mapper-friendly dictionaries documented in the module docstring.
        Bounding boxes are in pixels of the page rendered at ``dpi``.

    Raises:
        RuntimeError: OpenCV could not be imported.
        ValueError: ``dpi`` is not positive, ``min_confidence`` is outside
            ``[0, 1]``, or ``max_pages`` is negative while it is in effect.
        FileNotFoundError: ``pdf_path`` does not exist.
    """
    if cv2 is None:
        raise RuntimeError(
            "OpenCV is required for answer-region detection; install "
            "opencv-python-headless."
        ) from _CV2_IMPORT_ERROR

    if dpi <= 0:
        raise ValueError("dpi must be positive")
    if not 0 <= min_confidence <= 1:
        raise ValueError("min_confidence must be between 0 and 1")
    # A negative cap used to fall through to ``range(negative)`` and return an
    # empty list without any hint that the argument was wrong.
    if page_indices is None and max_pages is not None and max_pages < 0:
        raise ValueError("max_pages must be non-negative")

    path = Path(pdf_path)
    if not path.exists():
        raise FileNotFoundError(path)

    zoom = dpi / 72.0  # PDF user space is 72 units per inch.
    matrix = fitz.Matrix(zoom, zoom)
    candidates: list[dict[str, Any]] = []

    # The document is a context manager; it is closed even if rendering or
    # detection raises part-way through.
    with fitz.open(path) as doc:
        page_count = len(doc)
        if page_indices is None:
            pages: Iterable[int] = range(
                page_count if max_pages is None else min(page_count, max_pages)
            )
        else:
            pages = list(page_indices)

        for page_index in pages:
            if page_index < 0 or page_index >= page_count:
                continue
            pix = doc[page_index].get_pixmap(matrix=matrix, alpha=False)
            image = Image.frombytes("RGB", (pix.width, pix.height), pix.samples)
            page_candidates = detect_response_regions_from_image(
                image,
                page_index=page_index,
                min_confidence=min_confidence,
            )
            candidates.extend(candidate.to_mapper_dict() for candidate in page_candidates)

    return candidates
|
||||
|
||||
|
||||
def detect_response_regions_from_image(
    image: Image.Image | np.ndarray,
    *,
    page_index: int = 0,
    min_confidence: float = 0.35,
) -> list[RegionCandidate]:
    """Detect response-area candidates on one rendered page image.

    Runs the ruled-line detector and the box detector on the page's ink mask,
    de-duplicates their combined output, and keeps only candidates whose
    confidence is at least ``min_confidence``.

    Raises:
        RuntimeError: OpenCV could not be imported.
        ValueError: ``min_confidence`` is outside ``[0, 1]``.
    """
    if cv2 is None:
        raise RuntimeError(
            "OpenCV is required for answer-region detection; install "
            "opencv-python-headless."
        ) from _CV2_IMPORT_ERROR
    if not 0 <= min_confidence <= 1:
        raise ValueError("min_confidence must be between 0 and 1")

    gray = cv2.cvtColor(_as_rgb_array(image), cv2.COLOR_RGB2GRAY)
    binary = _ink_mask(gray)
    height, width = gray.shape[:2]

    # Both detectors take the same page context.
    page_context = {"page_index": page_index, "width": width, "height": height}
    detected = [
        *_detect_answer_lines(binary, **page_context),
        *_detect_answer_boxes(binary, **page_context),
    ]

    confident: list[RegionCandidate] = []
    for candidate in _dedupe_candidates(detected):
        if candidate.confidence >= min_confidence:
            confident.append(candidate)
    return confident
|
||||
|
||||
|
||||
def _as_rgb_array(image: Image.Image | np.ndarray) -> np.ndarray:
|
||||
if isinstance(image, Image.Image):
|
||||
return np.asarray(image.convert("RGB"))
|
||||
array = np.asarray(image)
|
||||
if array.ndim == 2:
|
||||
return np.stack([array, array, array], axis=-1)
|
||||
if array.shape[-1] == 4:
|
||||
return array[:, :, :3]
|
||||
return array
|
||||
|
||||
|
||||
def _ink_mask(gray: np.ndarray) -> np.ndarray:
    """Return a binary mask where printed dark ink is 255.

    The image is smoothed with a 3x3 Gaussian blur and then thresholded
    adaptively (mean method, inverted output) so that dark pixels become 255.
    """
    block_size = 31  # side of the neighbourhood used for the local mean
    mean_offset = 12  # constant passed as ``C`` to the adaptive threshold

    smoothed = cv2.GaussianBlur(gray, (3, 3), 0)
    mask = cv2.adaptiveThreshold(
        smoothed,
        255,
        cv2.ADAPTIVE_THRESH_MEAN_C,
        cv2.THRESH_BINARY_INV,
        block_size,
        mean_offset,
    )
    return mask
|
||||
|
||||
|
||||
def _detect_answer_lines(binary: np.ndarray, *, page_index: int, width: int, height: int) -> list[RegionCandidate]:
    """Turn long horizontal strokes in ``binary`` into writing-area candidates.

    A wide, 1-pixel-high morphological opening removes text while retaining
    ruled lines. Surviving strokes are filtered by length, thickness and
    vertical position, grouped by ``_group_line_segments``, and each group is
    padded vertically so the box covers the writing band rather than just the
    strokes. Groups of two or more lines are typed ``answer_lines``; a lone
    line is typed ``working_space``.
    """
    # All limits scale with the page so the detector works across page sizes
    # and DPI values.
    min_line_width = max(80, int(width * 0.22))
    max_line_height = max(10, int(height * 0.012))
    top_limit = height * 0.05
    bottom_limit = height * 0.96

    opening_kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (max(30, int(width * 0.08)), 1))
    strokes = cv2.morphologyEx(binary, cv2.MORPH_OPEN, opening_kernel, iterations=1)
    contours, _ = cv2.findContours(strokes, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

    segments: list[_LineSegment] = []
    for contour in contours:
        x, y, w, h = cv2.boundingRect(contour)
        too_short = w < min_line_width
        too_thick = h > max_line_height
        # Strokes hugging the top/bottom edge are page borders or header
        # separator lines, not answer lines.
        in_page_margin = y < top_limit or y > bottom_limit
        if too_short or too_thick or in_page_margin:
            continue
        segments.append(_LineSegment(x=x, y=y, w=w, h=max(h, 1)))

    if not segments:
        return []

    segments.sort(key=lambda seg: (seg.center_y, seg.x))

    candidates: list[RegionCandidate] = []
    for group in _group_line_segments(segments, width=width, height=height):
        if not group:
            continue

        line_count = len(group)
        left = min(seg.x for seg in group)
        right = max(seg.right for seg in group)
        top = min(seg.y for seg in group)
        bottom = max(seg.y + seg.h for seg in group)

        if line_count == 1:
            # A single underline gets a modest band, mostly above the line.
            pad_top = max(18, int(height * 0.018))
            pad_bottom = max(8, int(height * 0.008))
        else:
            # Multi-line answers: pad by a fraction of the typical line pitch.
            gaps = [lower.center_y - upper.center_y for upper, lower in zip(group, group[1:])]
            median_gap = float(np.median(gaps)) if gaps else height * 0.025
            pad_top = max(10, int(median_gap * 0.45))
            pad_bottom = max(8, int(median_gap * 0.35))

        # Clamp the padded box to the page.
        box_x = max(0, left - 4)
        box_y = max(0, top - pad_top)
        box_w = min(width, right + 4) - box_x
        box_h = min(height, bottom + pad_bottom) - box_y
        if box_w <= 0 or box_h <= 0:
            continue

        # Wider boxes and more lines raise the confidence, capped at 0.92.
        span_ratio = box_w / max(width, 1)
        count_bonus = min(0.2, max(0, line_count - 1) * 0.05)
        confidence = min(0.92, 0.42 + span_ratio * 0.35 + count_bonus)

        if line_count > 1:
            region_type = "answer_lines"
        else:
            region_type = "working_space"

        segment_meta = [{"x": s.x, "y": s.y, "w": s.w, "h": s.h} for s in group]
        candidates.append(
            RegionCandidate(
                page_index=page_index,
                x=box_x,
                y=box_y,
                w=box_w,
                h=box_h,
                region_type=region_type,
                confidence=confidence,
                detection_method="opencv_horizontal_lines",
                line_count=line_count,
                meta={"line_segments": segment_meta},
            )
        )
    return candidates
|
||||
|
||||
|
||||
def _group_line_segments(segments: list[_LineSegment], *, width: int, height: int) -> list[list[_LineSegment]]:
|
||||
groups: list[list[_LineSegment]] = []
|
||||
current: list[_LineSegment] = []
|
||||
max_gap = max(28, int(height * 0.045))
|
||||
min_x_overlap_ratio = 0.35
|
||||
|
||||
for segment in segments:
|
||||
if not current:
|
||||
current = [segment]
|
||||
continue
|
||||
previous = current[-1]
|
||||
y_gap = segment.center_y - previous.center_y
|
||||
overlap = max(0, min(segment.right, previous.right) - max(segment.x, previous.x))
|
||||
narrower = max(1, min(segment.w, previous.w))
|
||||
similar_x = overlap / narrower >= min_x_overlap_ratio or abs(segment.x - previous.x) < width * 0.08
|
||||
if 2 <= y_gap <= max_gap and similar_x:
|
||||
current.append(segment)
|
||||
else:
|
||||
groups.append(current)
|
||||
current = [segment]
|
||||
if current:
|
||||
groups.append(current)
|
||||
return groups
|
||||
|
||||
|
||||
def _detect_answer_boxes(binary: np.ndarray, *, page_index: int, width: int, height: int) -> list[RegionCandidate]:
    """Find rectangular outlines in ``binary`` and emit ``answer_box`` candidates.

    A 5x5 morphological closing bridges small gaps in ruled rectangles before
    the external contours are extracted. This catches table-like working boxes
    and explicit answer boxes without trying to understand text. A contour's
    bounding box is kept only if it passes the area, minimum-size,
    page-margin, aspect-ratio and rectangularity filters below.
    """
    closing_kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (5, 5))
    closed = cv2.morphologyEx(binary, cv2.MORPH_CLOSE, closing_kernel, iterations=1)
    contours, _ = cv2.findContours(closed, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

    page_area = width * height
    min_area = page_area * 0.003
    max_area = page_area * 0.55

    found: list[RegionCandidate] = []
    for contour in contours:
        x, y, w, h = cv2.boundingRect(contour)
        bbox_area = w * h

        # Size gates: neither specks nor (nearly) the whole page.
        if bbox_area < min_area or bbox_area > max_area:
            continue
        if w < width * 0.16 or h < height * 0.025:
            continue
        # Skip shapes reaching into the top or bottom page margin.
        if y < height * 0.04 or y + h > height * 0.98:
            continue
        # Only boxes clearly wider than tall are accepted.
        if w / max(h, 1) < 1.2:
            continue

        # Share of the bounding box covered by the contour itself.
        # NOTE(review): a 0.03 floor rejects very little — confirm it is intended.
        rectangularity = min(1.0, cv2.contourArea(contour) / max(bbox_area, 1))
        if rectangularity < 0.03:
            continue

        width_bonus = min(0.24, w / width * 0.24)
        height_bonus = min(0.18, h / height * 0.5)
        confidence = min(0.88, 0.46 + width_bonus + height_bonus)

        # Pad by 2 px on every side, clamped to the page.
        left = max(0, x - 2)
        top = max(0, y - 2)
        right = min(width, x + w + 2)
        bottom = min(height, y + h + 2)

        found.append(
            RegionCandidate(
                page_index=page_index,
                x=left,
                y=top,
                w=right - left,
                h=bottom - top,
                region_type="answer_box",
                confidence=confidence,
                detection_method="opencv_contour_box",
                meta={"rectangularity": round(float(rectangularity), 3)},
            )
        )
    return found
|
||||
|
||||
|
||||
def _dedupe_candidates(candidates: list[RegionCandidate]) -> list[RegionCandidate]:
    """Remove lower-confidence candidates that substantially overlap.

    Candidates are visited from most to least confident; one is kept only if
    its IoU with every candidate already kept is below 0.55. The survivors are
    returned ordered by ``(page_index, y, x)``.
    """
    by_confidence = sorted(candidates, key=lambda c: c.confidence, reverse=True)

    survivors: list[RegionCandidate] = []
    for contender in by_confidence:
        distinct = all(_iou(contender, winner) < 0.55 for winner in survivors)
        if distinct:
            survivors.append(contender)

    return sorted(survivors, key=lambda c: (c.page_index, c.y, c.x))
|
||||
|
||||
|
||||
def _iou(a: RegionCandidate, b: RegionCandidate) -> float:
|
||||
if a.page_index != b.page_index:
|
||||
return 0.0
|
||||
ax1, ay1, ax2, ay2 = a.x, a.y, a.x + a.w, a.y + a.h
|
||||
bx1, by1, bx2, by2 = b.x, b.y, b.x + b.w, b.y + b.h
|
||||
ix1, iy1 = max(ax1, bx1), max(ay1, by1)
|
||||
ix2, iy2 = min(ax2, bx2), min(ay2, by2)
|
||||
iw, ih = max(0.0, ix2 - ix1), max(0.0, iy2 - iy1)
|
||||
intersection = iw * ih
|
||||
union = a.w * a.h + b.w * b.h - intersection
|
||||
return intersection / union if union > 0 else 0.0
|
||||
|
||||
|
||||
def main() -> None:
    """Small CLI for smoke testing: python -m api.services.docling.regions PDF.

    Parses the PDF path plus the optional ``--dpi``, ``--max-pages`` and
    ``--min-confidence`` flags, runs the detector, and prints the candidates
    as indented JSON on stdout.
    """
    import argparse
    import json

    cli = argparse.ArgumentParser(description="Detect answer-region candidates in an exam PDF")
    cli.add_argument("pdf", help="PDF path")
    optional_flags = (
        ("--dpi", int, 144),
        ("--max-pages", int, None),
        ("--min-confidence", float, 0.35),
    )
    for flag, value_type, default in optional_flags:
        cli.add_argument(flag, type=value_type, default=default)
    options = cli.parse_args()

    regions = detect_response_regions_from_pdf(
        options.pdf,
        dpi=options.dpi,
        max_pages=options.max_pages,
        min_confidence=options.min_confidence,
    )
    print(json.dumps(regions, indent=2))
|
||||
|
||||
|
||||
if __name__ == "__main__": # pragma: no cover
|
||||
main()
|
||||
0
modules/services/docling/__init__.py
Normal file
0
modules/services/docling/__init__.py
Normal file
13
modules/services/docling/regions.py
Normal file
13
modules/services/docling/regions.py
Normal file
@ -0,0 +1,13 @@
|
||||
"""Compatibility import path for S5 Docling response-region geometry."""
|
||||
|
||||
from api.services.docling.regions import (
|
||||
RegionCandidate,
|
||||
detect_response_regions_from_image,
|
||||
detect_response_regions_from_pdf,
|
||||
)
|
||||
|
||||
__all__ = [
|
||||
"RegionCandidate",
|
||||
"detect_response_regions_from_image",
|
||||
"detect_response_regions_from_pdf",
|
||||
]
|
||||
@ -80,3 +80,5 @@ Pillow
|
||||
psutil
|
||||
PyPDF2
|
||||
PyMuPDF
|
||||
# OpenCV answer-region geometry (S5-4)
|
||||
opencv-python-headless
|
||||
|
||||
39
tests/test_docling_regions.py
Normal file
39
tests/test_docling_regions.py
Normal file
@ -0,0 +1,39 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from PIL import Image, ImageDraw
|
||||
|
||||
from api.services.docling.regions import detect_response_regions_from_image
|
||||
|
||||
|
||||
def test_detects_grouped_answer_lines() -> None:
    """Three stacked ruled lines are reported as one ``answer_lines`` region."""
    page = Image.new("RGB", (900, 1200), "white")
    pen = ImageDraw.Draw(page)
    for line_y in (420, 470, 520):
        pen.line((160, line_y, 760, line_y), fill="black", width=3)

    detected = detect_response_regions_from_image(page, page_index=2)
    line_regions = [
        region.to_mapper_dict()
        for region in detected
        if region.region_type == "answer_lines"
    ]
    assert line_regions

    best = line_regions[0]
    bbox = best["bbox"]

    # Fixed mapper markers and the page index passed in.
    assert best["kind"] == "response"
    assert best["source"] == "ai"
    assert best["confirmed"] is False
    assert best["page_index"] == 2

    # All three rules end up in one group whose box spans them.
    assert best["line_count"] == 3
    assert bbox["coord_origin"] == "TOPLEFT"
    assert bbox["w"] > 550
    assert bbox["h"] > 80
|
||||
|
||||
|
||||
def test_detects_answer_box() -> None:
    """An outlined rectangle is reported as an ``answer_box`` of matching size."""
    page = Image.new("RGB", (900, 1200), "white")
    ImageDraw.Draw(page).rectangle((140, 300, 780, 520), outline="black", width=3)

    detected = detect_response_regions_from_image(page, page_index=0)
    boxes = [
        region.to_mapper_dict()
        for region in detected
        if region.region_type == "answer_box"
    ]
    assert boxes

    first_bbox = boxes[0]["bbox"]
    assert first_bbox["w"] > 600
    assert first_bbox["h"] > 200
|
||||
Loading…
x
Reference in New Issue
Block a user