#!/usr/bin/env python3
"""
ocr_pdf.py — Convert JPGs of documents into OCRed PDFs.
Now supports **batch mode** where each subdirectory becomes one PDF.

Features
--------
- Automatic trimming via document contour detection + perspective warp
- Deskew fallback if no clear document contour is found
- Image cleanup tuned for 1970s typewritten English pages (contrast/binarization)
- Tesseract-based OCR to embed a searchable text layer in the PDF
- Combine multiple images into one output PDF
- Batch mode: process a root folder; each subdirectory becomes its own PDF

USAGE
-----
# Single-PDF mode (glob patterns allowed)
uv run ocr-pdf -o out.pdf scans/*.jpg

# Batch mode: each subdirectory under ROOT_DIR becomes a PDF
uv run ocr-pdf --batch-root ROOT_DIR --out-dir out_pdfs

# Batch mode with filters
uv run ocr-pdf --batch-root ROOT_DIR --out-dir out_pdfs --patterns "*.jpg,*.png" --recursive

DEPENDENCIES
------------
- Tesseract must be installed on your system and in PATH.
  macOS (Homebrew): brew install tesseract
  Ubuntu/Debian:    sudo apt-get install tesseract-ocr
  Windows:          install from https://github.com/UB-Mannheim/tesseract/wiki
"""

import argparse
import glob
import os
import re
import sys
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import List, Optional, Sequence

import cv2
import numpy as np
import pytesseract
from pypdf import PdfReader, PdfWriter


# -----------------------------
# Geometry helpers
# -----------------------------

def _order_quad_points(pts: np.ndarray) -> np.ndarray:
    """Order 4 points as (top-left, top-right, bottom-right, bottom-left)."""
    rect = np.zeros((4, 2), dtype="float32")
    s = pts.sum(axis=1)
    rect[0] = pts[np.argmin(s)]     # top-left has the smallest x+y sum
    rect[2] = pts[np.argmax(s)]     # bottom-right has the largest x+y sum
    diff = np.diff(pts, axis=1)
    rect[1] = pts[np.argmin(diff)]  # top-right has the smallest y-x difference
    rect[3] = pts[np.argmax(diff)]  # bottom-left has the largest y-x difference
    return rect


def four_point_transform(image: np.ndarray, pts: np.ndarray) -> np.ndarray:
    """Perspective-warp the quad defined by `pts` into an upright rectangle."""
    rect = _order_quad_points(pts.astype("float32"))
    (tl, tr, br, bl) = rect

    # Compute the width of the new image
    widthA = np.linalg.norm(br - bl)
    widthB = np.linalg.norm(tr - tl)
    maxWidth = int(max(widthA, widthB))

    # Compute the height of the new image
    heightA = np.linalg.norm(tr - br)
    heightB = np.linalg.norm(tl - bl)
    maxHeight = int(max(heightA, heightB))

    dst = np.array([
        [0, 0],
        [maxWidth - 1, 0],
        [maxWidth - 1, maxHeight - 1],
        [0, maxHeight - 1]], dtype="float32")

    M = cv2.getPerspectiveTransform(rect, dst)
    warped = cv2.warpPerspective(image, M, (maxWidth, maxHeight))
    return warped


# -----------------------------
# Document detection and cleanup
# -----------------------------

def detect_document_contour(image_bgr: np.ndarray) -> Optional[np.ndarray]:
    """Find the largest 4-point contour that looks like a document page."""
    image = image_bgr.copy()
    ratio = 1000.0 / max(image.shape[:2])  # scale longest side to ~1000px for speed
    small = cv2.resize(image, (int(image.shape[1] * ratio), int(image.shape[0] * ratio)),
                       interpolation=cv2.INTER_AREA)
    gray = cv2.cvtColor(small, cv2.COLOR_BGR2GRAY)
    gray = cv2.GaussianBlur(gray, (5, 5), 0)
    edges = cv2.Canny(gray, 60, 180)

    # Close gaps
    kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (5, 5))
    edges = cv2.morphologyEx(edges, cv2.MORPH_CLOSE, kernel, iterations=1)

    contours, _ = cv2.findContours(edges, cv2.RETR_LIST, cv2.CHAIN_APPROX_SIMPLE)
    contours = sorted(contours, key=cv2.contourArea, reverse=True)

    for c in contours[:10]:
        peri = cv2.arcLength(c, True)
        approx = cv2.approxPolyDP(c, 0.02 * peri, True)
        if len(approx) == 4:
            # Scale contour back to original image coords
            approx = (approx.reshape(4, 2) / ratio).astype(np.float32)
            return approx
    return None


def deskew(image_gray: np.ndarray) -> np.ndarray:
    """Estimate skew angle with Hough transform on text lines; rotate to correct."""
    # Binary for line detection
    g = cv2.GaussianBlur(image_gray, (3, 3), 0)
    _, bw = cv2.threshold(g, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
    inv = cv2.bitwise_not(bw)

    edges = cv2.Canny(inv, 50, 150, apertureSize=3)
    lines = cv2.HoughLines(edges, 1, np.pi / 180.0, 150)

    angle_deg = 0.0
    if lines is not None:
        angles = []
        for rho_theta in lines[:200]:
            rho, theta = rho_theta[0]
            # Convert to degrees relative to horizontal
            deg = (theta * 180.0 / np.pi) - 90.0
            # Normalize to [-45, 45] to avoid vertical lines
            if deg < -45:
                deg += 90
            if deg > 45:
                deg -= 90
            angles.append(deg)
        if angles:
            angle_deg = float(np.median(angles))

    if abs(angle_deg) < 0.1:
        return image_gray  # no significant skew

    (h, w) = image_gray.shape[:2]
    M = cv2.getRotationMatrix2D((w // 2, h // 2), angle_deg, 1.0)
    rotated = cv2.warpAffine(image_gray, M, (w, h), flags=cv2.INTER_CUBIC,
                             borderMode=cv2.BORDER_REPLICATE)
    return rotated


def cleanup_for_ocr(image_bgr: np.ndarray) -> np.ndarray:
    """Return a high-contrast, noise-reduced grayscale image suitable for OCR."""
    gray = cv2.cvtColor(image_bgr, cv2.COLOR_BGR2GRAY)

    # CLAHE to recover typewriter ink contrast without blowing highlights
    clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8, 8))
    gray = clahe.apply(gray)

    # Gentle denoise to preserve glyph edges
    gray = cv2.bilateralFilter(gray, d=7, sigmaColor=50, sigmaSpace=50)

    # Deskew after contrast/denoise
    gray = deskew(gray)

    # Adaptive threshold tends to work well on aged paper; keep grayscale if needed
    th = cv2.adaptiveThreshold(
        gray, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY, 35, 15
    )

    # Remove small speckles
    kernel = np.ones((2, 2), np.uint8)
    th = cv2.morphologyEx(th, cv2.MORPH_OPEN, kernel, iterations=1)

    return th


def prepare_page(image_path: Path) -> np.ndarray:
    """Load, auto-trim (if possible), and clean up a single page image. Returns a grayscale uint8 image."""
    # imdecode(np.fromfile(...)) also handles paths with non-ASCII characters on Windows
    bgr = cv2.imdecode(np.fromfile(str(image_path), dtype=np.uint8), cv2.IMREAD_COLOR)
    if bgr is None:
        raise RuntimeError(f"Failed to load image: {image_path}")

    cnt = detect_document_contour(bgr)
    if cnt is not None:
        warped = four_point_transform(bgr, cnt)
    else:
        warped = bgr  # fall back to original framing

    cleaned = cleanup_for_ocr(warped)
    return cleaned
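

# Example (hypothetical path): prepare_page(Path("scans/page_001.jpg")) returns the
# cleaned grayscale page that image_to_ocr_pdf_bytes() below converts into a one-page
# searchable PDF.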


# -----------------------------
# OCR + PDF assembly
# -----------------------------

def image_to_ocr_pdf_bytes(image: np.ndarray, dpi: int, lang: str, oem: Optional[int], psm: Optional[int]) -> bytes:
    """Use Tesseract to produce searchable PDF bytes for one image page."""
    # Tesseract prefers RGB
    if len(image.shape) == 2:
        rgb = cv2.cvtColor(image, cv2.COLOR_GRAY2RGB)
    else:
        rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    # Hint DPI via config; upsample if very small to approach ~300 dpi text size
    h, w = rgb.shape[:2]
    scale = 1.0
    if max(h, w) < 1500:
        scale = 1500.0 / max(h, w)
        rgb = cv2.resize(rgb, (int(w * scale), int(h * scale)), interpolation=cv2.INTER_CUBIC)

    config_parts = [f'--dpi {dpi}', f'-l {lang}']
    if oem is not None:
        config_parts.append(f'--oem {int(oem)}')
    if psm is not None:
        config_parts.append(f'--psm {int(psm)}')
    config = " ".join(config_parts)

    pdf_bytes = pytesseract.image_to_pdf_or_hocr(rgb, extension='pdf', config=config)
    return pdf_bytes
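

# For reference, the Tesseract invocation assembled above corresponds roughly to a
# command line such as (illustrative only, with hypothetical file names):
#   tesseract page.png page -l eng --oem 1 --psm 6 --dpi 300 pdf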


def combine_pdfs(pdf_paths: List[Path], out_path: Path) -> None:
    """Concatenate the pages of several PDFs into a single PDF at out_path."""
    writer = PdfWriter()
    for p in pdf_paths:
        reader = PdfReader(str(p))
        for page in reader.pages:
            writer.add_page(page)
    with open(out_path, "wb") as f:
        writer.write(f)
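

# Each per-page PDF produced by Tesseract already contains its invisible OCR text layer,
# so concatenating pages with pypdf keeps the combined PDF searchable.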


# -----------------------------
# Batch helpers
# -----------------------------

VALID_EXT = {".jpg", ".jpeg", ".png", ".tif", ".tiff", ".bmp"}


def natural_key(s: str):
    """Sort key that orders embedded numbers naturally (e.g., page2 < page10)."""
    return [int(t) if t.isdigit() else t.lower() for t in re.split(r'(\d+)', s)]
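

# Example: sorted(["page10.jpg", "page2.jpg"], key=natural_key)
# yields ["page2.jpg", "page10.jpg"], because the digit runs compare as integers.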


def find_image_files(dir_path: Path, patterns: Sequence[str]) -> List[Path]:
    """Collect image files in dir_path matching the glob patterns, in natural page order."""
    files: List[Path] = []
    for pat in patterns:
        files.extend(sorted(dir_path.glob(pat), key=lambda p: natural_key(p.name)))
    # Fallback: if no patterns matched, include files with known image extensions
    if not files:
        for p in sorted(dir_path.iterdir(), key=lambda p: natural_key(p.name)):
            if p.suffix.lower() in VALID_EXT and p.is_file():
                files.append(p)
    return files


def iter_target_dirs(root: Path, recursive: bool) -> List[Path]:
    """List the subdirectories of root to process in batch mode."""
    if not recursive:
        # Only direct children that are directories (ignore hidden ones)
        return [p for p in sorted(root.iterdir()) if p.is_dir() and not p.name.startswith(".")]
    # Walk recursively; include any directory that contains at least one image
    out = []
    for d, subdirs, files in os.walk(root):
        dpath = Path(d)
        if dpath == root:
            continue
        # Skip directories hidden relative to the root (the root's own path may contain dots)
        if any(part.startswith(".") for part in dpath.relative_to(root).parts):
            continue
        for f in files:
            if Path(f).suffix.lower() in VALID_EXT:
                out.append(dpath)
                break
    out = sorted(set(out), key=lambda p: natural_key(str(p.relative_to(root))))
    return out


# -----------------------------
# Main CLI
# -----------------------------

def main():
    ap = argparse.ArgumentParser(
        description="Convert images to OCRed PDF(s) with auto-trim/deskew. Supports batch mode by subdirectory."
    )

    # Single-PDF mode (default when inputs are provided)
    ap.add_argument("inputs", nargs="*", help="Input image files (JPG/PNG/etc.). Glob patterns ok (quote them).")
    ap.add_argument("-o", "--output", help="Output PDF path, e.g., out.pdf (required in single-PDF mode).")

    # Batch mode
    ap.add_argument("--batch-root", type=str, help="Root directory containing subdirectories of images. Each subdirectory -> 1 PDF.")
    ap.add_argument("--out-dir", type=str, help="Directory to write PDFs in batch mode.")
    ap.add_argument("--patterns", type=str, default="*.jpg,*.jpeg,*.png,*.tif,*.tiff,*.bmp", help="Comma-separated glob patterns applied per subdirectory.")
    ap.add_argument("--recursive", action="store_true", help="Recurse into nested subdirectories in batch mode.")

    # Common OCR knobs
    ap.add_argument("--dpi", type=int, default=300, help="DPI hint for Tesseract (default: 300)")
    ap.add_argument("--lang", default="eng", help="Tesseract language(s), e.g., 'eng' (default)")
    ap.add_argument("--oem", type=int, choices=[0, 1, 2, 3], default=1, help="Tesseract OCR Engine Mode (default: 1)")
    ap.add_argument("--psm", type=int, choices=list(range(0, 14)), default=6, help="Tesseract Page Segmentation Mode (default: 6)")
    ap.add_argument("--keep-intermediate", action="store_true", help="Keep per-page PDFs next to the output for inspection (single-PDF mode only).")
    args = ap.parse_args()

    # Validate tesseract availability early
    try:
        _ = pytesseract.get_tesseract_version()
    except Exception as e:
        print("ERROR: Tesseract is not available. Please install it and ensure it's in PATH.", file=sys.stderr)
        print(str(e), file=sys.stderr)
        sys.exit(2)

    # ---------------- Single-PDF mode ----------------
    if args.batch_root is None:
        if not args.inputs or not args.output:
            print("In single-PDF mode, provide inputs and --output. For batch mode, use --batch-root and --out-dir.", file=sys.stderr)
            sys.exit(1)

        # Expand inputs (handles glob patterns passed in quotes) and keep natural page order
        input_paths: List[Path] = []
        for pattern in args.inputs:
            if any(ch in pattern for ch in "*?[]"):
                expanded = [Path(p) for p in sorted(glob.glob(pattern), key=natural_key)]
            else:
                expanded = [Path(pattern)]
            for p in expanded:
                if p.exists() and p.is_file():
                    input_paths.append(p)

        if not input_paths:
            print("No valid input files found.", file=sys.stderr)
            sys.exit(1)

        out_path = Path(args.output)
        out_path.parent.mkdir(parents=True, exist_ok=True)

        with TemporaryDirectory(prefix="ocr_pdf_") as tmpdir:
            tmpdir = Path(tmpdir)
            page_pdf_paths: List[Path] = []

            for idx, img_path in enumerate(input_paths, start=1):
                print(f"[{idx}/{len(input_paths)}] Processing {img_path} ...")
                try:
                    page_img = prepare_page(img_path)
                    pdf_bytes = image_to_ocr_pdf_bytes(
                        page_img, dpi=args.dpi, lang=args.lang, oem=args.oem, psm=args.psm
                    )
                    page_pdf = tmpdir / f"page_{idx:04d}.pdf"
                    with open(page_pdf, "wb") as f:
                        f.write(pdf_bytes)
                    page_pdf_paths.append(page_pdf)
                except Exception as e:
                    print(f"ERROR processing {img_path}: {e}", file=sys.stderr)

            if not page_pdf_paths:
                print("No pages were successfully processed; aborting.", file=sys.stderr)
                sys.exit(1)

            combine_pdfs(page_pdf_paths, out_path)
            print(f"✅ Wrote OCRed PDF: {out_path}")

            if args.keep_intermediate:
                keep_dir = out_path.with_suffix("")
                keep_dir = keep_dir.parent / (keep_dir.name + "_pages")
                keep_dir.mkdir(parents=True, exist_ok=True)
                for p in page_pdf_paths:
                    dest = keep_dir / p.name
                    dest.write_bytes(p.read_bytes())
                print(f"Kept per-page PDFs in: {keep_dir}")
        return

    # ---------------- Batch mode ----------------
    root = Path(args.batch_root)
    if not root.is_dir():
        print(f"--batch-root is not a directory: {root}", file=sys.stderr)
        sys.exit(1)

    out_dir = Path(args.out_dir) if args.out_dir else root / "ocr_pdfs"
    out_dir.mkdir(parents=True, exist_ok=True)

    patterns = [p.strip() for p in args.patterns.split(",") if p.strip()]
    targets = iter_target_dirs(root, recursive=args.recursive)

    if not targets:
        print("No subdirectories with images found under batch root.", file=sys.stderr)
        sys.exit(1)

    for d in targets:
        rel = d.relative_to(root)
        images = find_image_files(d, patterns)
        if not images:
            print(f"[skip] {rel} — no images matching {patterns}", file=sys.stderr)
            continue

        # Flatten nested subdirectory names into a single PDF file name
        safe_rel = str(rel).replace(os.sep, "__")
        out_pdf = out_dir / (safe_rel + ".pdf")
        out_pdf.parent.mkdir(parents=True, exist_ok=True)
        print(f"\n=== {rel} → {out_pdf.name} ({len(images)} pages) ===")

        # The temp-dir prefix must not contain path separators, so use the flattened name
        with TemporaryDirectory(prefix=f"ocr_{safe_rel}_") as tmpdir:
            tmpdir = Path(tmpdir)
            page_pdf_paths: List[Path] = []

            for idx, img_path in enumerate(images, start=1):
                print(f"[{rel}] [{idx}/{len(images)}] {img_path.name}")
                try:
                    page_img = prepare_page(img_path)
                    pdf_bytes = image_to_ocr_pdf_bytes(
                        page_img, dpi=args.dpi, lang=args.lang, oem=args.oem, psm=args.psm
                    )
                    page_pdf = tmpdir / f"page_{idx:04d}.pdf"
                    with open(page_pdf, "wb") as f:
                        f.write(pdf_bytes)
                    page_pdf_paths.append(page_pdf)
                except Exception as e:
                    print(f"ERROR processing {img_path}: {e}", file=sys.stderr)

            if not page_pdf_paths:
                print(f"[skip] {rel} — no pages processed successfully.", file=sys.stderr)
                continue

            combine_pdfs(page_pdf_paths, out_pdf)
            print(f"✅ Wrote: {out_pdf}")

    print("\nBatch complete.")
    return


if __name__ == "__main__":
    main()