Files
jpegdoc2pdf/ocr_pdf.py

425 lines
16 KiB
Python

#!/usr/bin/env python3
"""
ocr_pdf.py — Convert JPGs of documents into OCRed PDFs.
Now supports **batch mode** where each subdirectory becomes one PDF.
Features
- Automatic trimming via document contour detection + perspective warp
- Deskew fallback if no clear document contour is found
- Image cleanup tuned for 1970s typewritten English pages (contrast/binarization)
- Tesseract-based OCR to embed a searchable text layer in the PDF
- Batch multiple images into one output PDF
- Batch mode: process a root folder; each subdirectory becomes its own PDF
USAGE
------
# Single-PDF mode (glob patterns allowed)
uv run ocr-pdf -o out.pdf scans/*.jpg
# Batch mode: each subdirectory under ROOT becomes a PDF
uv run ocr-pdf --batch-root ROOT_DIR --out-dir out_pdfs
# Batch mode with filters
uv run ocr-pdf --batch-root ROOT_DIR --out-dir out_pdfs --patterns "*.jpg,*.png" --recursive
DEPENDENCIES
------------
- Tesseract must be installed on your system and available on PATH:
    macOS (brew):   brew install tesseract
    Ubuntu/Debian:  sudo apt-get install tesseract-ocr
    Windows:        install from https://github.com/UB-Mannheim/tesseract/wiki
- Python packages imported by this script: OpenCV (cv2), numpy, pytesseract, pypdf.
"""
import argparse
import os
import sys
from pathlib import Path
from typing import Optional, Tuple, List, Iterable, Sequence, Dict
import cv2
import numpy as np
import pytesseract
from pypdf import PdfReader, PdfWriter
from tempfile import TemporaryDirectory
import re
# -----------------------------
# Geometry helpers
# -----------------------------
def _order_quad_points(pts: np.ndarray) -> np.ndarray:
"""Order 4 points as (top-left, top-right, bottom-right, bottom-left)."""
rect = np.zeros((4, 2), dtype="float32")
s = pts.sum(axis=1)
rect[0] = pts[np.argmin(s)]
rect[2] = pts[np.argmax(s)]
diff = np.diff(pts, axis=1)
rect[1] = pts[np.argmin(diff)]
rect[3] = pts[np.argmax(diff)]
return rect
def four_point_transform(image: np.ndarray, pts: np.ndarray) -> np.ndarray:
    """Warp the quadrilateral ``pts`` of ``image`` onto an axis-aligned rectangle.

    Args:
        image: Source image passed to ``cv2.warpPerspective``.
        pts: Four (x, y) corner points in any order; they are converted to
            float32 and ordered with ``_order_quad_points``.

    Returns:
        The warped image. Its width and height are the longer of each pair of
        opposite quad edges, truncated to int.
    """
    corners = _order_quad_points(pts.astype("float32"))
    top_left, top_right, bottom_right, bottom_left = corners

    # Output width: longer of the bottom and top edges.
    out_w = int(max(
        np.linalg.norm(bottom_right - bottom_left),
        np.linalg.norm(top_right - top_left),
    ))
    # Output height: longer of the right and left edges.
    out_h = int(max(
        np.linalg.norm(top_right - bottom_right),
        np.linalg.norm(top_left - bottom_left),
    ))

    # Destination corners, in the same order as ``corners``.
    target = np.array(
        [
            [0, 0],
            [out_w - 1, 0],
            [out_w - 1, out_h - 1],
            [0, out_h - 1],
        ],
        dtype="float32",
    )
    homography = cv2.getPerspectiveTransform(corners, target)
    return cv2.warpPerspective(image, homography, (out_w, out_h))
# -----------------------------
# Document detection and cleanup
# -----------------------------
def detect_document_contour(
    image_bgr: np.ndarray, min_area_ratio: float = 0.2
) -> Optional[np.ndarray]:
    """Find the largest convex 4-point contour that looks like a document page.

    Detection runs on a copy scaled so the longest side is about 1000 px:
    grayscale -> Gaussian blur -> Canny edges -> morphological close ->
    contours. The ten largest contours are approximated to polygons and the
    first acceptable quadrilateral is returned.

    Args:
        image_bgr: Input image in BGR channel order.
        min_area_ratio: Smallest contour area, as a fraction of the frame
            area, that is accepted as a page. This stops a small box or stamp
            on the page from being mistaken for the page outline. Pass ``0``
            to accept a quadrilateral of any size.

    Returns:
        A ``(4, 2)`` float32 array of corner coordinates in the original
        image's pixel space (unordered), or ``None`` when no suitable
        quadrilateral is found.
    """
    height, width = image_bgr.shape[:2]
    ratio = 1000.0 / max(height, width)  # scale longest side to ~1000px for speed
    # cv2.resize does not modify its input, so no defensive copy is needed.
    # Clamp to 1 px so extreme aspect ratios cannot produce a zero dimension.
    small = cv2.resize(
        image_bgr,
        (max(1, int(width * ratio)), max(1, int(height * ratio))),
        interpolation=cv2.INTER_AREA,
    )

    gray = cv2.cvtColor(small, cv2.COLOR_BGR2GRAY)
    gray = cv2.GaussianBlur(gray, (5, 5), 0)
    edges = cv2.Canny(gray, 60, 180)

    # Close small gaps so the page outline forms one connected contour.
    kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (5, 5))
    edges = cv2.morphologyEx(edges, cv2.MORPH_CLOSE, kernel, iterations=1)

    # OpenCV 4 returns (contours, hierarchy); OpenCV 3 returns
    # (image, contours, hierarchy). Handle both.
    found = cv2.findContours(edges, cv2.RETR_LIST, cv2.CHAIN_APPROX_SIMPLE)
    contours = found[0] if len(found) == 2 else found[1]
    contours = sorted(contours, key=cv2.contourArea, reverse=True)

    min_area = min_area_ratio * small.shape[0] * small.shape[1]
    for contour in contours[:10]:
        if cv2.contourArea(contour) < min_area:
            # Sorted by descending area: every remaining contour is smaller.
            break
        perimeter = cv2.arcLength(contour, True)
        approx = cv2.approxPolyDP(contour, 0.02 * perimeter, True)
        # A non-convex (self-intersecting) quad would give a garbage warp.
        if len(approx) == 4 and cv2.isContourConvex(approx):
            # Scale contour back to original image coords.
            return (approx.reshape(4, 2) / ratio).astype(np.float32)
    return None
def deskew(image_gray: np.ndarray) -> np.ndarray:
    """Estimate the page skew from Hough lines and rotate the image to undo it.

    The image is blurred, Otsu-thresholded, inverted and edge-detected; the
    median angle of up to 200 detected Hough lines is taken as the skew.

    Returns:
        ``image_gray`` itself when no lines are found or the estimated skew
        is below 0.1 degrees; otherwise a rotated copy of the same size with
        replicated borders.
    """
    blurred = cv2.GaussianBlur(image_gray, (3, 3), 0)
    _, binary = cv2.threshold(blurred, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
    edge_map = cv2.Canny(cv2.bitwise_not(binary), 50, 150, apertureSize=3)
    hough = cv2.HoughLines(edge_map, 1, np.pi / 180.0, 150)
    if hough is None:
        return image_gray  # nothing to measure, leave the page untouched

    def _fold(theta):
        """Convert a Hough theta (radians) to degrees from horizontal in [-45, 45]."""
        tilt = (theta * 180.0 / np.pi) - 90.0
        # Fold near-vertical lines onto the horizontal range.
        if tilt < -45:
            tilt += 90
        if tilt > 45:
            tilt -= 90
        return tilt

    tilts = []
    for entry in hough[:200]:
        _rho, theta = entry[0]
        tilts.append(_fold(theta))

    skew_deg = float(np.median(tilts)) if tilts else 0.0
    if abs(skew_deg) < 0.1:
        return image_gray  # no significant skew

    height, width = image_gray.shape[:2]
    rotation = cv2.getRotationMatrix2D((width // 2, height // 2), skew_deg, 1.0)
    return cv2.warpAffine(
        image_gray,
        rotation,
        (width, height),
        flags=cv2.INTER_CUBIC,
        borderMode=cv2.BORDER_REPLICATE,
    )
def cleanup_for_ocr(image_bgr: np.ndarray) -> np.ndarray:
    """Return a deskewed, binarized (0/255) single-channel image suitable for OCR.

    Pipeline: grayscale -> CLAHE contrast boost -> bilateral denoise ->
    ``deskew`` -> Gaussian adaptive threshold -> speckle removal.

    Args:
        image_bgr: Page image in BGR channel order.

    Returns:
        A uint8 image of the same size with dark text (0) on a white (255)
        background.
    """
    gray = cv2.cvtColor(image_bgr, cv2.COLOR_BGR2GRAY)

    # CLAHE to recover typewriter ink contrast without blowing highlights.
    clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8, 8))
    gray = clahe.apply(gray)

    # Gentle denoise to preserve glyph edges.
    gray = cv2.bilateralFilter(gray, d=7, sigmaColor=50, sigmaSpace=50)

    # Deskew after contrast/denoise.
    gray = deskew(gray)

    # Adaptive threshold copes with uneven lighting on aged paper.
    # THRESH_BINARY makes paper white (255) and ink black (0).
    binary = cv2.adaptiveThreshold(
        gray, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY, 35, 15
    )

    # Remove small dark speckles. Morphology treats white as foreground, so
    # erasing tiny BLACK blobs on a white page needs a closing (dilate then
    # erode). An opening would only erase tiny white specks inside the ink.
    kernel = np.ones((2, 2), np.uint8)
    binary = cv2.morphologyEx(binary, cv2.MORPH_CLOSE, kernel, iterations=1)
    return binary
def prepare_page(image_path: Path) -> np.ndarray:
    """Load one page image, trim it to the document if possible, and clean it for OCR.

    Args:
        image_path: Path of the image file to read.

    Returns:
        The result of ``cleanup_for_ocr`` on the (optionally warped) page.

    Raises:
        RuntimeError: If OpenCV cannot decode the file contents.
    """
    # Read the raw bytes with numpy and decode in memory instead of using
    # cv2.imread (presumably to cope with paths imread cannot open — confirm).
    raw_bytes = np.fromfile(str(image_path), dtype=np.uint8)
    page_bgr = cv2.imdecode(raw_bytes, cv2.IMREAD_COLOR)
    if page_bgr is None:
        raise RuntimeError(f"Failed to load image: {image_path}")

    quad = detect_document_contour(page_bgr)
    # Without a detected page outline, keep the original framing.
    framed = page_bgr if quad is None else four_point_transform(page_bgr, quad)
    return cleanup_for_ocr(framed)
# -----------------------------
# OCR + PDF assembly
# -----------------------------
def image_to_ocr_pdf_bytes(image: np.ndarray, dpi: int, lang: str, oem: Optional[int], psm: Optional[int]) -> bytes:
    """Run Tesseract on one page image and return the searchable PDF as bytes.

    Args:
        image: Page image; 2-D arrays are treated as grayscale, anything else
            as BGR.
        dpi: Value passed to Tesseract via ``--dpi``.
        lang: Value passed to Tesseract via ``-l``.
        oem: Optional OCR engine mode (``--oem``); omitted when ``None``.
        psm: Optional page segmentation mode (``--psm``); omitted when ``None``.

    Returns:
        The PDF produced by ``pytesseract.image_to_pdf_or_hocr``.
    """
    # Tesseract prefers RGB.
    conversion = cv2.COLOR_GRAY2RGB if image.ndim == 2 else cv2.COLOR_BGR2RGB
    rgb = cv2.cvtColor(image, conversion)

    # Upsample small pages so the longest side reaches 1500 px.
    height, width = rgb.shape[:2]
    longest = max(height, width)
    if longest < 1500:
        factor = 1500.0 / longest
        rgb = cv2.resize(
            rgb,
            (int(width * factor), int(height * factor)),
            interpolation=cv2.INTER_CUBIC,
        )

    options = [f'--dpi {dpi}', f'-l {lang}']
    for flag, value in (("oem", oem), ("psm", psm)):
        if value is not None:
            options.append(f'--{flag} {int(value)}')

    return pytesseract.image_to_pdf_or_hocr(rgb, extension='pdf', config=" ".join(options))
def combine_pdfs(pdf_paths: List[Path], out_path: Path) -> None:
    """Concatenate every page of ``pdf_paths``, in order, into one PDF at ``out_path``."""
    merged = PdfWriter()
    for source in pdf_paths:
        document = PdfReader(str(source))
        for sheet in document.pages:
            merged.add_page(sheet)

    with open(out_path, "wb") as sink:
        merged.write(sink)
# -----------------------------
# Batch helpers
# -----------------------------
# Lower-case file suffixes that the directory scanners treat as page images.
VALID_EXT = {".jpg", ".jpeg", ".png", ".tif", ".tiff", ".bmp"}
def natural_key(s: str):
    """Sort key that groups numbers naturally (e.g., page2 < page10).

    The string is split into alternating text and digit runs. Text runs are
    lower-cased and digit runs become ints, so keys always compare
    str-with-str and int-with-int at the same position.

    Args:
        s: The string to build a key for (typically a file name).

    Returns:
        A list like ``["page", 10, ".jpg"]``.
    """
    # With one capturing group, re.split returns [text, digits, text, ...]:
    # odd positions are exactly the \d+ matches. Classifying by position
    # rather than str.isdigit() avoids int() crashing on characters such as
    # "²" that isdigit() accepts but int() cannot parse.
    tokens = re.split(r'(\d+)', s)
    return [
        int(token) if position % 2 else token.lower()
        for position, token in enumerate(tokens)
    ]
def find_image_files(dir_path: Path, patterns: Sequence[str]) -> List[Path]:
    """Collect the page images of one directory in natural page order.

    Args:
        dir_path: Directory to search.
        patterns: Glob patterns evaluated relative to ``dir_path``.

    Returns:
        Regular files matching any pattern, each listed once. If no pattern
        matches a file, every file in ``dir_path`` whose suffix is in
        ``VALID_EXT`` (case-insensitive) is returned instead. The result is
        sorted by ``natural_key`` of the file name across all patterns, so
        ``page2.png`` precedes ``page10.jpg`` regardless of extension.
    """
    # A set removes duplicates when patterns overlap (e.g. "*.jpg" and "*").
    # is_file() drops directories whose names happen to match a pattern.
    matched = {
        path
        for pattern in patterns
        for path in dir_path.glob(pattern)
        if path.is_file()
    }

    # Fallback: if no patterns matched, include known image extensions.
    if not matched:
        matched = {
            path
            for path in dir_path.iterdir()
            if path.is_file() and path.suffix.lower() in VALID_EXT
        }

    # Sort once, globally. The full path breaks ties between names that differ
    # only by case or directory, keeping the order deterministic.
    return sorted(matched, key=lambda path: (natural_key(path.name), str(path)))
def iter_target_dirs(root: Path, recursive: bool) -> List[Path]:
    """List the subdirectories of ``root`` that should each become one PDF.

    Args:
        root: Batch root directory.
        recursive: When false, return the direct, non-hidden child
            directories of ``root``. When true, walk the tree and return
            every non-hidden directory below ``root`` that directly contains
            at least one file with a ``VALID_EXT`` suffix.

    Returns:
        Directories in natural sort order. ``root`` itself is never included.
    """
    if not recursive:
        # Only direct children that are directories (ignore hidden).
        children = [
            child
            for child in root.iterdir()
            if child.is_dir() and not child.name.startswith(".")
        ]
        # Natural order, consistent with recursive mode.
        return sorted(children, key=lambda child: (natural_key(child.name), child.name))

    found: List[Path] = []
    for current, subdirs, files in os.walk(root):
        # Prune hidden directories in place so os.walk never descends into
        # them. Only names BELOW root are checked: testing every component of
        # the full path would skip everything when root itself is reached
        # through ".." or lives inside a dot-directory.
        subdirs[:] = [name for name in subdirs if not name.startswith(".")]

        dpath = Path(current)
        if dpath == root:
            continue
        if any(Path(name).suffix.lower() in VALID_EXT for name in files):
            found.append(dpath)

    return sorted(found, key=lambda path: natural_key(str(path.relative_to(root))))
# -----------------------------
# Main CLI
# -----------------------------
def _expand_inputs(raw_inputs: Sequence[str]) -> List[Path]:
    """Resolve CLI input arguments (paths or glob patterns) to existing files."""
    import glob  # local import: only needed for CLI pattern expansion

    resolved: List[Path] = []
    for item in raw_inputs:
        literal = Path(item)
        if literal.is_file():
            # An existing file wins even if its name contains glob
            # metacharacters such as "[" or "]".
            candidates = [literal]
        elif any(ch in item for ch in "*?[]"):
            # glob.glob accepts absolute patterns, which Path().glob() rejects.
            # Natural order keeps page2 ahead of page10.
            candidates = sorted(
                (Path(match) for match in glob.glob(item, recursive=True)),
                key=lambda path: natural_key(str(path)),
            )
        else:
            candidates = [literal]
        resolved.extend(path for path in candidates if path.is_file())
    return resolved


def _ocr_images_to_pdf(
    images: Sequence[Path],
    out_pdf: Path,
    args: argparse.Namespace,
    *,
    label: Optional[str],
    tmp_prefix: str,
    done_message: str,
) -> bool:
    """OCR ``images`` page by page and merge the successful pages into ``out_pdf``.

    Args:
        images: Page images in output order.
        out_pdf: Destination PDF path; its parent directory must exist.
        args: Parsed CLI options (reads ``dpi``, ``lang``, ``oem``, ``psm``
            and ``keep_intermediate``).
        label: Prefix for progress lines in batch mode; ``None`` selects the
            single-PDF progress format.
        tmp_prefix: Prefix for the temporary directory. It must not contain
            path separators.
        done_message: Line printed after the PDF has been written.

    Returns:
        ``True`` if a PDF was written, ``False`` if every page failed.
    """
    total = len(images)
    with TemporaryDirectory(prefix=tmp_prefix) as tmp:
        tmpdir = Path(tmp)
        page_pdf_paths: List[Path] = []
        failed = 0
        for idx, img_path in enumerate(images, start=1):
            if label is None:
                print(f"[{idx}/{total}] Processing {img_path} ...")
            else:
                print(f"[{label}] [{idx}/{total}] {img_path.name}")
            try:
                page_img = prepare_page(img_path)
                pdf_bytes = image_to_ocr_pdf_bytes(
                    page_img, dpi=args.dpi, lang=args.lang, oem=args.oem, psm=args.psm
                )
                page_pdf = tmpdir / f"page_{idx:04d}.pdf"
                page_pdf.write_bytes(pdf_bytes)
                page_pdf_paths.append(page_pdf)
            except Exception as e:
                # Best effort: one unreadable page must not abort the document.
                failed += 1
                print(f"ERROR processing {img_path}: {e}", file=sys.stderr)

        if not page_pdf_paths:
            return False

        combine_pdfs(page_pdf_paths, out_pdf)
        print(done_message)
        if failed:
            print(
                f"WARNING: {failed} of {total} page(s) failed and are missing from {out_pdf}",
                file=sys.stderr,
            )

        if args.keep_intermediate:
            # Copy the per-page PDFs out before the temp directory is deleted.
            keep_dir = out_pdf.with_suffix("")
            keep_dir = keep_dir.parent / (keep_dir.name + "_pages")
            keep_dir.mkdir(parents=True, exist_ok=True)
            for page_pdf in page_pdf_paths:
                (keep_dir / page_pdf.name).write_bytes(page_pdf.read_bytes())
            print(f"Kept per-page PDFs in: {keep_dir}")
    return True


def main():
    """Command-line entry point.

    Single-PDF mode (no ``--batch-root``): OCR the given inputs into
    ``--output``. Batch mode (``--batch-root``): write one PDF per target
    subdirectory into ``--out-dir`` (default ``<root>/ocr_pdfs``).

    Exit status: 2 if Tesseract is unavailable; 1 for invalid arguments, no
    inputs, no target directories, or (single mode) no page processed.
    """
    ap = argparse.ArgumentParser(description="Convert images to OCRed PDF(s) with auto-trim/deskew. Supports batch by subdirectory.")
    # Single-PDF mode (default when inputs provided)
    ap.add_argument("inputs", nargs="*", help="Input image files (JPG/PNG/etc.). Glob patterns ok (quote them).")
    ap.add_argument("-o", "--output", help="Output PDF path, e.g., out.pdf (required in single mode).")
    # Batch mode
    ap.add_argument("--batch-root", type=str, help="Root directory containing subdirectories of images. Each subdirectory -> 1 PDF.")
    ap.add_argument("--out-dir", type=str, help="Directory to write PDFs in batch mode.")
    ap.add_argument("--patterns", type=str, default="*.jpg,*.jpeg,*.png,*.tif,*.tiff,*.bmp", help="Comma-separated glob patterns per subdir.")
    ap.add_argument("--recursive", action="store_true", help="Recurse into nested subdirectories in batch mode.")
    # Common OCR knobs
    ap.add_argument("--dpi", type=int, default=300, help="DPI hint for Tesseract (default: 300)")
    ap.add_argument("--lang", default="eng", help="Tesseract language(s), e.g., 'eng' (default)")
    ap.add_argument("--oem", type=int, choices=[0, 1, 2, 3], default=1, help="Tesseract OCR Engine Mode (default: 1)")
    ap.add_argument("--psm", type=int, choices=list(range(0, 14)), default=6, help="Tesseract Page Segmentation Mode (default: 6)")
    ap.add_argument("--keep-intermediate", action="store_true", help="Keep per-page PDFs in a temp folder for inspection.")
    args = ap.parse_args()

    # Validate tesseract availability early, before any image work.
    try:
        pytesseract.get_tesseract_version()
    except Exception as e:
        print("ERROR: Tesseract is not available. Please install it and ensure it's in PATH.", file=sys.stderr)
        print(str(e), file=sys.stderr)
        sys.exit(2)

    # ---------------- Single-PDF mode ----------------
    if args.batch_root is None:
        if not args.inputs or not args.output:
            print("In single-PDF mode, provide inputs and --output. For batch mode, use --batch-root and --out-dir.", file=sys.stderr)
            sys.exit(1)

        input_paths = _expand_inputs(args.inputs)
        if not input_paths:
            print("No valid input files found.", file=sys.stderr)
            sys.exit(1)

        out_path = Path(args.output)
        out_path.parent.mkdir(parents=True, exist_ok=True)
        written = _ocr_images_to_pdf(
            input_paths,
            out_path,
            args,
            label=None,
            tmp_prefix="ocr_pdf_",
            done_message=f"✅ Wrote OCRed PDF: {out_path}",
        )
        if not written:
            print("No pages were successfully processed; aborting.", file=sys.stderr)
            sys.exit(1)
        return

    # ---------------- Batch mode ----------------
    root = Path(args.batch_root)
    if not root.is_dir():
        print(f"--batch-root is not a directory: {root}", file=sys.stderr)
        sys.exit(1)
    out_dir = Path(args.out_dir) if args.out_dir else root / "ocr_pdfs"
    out_dir.mkdir(parents=True, exist_ok=True)
    patterns = [p.strip() for p in args.patterns.split(",") if p.strip()]
    targets = iter_target_dirs(root, recursive=args.recursive)
    if not targets:
        print("No subdirectories with images found under batch root.", file=sys.stderr)
        sys.exit(1)

    for d in targets:
        rel = d.relative_to(root)
        images = find_image_files(d, patterns)
        if not images:
            print(f"[skip] {rel} — no images matching {patterns}", file=sys.stderr)
            continue

        # Flatten nested paths ("a/b" -> "a__b") for the PDF name AND the temp
        # directory prefix: a prefix containing a path separator makes
        # TemporaryDirectory fail for nested directories under --recursive.
        flat_name = str(rel).replace(os.sep, "__")
        out_pdf = out_dir / (flat_name + ".pdf")
        print(f"\n=== {rel} -> {out_pdf.name} ({len(images)} pages) ===")
        written = _ocr_images_to_pdf(
            images,
            out_pdf,
            args,
            label=str(rel),
            tmp_prefix=f"ocr_{flat_name}_",
            done_message=f"✅ Wrote: {out_pdf}",
        )
        if not written:
            print(f"[skip] {rel} — no pages processed successfully.", file=sys.stderr)

    print("\nBatch complete.")
    return
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()