204 lines
6.2 KiB
Python
204 lines
6.2 KiB
Python
"""
|
|
Persistent Python OCR daemon (stdin/stdout JSON-per-line protocol).
|
|
|
|
Supports EasyOCR engine, lazy-loaded on first use.
|
|
Managed as a subprocess by PythonOcrBridge in Poe2Trade.Screen.
|
|
|
|
Request: {"cmd": "ocr", "engine": "easyocr", "imagePath": "C:\\temp\\screenshot.png"}
|
|
Response: {"ok": true, "text": "...", "lines": [{"text": "...", "words": [...]}]}
|
|
"""
|
|
|
|
import sys
|
|
import json
|
|
|
|
# Module-level cache for the EasyOCR reader; None until get_easyocr() builds it.
_easyocr_reader = None
|
|
|
|
|
|
def _redirect_stdout_to_stderr():
    """Point ``sys.stdout`` at ``sys.stderr`` and return the stream it replaced.

    Library ``print()`` calls made while the redirect is active land on stderr
    instead of corrupting the JSON protocol carried on stdout. Pass the
    returned stream to ``_restore_stdout`` to undo the redirect.
    """
    # The right-hand side is evaluated first, so ``previous`` is the old stdout.
    previous, sys.stdout = sys.stdout, sys.stderr
    return previous
|
|
|
|
|
|
def _restore_stdout(real_stdout):
    """Reinstall *real_stdout* as ``sys.stdout``.

    *real_stdout* is the stream previously returned by
    ``_redirect_stdout_to_stderr``.
    """
    sys.stdout = real_stdout
|
|
|
|
|
|
def get_easyocr():
    """Return the shared EasyOCR reader, creating it on the first call.

    The reader is built once with ``easyocr.Reader(["en"], gpu=True)`` and
    cached in the module-level ``_easyocr_reader``; later calls return the
    cached instance. Progress messages go to stderr.
    """
    global _easyocr_reader
    if _easyocr_reader is not None:
        return _easyocr_reader

    print("Loading EasyOCR model...", file=sys.stderr, flush=True)
    # EasyOCR prints download progress to stdout; keep it off the protocol
    # channel while the model is imported and constructed.
    saved_stdout = _redirect_stdout_to_stderr()
    try:
        import easyocr

        _easyocr_reader = easyocr.Reader(["en"], gpu=True)
    finally:
        _restore_stdout(saved_stdout)
    print("EasyOCR model loaded.", file=sys.stderr, flush=True)
    return _easyocr_reader
|
|
|
|
|
|
def bbox_to_rect(corners):
    """Collapse a 4-corner box into an axis-aligned ``(x, y, width, height)`` tuple.

    *corners* is a sequence of points ``[[x1, y1], [x2, y2], [x3, y3], [x4, y4]]``.
    Each edge is truncated to ``int`` before the width and height are
    computed, so all four returned values are integers.
    """
    left = int(min(point[0] for point in corners))
    top = int(min(point[1] for point in corners))
    right = int(max(point[0] for point in corners))
    bottom = int(max(point[1] for point in corners))
    return left, top, right - left, bottom - top
|
|
|
|
|
|
def split_into_words(text, x, y, width, height):
    """Break one detection into per-word boxes sized by character count.

    Returns a list of ``{"text", "x", "y", "width", "height"}`` dicts. Text
    with zero or one whitespace-separated token yields a single box covering
    the whole detection. Otherwise each token receives a share of *width*
    proportional to its length (truncated to an int, minimum 1) and the boxes
    are laid out left to right starting at *x*.
    """
    tokens = text.split()
    if len(tokens) < 2:
        return [{"text": text.strip(), "x": x, "y": y, "width": width, "height": height}]

    # With two or more non-empty tokens the character total is always positive.
    char_count = sum(map(len, tokens))

    boxes = []
    left = x
    for token in tokens:
        token_width = int(width * len(token) / char_count)
        if token_width < 1:
            token_width = 1
        boxes.append({"text": token, "x": left, "y": y, "width": token_width, "height": height})
        left += token_width
    return boxes
|
|
|
|
|
|
def merge_nearby_detections(items, merge_gap):
    """Merge horizontally adjacent detections that sit on the same text row.

    items: list of {"text", "x", "y", "w", "h"}
    merge_gap: largest horizontal gap (same units as the boxes) to bridge.

    Two boxes are on the same row when their vertical overlap exceeds 50% of
    the smaller height. Within a row, boxes are visited left to right and a
    box is merged into its left neighbour when 0 <= X gap <= merge_gap; the
    merged box is the union rectangle and the texts are joined with a space.

    Returns a new list of new dicts (the input dicts are not modified),
    ordered by row and then left to right. When *items* is empty or
    *merge_gap* is not positive, *items* is returned unchanged.
    """
    if not items or merge_gap <= 0:
        return items

    def on_same_row(a, b):
        overlap = min(a["y"] + a["h"], b["y"] + b["h"]) - max(a["y"], b["y"])
        min_h = min(a["h"], b["h"])
        return min_h > 0 and overlap / min_h > 0.5

    # Group into rows first. Sorting by vertical centre alone is not enough to
    # merge directly: boxes on one line rarely share an exact centre, so the
    # right-hand box can sort ahead of the left-hand one, which would make the
    # gap negative and block the merge.
    by_center = sorted(items, key=lambda d: (d["y"] + d["h"] / 2, d["x"]))
    rows = [[by_center[0]]]
    for item in by_center[1:]:
        if on_same_row(rows[-1][-1], item):
            rows[-1].append(item)
        else:
            rows.append([item])

    merged = []
    for row in rows:
        row.sort(key=lambda d: d["x"])
        current = dict(row[0])  # copy so the caller's dicts stay untouched
        for item in row[1:]:
            x_gap = item["x"] - (current["x"] + current["w"])
            if on_same_row(current, item) and 0 <= x_gap <= merge_gap:
                # Compute the far edges before overwriting x / y.
                right = max(current["x"] + current["w"], item["x"] + item["w"])
                bottom = max(current["y"] + current["h"], item["y"] + item["h"])
                current["x"] = min(current["x"], item["x"])
                current["y"] = min(current["y"], item["y"])
                current["w"] = right - current["x"]
                current["h"] = bottom - current["y"]
                current["text"] = current["text"] + " " + item["text"]
            else:
                merged.append(current)
                current = dict(item)
        merged.append(current)

    return merged
|
|
|
|
|
|
def items_to_response(items):
    """Build the OcrResponse payload from ``{"text", "x", "y", "w", "h"}`` items.

    Each item becomes one entry in ``"lines"`` carrying its text and the
    per-word boxes from ``split_into_words``; the top-level ``"text"`` is the
    item texts joined with newlines. ``"ok"`` is always ``True``.
    """
    lines = [
        {
            "text": entry["text"],
            "words": split_into_words(
                entry["text"], entry["x"], entry["y"], entry["w"], entry["h"]
            ),
        }
        for entry in items
    ]
    full_text = "\n".join(line["text"] for line in lines)
    return {"ok": True, "text": full_text, "lines": lines}
|
|
|
|
|
|
def run_easyocr_array(img, merge_gap=0, **easyocr_kwargs):
    """Run EasyOCR on an image array and return an OcrResponse dict.

    img: image passed straight to ``reader.readtext``.
    merge_gap: when > 0, neighbouring detections are combined with
        ``merge_nearby_detections`` using this gap.
    easyocr_kwargs: extra keyword arguments forwarded to ``readtext``.

    Detections whose text is empty after stripping are skipped. The result is
    built by ``items_to_response``.
    """
    reader = get_easyocr()

    # Redirect stdout during inference: easyocr can print warnings, which
    # would otherwise corrupt the JSON protocol on stdout.
    real_stdout = _redirect_stdout_to_stderr()
    try:
        results = reader.readtext(img, batch_size=32, **easyocr_kwargs)
    finally:
        _restore_stdout(real_stdout)

    items = []
    for detection in results:
        # Entries are normally (bbox, text, conf) but only (bbox, text) when
        # paragraph=True, so index instead of unpacking a fixed 3-tuple.
        bbox, text = detection[0], detection[1]
        text = text.strip()
        if not text:
            continue
        x, y, w, h = bbox_to_rect(bbox)
        items.append({"text": text, "x": x, "y": y, "w": w, "h": h})

    if merge_gap > 0:
        items = merge_nearby_detections(items, merge_gap)

    return items_to_response(items)
|
|
|
|
|
|
def load_image(req):
    """Load the request's image as a NumPy array.

    Reads ``imageBase64`` (base64-encoded image bytes) when it is present and
    non-empty, otherwise ``imagePath`` (a file path). Returns ``None`` when
    neither is provided. Errors from base64 decoding or from opening the
    image propagate to the caller.
    """
    image_base64 = req.get("imageBase64")
    image_path = req.get("imagePath")
    if not image_base64 and not image_path:
        return None

    # Heavy imports are deferred until there is actually an image to decode.
    from PIL import Image
    import numpy as np

    if image_base64:
        import base64
        import io

        source = io.BytesIO(base64.b64decode(image_base64))
    else:
        source = image_path

    # Close the image (and its underlying file) as soon as the pixels are
    # copied, so this long-running process does not hold the file open.
    with Image.open(source) as image:
        return np.array(image)
|
|
|
|
|
|
def handle_request(req):
    """Dispatch one decoded JSON request and return the response dict.

    Only ``{"cmd": "ocr", ...}`` is supported. The image comes from
    ``imagePath`` or ``imageBase64`` (see ``load_image``). Optional keys:

    - ``mergeGap``: forwarded as ``merge_gap`` (missing or null means 0).
    - ``linkThreshold``, ``textThreshold``, ``lowText``, ``widthThs``,
      ``paragraph``: forwarded to EasyOCR under their snake_case names;
      keys that are missing or null are left at EasyOCR's defaults.

    Returns ``{"ok": False, "error": ...}`` for a non-object request, an
    unknown command, or a request with no image; otherwise the result of
    ``run_easyocr_array``.
    """
    if not isinstance(req, dict):
        return {"ok": False, "error": "Request must be a JSON object"}

    cmd = req.get("cmd")
    if cmd != "ocr":
        return {"ok": False, "error": f"Unknown command: {cmd}"}

    img = load_image(req)
    if img is None:
        return {"ok": False, "error": "Missing imagePath or imageBase64"}

    # A JSON null arrives as None; treat it like an omitted key so it does not
    # reach numeric comparisons downstream.
    merge_gap = req.get("mergeGap") or 0

    option_names = (
        ("linkThreshold", "link_threshold"),
        ("textThreshold", "text_threshold"),
        ("lowText", "low_text"),
        ("widthThs", "width_ths"),
        ("paragraph", "paragraph"),
    )
    easyocr_kwargs = {
        py_param: req[json_key]
        for json_key, py_param in option_names
        if req.get(json_key) is not None
    }
    return run_easyocr_array(img, merge_gap=merge_gap, **easyocr_kwargs)
|
|
|
|
|
|
def main():
|
|
# Signal ready
|
|
sys.stdout.write(json.dumps({"ok": True, "ready": True}) + "\n")
|
|
sys.stdout.flush()
|
|
|
|
for line in sys.stdin:
|
|
line = line.strip()
|
|
if not line:
|
|
continue
|
|
try:
|
|
req = json.loads(line)
|
|
resp = handle_request(req)
|
|
except Exception as e:
|
|
resp = {"ok": False, "error": str(e)}
|
|
sys.stdout.write(json.dumps(resp) + "\n")
|
|
sys.stdout.flush()
|
|
|
|
|
|
# Start the daemon loop only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|