-
-
Save 4ft35t/854768049d5c74ed22114604bae51c5f to your computer and use it in GitHub Desktop.
网页版本使用(20页使用限制) https://ocr.ficapy.com
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
# Author: ficapy | |
import io | |
from urllib.parse import urlencode | |
from base64 import b64encode | |
from pdf2image import convert_from_bytes | |
from PyPDF2 import PdfFileWriter, PdfFileReader | |
from reportlab.pdfbase import pdfmetrics | |
from reportlab.pdfbase.ttfonts import TTFont | |
from reportlab.pdfgen import canvas | |
from tornado.escape import json_decode | |
from tornado.httpclient import AsyncHTTPClient | |
from tornado.queues import Queue | |
from tornado.gen import coroutine | |
from tornado.ioloop import IOLoop | |
# --- Configuration -------------------------------------------------------
# Baidu OCR credentials; get_token() sends them as client_id / client_secret.
BAIDU_APPID = ""
BAIDU_APPKEY = ""
# TrueType font file registered with reportlab for drawing the OCR text.
FONT_FILE = "Arial Unicode MS.ttf"
# Path of the PDF to process; passed to run() by the script entry point.
file_path = ""

# --- Shared state --------------------------------------------------------
# Bounded work queue of [page_index, jpeg_buffer] items: filled by
# extractor(), drained by worker().
queue = Queue(maxsize=50)
# page index -> list of [text, left, top, height, width] OCR entries.
ret = {}
# page index -> width of the rendered page image, used by combine() to
# scale OCR coordinates back to the PDF page.
image_width_map = {}
def _render_text_layer(page, entries, image_width):
    """Return a one-page PDF page with *entries* drawn as transparent text.

    Args:
        page: The source PDF page; its media box gives the canvas size.
        entries: Iterable of ``[text, left, top, height, width]`` items whose
            coordinates are in the scale of the rendered page image.
        image_width: Width of the rendered page image, used to derive the
            image-to-page scale factor.
    """
    upper_right = page.mediaBox.upperRight
    page_height = float(upper_right[1])
    # Scale factor between the rendered image and the PDF page.
    rate = float(image_width) / float(upper_right[0])

    packet = io.BytesIO()
    can = canvas.Canvas(packet, pagesize=upper_right)
    # Alpha 0 keeps the drawn text invisible on top of the page content.
    can.setFillAlpha(0)
    for ocr, x, y, height, _width in entries:
        x, y, height = x / rate, y / rate, height / rate
        # NOTE(review): 72 / 96 presumably converts pixels to points at 96 DPI.
        font_size = int(height * 72 / 96)
        can.setFont('unicode', size=font_size)
        # OCR "top" is measured from the top edge; the canvas origin is at
        # the bottom, so flip the vertical coordinate.
        can.drawString(x, page_height - y - height, ocr)
    can.save()
    packet.seek(0)
    return PdfFileReader(packet).getPage(0)


def combine(file_path, pages, save_path="result.pdf"):
    """Merge the OCR text into the PDF at *file_path* and write *save_path*.

    Args:
        file_path: Path of the source PDF.
        pages: Mapping of zero-based page index to a list of
            ``[text, left, top, height, width]`` OCR entries.
        save_path: Destination of the merged PDF.

    Pages that have no OCR entries are copied to the output unchanged, so
    the result keeps every page of the source document.
    """
    pdfmetrics.registerFont(TTFont('unicode', FONT_FILE))
    output = PdfFileWriter()
    with open(file_path, 'rb') as raw_pdf:
        existing_pdf = PdfFileReader(raw_pdf, strict=False)
        print("该PDF文件总共{}页".format(existing_pdf.numPages))
        for i in range(existing_pdf.numPages):
            page = existing_pdf.getPage(i)
            data = pages.get(i)
            if data:
                overlay = _render_text_layer(page, data, image_width_map[i])
                page.mergePage(overlay)
            # Always keep the page, with or without a text layer.
            output.addPage(page)
        # Open the destination only once every page is prepared, so a failure
        # above does not leave a truncated file behind. The source stays open
        # because the output pages come from the reader backed by raw_pdf.
        with open(save_path, "wb") as f:
            output.write(f)
    print("合并成功,文件地址为{}".format(save_path))
@coroutine
def extractor(file_path):
    """Render every page of the PDF to a JPEG and put it on the work queue.

    For each page the image width is recorded in ``image_width_map`` and a
    ``[page_index, jpeg_buffer]`` item is queued. The coroutine waits on
    ``queue.put`` whenever the bounded queue is full.
    """
    with open(file_path, 'rb') as pdf_file:
        reader = PdfFileReader(pdf_file, strict=False)
        for page_index in range(reader.numPages):
            # Wrap the page in its own one-page PDF so it can be rasterized
            # on its own.
            single_page = PdfFileWriter()
            single_page.addPage(reader.getPage(page_index))
            pdf_bytes = io.BytesIO()
            single_page.write(pdf_bytes)

            image = convert_from_bytes(pdf_bytes.getvalue())[0]
            image_width_map[page_index] = image.size[0]

            jpeg_buffer = io.BytesIO()
            image.save(jpeg_buffer, quality=20, optimize=True, format='jpeg')
            yield queue.put([page_index, jpeg_buffer])
@coroutine
def get_token(_cache={}):
    """Return the Baidu API access token, fetching it on first use.

    The decoded JSON response of the token endpoint is merged into
    ``_cache``; later calls return the stored ``access_token`` without a
    network request.

    Args:
        _cache: Internal cache. The mutable default is shared between calls
            and appears to be deliberate; callers should not pass it.
    """
    cached_token = _cache.get("access_token")
    if cached_token:
        return cached_token

    url = "https://aip.baidubce.com/oauth/2.0/token?grant_type=client_credentials&client_id={}&client_secret={}".format(
        BAIDU_APPID, BAIDU_APPKEY)
    response = yield AsyncHTTPClient().fetch(url)
    _cache.update(dict(json_decode(response.body)))
    return _cache.get('access_token')
@coroutine
def get_info(page_num, img):
    """OCR one page image and store the recognized lines in ``ret``.

    Args:
        page_num: Zero-based page index used as the key in ``ret``.
        img: In-memory buffer holding the page image; its bytes are sent
            base64-encoded to the OCR endpoint.

    ``ret[page_num]`` is always set, to a possibly empty list of
    ``[text, left, top, height, width]`` entries. Failures are reported on
    stdout instead of being raised, so one bad page does not stop the run.
    """
    # General text recognition (with position information)
    one_page = []
    try:
        # Fetch the token inside the try block so a token failure is
        # reported like any other per-page failure.
        token = yield get_token()
        url = "https://aip.baidubce.com/rest/2.0/ocr/v1/general?access_token={}".format(token)
        body = urlencode({'image': b64encode(img.getvalue())})
        r = yield AsyncHTTPClient().fetch(url, method="POST", body=body)
        raw_json = json_decode(r.body.decode('utf-8'))
        # NOTE(review): error_code 18 is presumably the API rate limit;
        # the page is queued again for another attempt.
        if int(raw_json.get("error_code", 1)) == 18:
            yield queue.put([page_num, img])
        words_result = raw_json.get('words_result')
        if words_result is not None:
            for item in words_result:
                location = item['location']
                one_page.append([item['words'], location['left'], location['top'],
                                 location['height'], location['width']])
    except Exception as e:
        # Best effort: keep going, but make the failure visible.
        print("第{}页识别失败: {}".format(page_num, e))
    ret[page_num] = one_page
@coroutine
def worker():
    """Consume page images from the queue forever and OCR each one.

    ``queue.task_done()`` is called for every item, even when processing
    raises, so ``queue.join()`` cannot hang on a failed page and the worker
    keeps serving the remaining items.
    """
    while True:
        page_num, img = yield queue.get()
        try:
            yield get_info(page_num, img)
        except Exception as e:
            print("第{}页获取结果失败: {}".format(page_num, e))
        else:
            print("第{}页获取结果成功".format(page_num))
        finally:
            queue.task_done()
@coroutine
def run(file_path):
    """Run the pipeline: extract pages, OCR them, then merge the text layer.

    Starts the extractor and two workers, waits until every page has been
    queued and processed, stops the current IOLoop and finally calls
    ``combine``.

    Args:
        file_path: Path of the PDF to process.
    """
    producer = extractor(file_path)
    worker()
    worker()
    try:
        # Wait for the producer first: this surfaces its errors and makes
        # sure join() is only awaited once every page has been queued.
        yield producer
        yield queue.join()
    finally:
        # Stop the loop that is actually running, and do so even on
        # failure so the script cannot hang.
        IOLoop.current().stop()
    print("结果获取完毕,开始合并")
    combine(file_path, ret)
if __name__ == "__main__":
    # Schedule the pipeline, then drive it on the current IOLoop.
    # IOLoop.current() avoids constructing a throwaway IOLoop instance just
    # to reach the running loop.
    run(file_path)
    IOLoop.current().start()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment