Skip to content

Instantly share code, notes, and snippets.

@dmgig
Last active December 3, 2020 11:43
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save dmgig/6de6eed782ea527abfd1cd715f99ec5f to your computer and use it in GitHub Desktop.
Save dmgig/6de6eed782ea527abfd1cd715f99ec5f to your computer and use it in GitHub Desktop.
Multi-process OCR pipeline with Tesseract, textcleaner, and ImageMagick
#!/usr/bin/python
import os
import sys
import getopt
import subprocess
import time
import pytesseract
import argparse
import cv2
import shutil
import logging
import glob
import logging
import math
import calendar
from subprocess import Popen
from PIL import Image
# CONF
# Maximum number of external processes launched together for each stage.
TESSCHUNK = 6  # tesseract (OCR)
TEXTCHUNK = 3  # textcleaner (image enhancement/restoration)
CONVCHUNK = 3  # ImageMagick convert (contrast stretch)

# Names of the working sub-directories.
LOGSDIR = '_logs'
IMGSDIR = '_imgs'
TEXTDIR = '_text'
DONEDIR = '_done'

# Run-wide progress state; prepPdf() updates the counter and reads the start time.
documentCounter = 0
timeStart = calendar.timegm(time.gmtime())

# makedirs(exist_ok=True) replaces the exists()-then-mkdir() pair, which could
# fail if the directory appeared between the check and the creation.
os.makedirs(DONEDIR, exist_ok=True)
os.makedirs(LOGSDIR, exist_ok=True)

logging.basicConfig(filename=LOGSDIR+'/error.log', level=logging.DEBUG)
# FUNCS
# Seconds -> days/hours/minutes/seconds conversion, adapted from
# https://stackoverflow.com/questions/4048651/python-function-to-convert-seconds-into-minutes-hours-and-days/4048773
def dhmsTime(q):
    """Format a duration of ``q`` seconds as days, hours, minutes and seconds."""
    # Peel off the largest unit first; each divmod hands the remainder
    # down to the next smaller unit.
    wholeDays, leftover = divmod(q, 86400)
    wholeHours, leftover = divmod(leftover, 3600)
    wholeMinutes, wholeSeconds = divmod(leftover, 60)
    parts = (wholeDays, wholeHours, wholeMinutes, wholeSeconds)
    return "%i days, %i hours, %i minutes, %i seconds" % parts
# Adapted from
# https://stackoverflow.com/questions/312443/how-do-you-split-a-list-into-evenly-sized-chunks
def chunks(l, n):
    """Generate consecutive slices of ``l``, each holding at most ``n`` items.

    The final slice is shorter when ``len(l)`` is not a multiple of ``n``.
    """
    total = len(l)
    for start in range(0, total, n):
        stop = start + n
        yield l[start:stop]
# loop through files in dir
def listFiles(dir):
    """Print the glob pattern and every PDF file found directly inside ``dir``.

    Args:
        dir: directory to scan (not recursive). The name shadows the builtin
            but is kept so existing keyword callers keep working.

    Directories whose name happens to end in ``.pdf`` are skipped.
    """
    # Scan the directory that was passed in rather than a module-level global,
    # so the function honours its argument.
    pattern = dir + '/*.pdf'
    print(pattern)
    for pdf in glob.glob(pattern):
        if os.path.isfile(pdf):
            print('file item=', pdf)
def prepFileDirs(dir, type):
    """Create the working directories for each matching file and process it.

    Args:
        dir: directory to scan (not recursive) for ``*.<type>`` files.
        type: file extension without the dot. ``'pdf'`` files are handed to
            prepPdf(); ``'PNG'`` files are handed to prepPng(). Any other
            value only creates the image and text directories.

    Both parameter names shadow builtins but are kept so existing keyword
    callers keep working.
    """
    # Scan the directory that was passed in rather than a module-level global.
    for path in glob.glob(dir + '/*.' + type):
        if not os.path.isfile(path):
            continue

        dirMain = os.path.dirname(path)
        dirImgs = dirMain + '/' + IMGSDIR
        dirText = dirMain + '/' + TEXTDIR
        dirDone = dirMain + '/' + DONEDIR
        print(dirMain, ' ', dirMain, ' ', dirImgs, ' ', dirText)

        # Recreated on every iteration because the pdf branch removes the
        # image directory after each document.
        os.makedirs(dirImgs, exist_ok=True)
        os.makedirs(dirText, exist_ok=True)

        if type == 'pdf':
            # prepPdf() moves the finished PDF here, so the directory must
            # exist next to the documents, not only in the working directory.
            os.makedirs(dirDone, exist_ok=True)
            prepPdf(path, dirMain, dirMain, dirImgs, dirText, dirDone)
            # Drop the page images so the next document starts clean.
            shutil.rmtree(dirImgs)
        elif type == 'PNG':
            # NOTE(review): prepPng is not defined in the visible part of
            # this file; confirm it exists before using this branch.
            prepPng(path, dirMain, dirMain, dirImgs, dirText)
def appendToFilename(filePath, append):
    """Return ``filePath`` with ``append`` inserted before the file extension.

    Args:
        filePath: path to a file; the directory part may be empty.
        append: text to insert between the file's stem and its extension.

    Returns:
        The new path, e.g. ``'docs/report.pdf'`` with ``'_clean'`` becomes
        ``'docs/report_clean.pdf'``.
    """
    dirName, baseName = os.path.split(filePath)
    stem, extension = os.path.splitext(baseName)
    # os.path.join leaves out the separator when dirName is empty; gluing on
    # '/' unconditionally would move a bare filename to the filesystem root.
    return os.path.join(dirName, stem + append + extension)
def _runInChunks(files, chunkSize, label, buildCommand):
    """Run one external command per file, ``chunkSize`` processes at a time.

    Args:
        files: list of paths to process.
        chunkSize: maximum number of processes started together; the next
            batch starts only after every process of the current one exited.
        label: tool name used in progress and error-log messages.
        buildCommand: callable that maps a file path to the argv list to run.
    """
    totalChunks = math.ceil(len(files) / chunkSize)
    for index, chunk in enumerate(chunks(files, chunkSize)):
        print("Chunk %d of %d" % (index + 1, totalChunks))
        running = [
            (path, subprocess.Popen(buildCommand(path),
                                    stdout=subprocess.PIPE,
                                    stderr=subprocess.PIPE))
            for path in chunk
        ]
        remaining = len(running)
        print("Waiting for %d %s processes..." % (remaining, label))
        for path, proc in running:
            # communicate() drains both pipes while waiting, so a child that
            # writes a lot of output cannot block on a full pipe, and the
            # exit status stays available on the Popen object.
            _, errOutput = proc.communicate()
            if proc.returncode != 0:
                logging.error("%s failed (exit %d) for %s: %s",
                              label, proc.returncode, path,
                              errOutput.decode('utf-8', 'replace').strip())
            remaining -= 1
            print("Waiting for %d %s processes..." % (remaining, label))


def prepPdf(pdfPath, dirName, dirMain, imgPath, textPath, dirDone):
    """OCR a single PDF and move it to the done directory.

    Stages: render the pages to PNG with Ghostscript, contrast-stretch them
    with ImageMagick ``convert``, clean them with ``textcleaner``, run
    ``tesseract`` on each page, then move the PDF into ``dirDone`` and print
    timing statistics. Failures of the external tools are written to the
    error log; processing continues regardless.

    Args:
        pdfPath: path of the PDF to process.
        dirName: unused; kept for interface compatibility.
        dirMain: unused; kept for interface compatibility.
        imgPath: directory that receives the rendered page images. Every
            ``*.png`` in it is processed.
        textPath: directory that receives tesseract's output files.
        dirDone: existing directory the PDF is moved into when finished.
    """
    global documentCounter

    timeStartDoc = calendar.timegm(time.gmtime())
    docStem = os.path.splitext(os.path.basename(pdfPath))[0]

    print("\n")
    print(pdfPath)
    print(docStem)
    print("\n\n")

    # convert pdf to png
    gsStatus = subprocess.call([
        'gs', '-dNOPAUSE', '-dBATCH', '-dTextAlphaBits=4',
        '-dGraphicsAlphaBits=4', '-sDEVICE=pnggray', '-r300x300',
        '-sCompression=none',
        '-sOutputFile=' + imgPath + '/' + docStem + '_%d.png',
        pdfPath,
    ])
    if gsStatus != 0:
        logging.error("gs failed (exit %d) for %s", gsStatus, pdfPath)

    pngPattern = imgPath + '/*.png'

    # remove gray backgrounds (each image is overwritten in place)
    _runInChunks(
        glob.glob(pngPattern), CONVCHUNK, 'convert',
        lambda path: ['convert', path, '-set', 'colorspace', 'gray',
                      '-contrast-stretch', '4x80%', path])

    # image enhancement/restoration (each image is overwritten in place)
    _runInChunks(
        glob.glob(pngPattern), TEXTCHUNK, 'textcleaner',
        lambda path: ['textcleaner', '-u', '-T', '-p', '20', path, path])

    # tesseract: output goes to textPath, named after the image's stem
    # https://stackoverflow.com/questions/3194018/wait-the-end-of-subprocesses-with-multiple-parallel-jobs
    _runInChunks(
        glob.glob(pngPattern), TESSCHUNK, 'tesseract',
        lambda path: ['tesseract', path,
                      textPath + '/' + os.path.splitext(os.path.basename(path))[0],
                      '-v', '-l', 'eng'])

    # move finished pdf
    shutil.move(pdfPath, dirDone + '/')
    documentCounter += 1

    curTime = calendar.timegm(time.gmtime())
    print(curTime)
    runningTimeString = dhmsTime(curTime - timeStart)
    print(runningTimeString)
    documentTimeString = dhmsTime(curTime - timeStartDoc)
    print(documentTimeString)
    print("\n\nCompleted %d documents in %s" % (documentCounter, runningTimeString))
    print("Last doc took %s\n\n" % documentTimeString)
# WORK
# Directory scanned for PDFs. Both names stay at module level so any code
# that reads them as globals keeps working; PATH_DOCSO is not referenced
# anywhere in the visible file.
PATH_DOCSO = '.'
PATH_DOCS = '.'

# Only start processing when run as a script, so importing this module does
# not launch the OCR pipeline as a side effect.
if __name__ == '__main__':
    listFiles(PATH_DOCS)
    prepFileDirs(PATH_DOCS, 'pdf')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment