Skip to content

Instantly share code, notes, and snippets.

@4ft35t
Forked from ficapy/pdfaddtext.py
Created April 2, 2018 08:46
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save 4ft35t/854768049d5c74ed22114604bae51c5f to your computer and use it in GitHub Desktop.
Save 4ft35t/854768049d5c74ed22114604bae51c5f to your computer and use it in GitHub Desktop.
网页版本使用(20页使用限制) https://ocr.ficapy.com
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Author: ficapy
import io
from urllib.parse import urlencode
from base64 import b64encode
from pdf2image import convert_from_bytes
from PyPDF2 import PdfFileWriter, PdfFileReader
from reportlab.pdfbase import pdfmetrics
from reportlab.pdfbase.ttfonts import TTFont
from reportlab.pdfgen import canvas
from tornado.escape import json_decode
from tornado.httpclient import AsyncHTTPClient
from tornado.queues import Queue
from tornado.gen import coroutine
from tornado.ioloop import IOLoop
BAIDU_APPID = ""
BAIDU_APPKEY = ""
FONT_FILE = "Arial Unicode MS.ttf"
file_path = ""
queue = Queue(maxsize=50)
ret = {}
image_width_map = {}
def combine(file_path, pages, save_path="result.pdf"):
# unicode_ttf = path.join(path.dirname(path.abspath(__file__)), 'pdfaddtext/utils/Arial Unicode MS.ttf')
pdfmetrics.registerFont(TTFont('unicode', FONT_FILE))
output = PdfFileWriter()
with open(file_path, 'rb') as raw_pdf, open(save_path, "wb") as f:
existing_pdf = PdfFileReader(raw_pdf, strict=False)
print("该PDF文件总共{}页".format(existing_pdf.numPages))
for i in range(existing_pdf.numPages):
if not pages.get(i):
continue
data = pages[i]
page = existing_pdf.getPage(i)
rate = float(image_width_map[i]) / float(page.mediaBox.upperRight[0])
packet = io.BytesIO()
can = canvas.Canvas(packet, pagesize=page.mediaBox.upperRight)
can.setFillAlpha(0)
for ocr, x, y, height, width in data:
x, y, height = x / rate, y / rate, height / rate
font_size = int(height * 72 / 96)
can.setFont('unicode', size=font_size)
can.drawString(x, float(page.mediaBox.upperRight[1]) - y - height, ocr)
can.save()
packet.seek(0)
new_pdf = PdfFileReader(packet)
page.mergePage(new_pdf.getPage(0))
output.addPage(page)
output.write(f)
print("合并成功,文件地址为{}".format(save_path))
@coroutine
def extractor(file_path):
with open(file_path, 'rb') as raw_pdf:
inp = PdfFileReader(raw_pdf, strict=False)
for i in range(inp.numPages):
page = inp.getPage(i)
wrt = PdfFileWriter()
wrt.addPage(page)
r = io.BytesIO()
wrt.write(r)
images = convert_from_bytes(r.getvalue())[0]
image_width_map[i] = images.size[0]
ret = io.BytesIO()
images.save(ret, quality=20, optimize=True, format='jpeg')
yield queue.put([i, ret])
@coroutine
def get_token(_cache={}):
if _cache.get("access_token"):
return _cache.get("access_token")
url = "https://aip.baidubce.com/oauth/2.0/token?grant_type=client_credentials&client_id={}&client_secret={}".format(
BAIDU_APPID, BAIDU_APPKEY)
client = AsyncHTTPClient()
r = yield client.fetch(url)
req = dict(json_decode(r.body))
_cache.update(req)
return _cache.get('access_token')
@coroutine
def get_info(page_num, img):
# 通用文字识别(含位置信息版)
token = yield get_token()
URL = "https://aip.baidubce.com/rest/2.0/ocr/v1/general?access_token={}".format(token)
post_data = {'image': b64encode(img.getvalue())}
body = urlencode(post_data)
client = AsyncHTTPClient()
one_page = []
try:
r = yield client.fetch(URL, method="POST", body=body)
raw_json = json_decode(r.body.decode('utf-8'))
if int(raw_json.get("error_code", 1)) == 18:
yield queue.put([page_num, img])
if raw_json.get('words_result') is not None:
for i in raw_json["words_result"]:
one_page.append([i['words'], i['location']['left'], i['location']['top'], i['location']['height'],
i['location']['width']])
except Exception as e:
pass
ret[page_num] = one_page
@coroutine
def worker():
while 1:
page_num, img = yield queue.get()
yield get_info(page_num, img)
queue.task_done()
print("第{}页获取结果成功".format(page_num))
@coroutine
def run(file_path):
extractor(file_path)
worker()
worker()
yield queue.join()
IOLoop().instance().stop()
print("结果获取完毕,开始合并")
combine(file_path, ret)
if __name__ == "__main__":
run(file_path)
IOLoop().instance().start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment