Skip to content

Instantly share code, notes, and snippets.

@dmgig
Created September 21, 2021 01:27
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save dmgig/bcfd171c95a8f7836cf4e3e74e27ded0 to your computer and use it in GitHub Desktop.
Save dmgig/bcfd171c95a8f7836cf4e3e74e27ded0 to your computer and use it in GitHub Desktop.
OCR for Document Search
#!/usr/bin/python3
import os
import sys
from datetime import datetime
import pytz
import logging
import ohhocr
import getopt
import argparse
# Command-line launcher: configures logging, parses options and hands the
# target path to ohhocr.main().
estTZ = pytz.timezone('US/Eastern')
LOGSDIR = '_logs'
# One log file per run, named after the US/Eastern start time.
os.makedirs('./'+LOGSDIR, exist_ok=True)
logging.basicConfig(filename='./'+LOGSDIR+'/'+datetime.now(estTZ).strftime("%Y%m%d%H%M%S")+'.log', level=logging.DEBUG)

# CONF
TESSCHUNK = 4
TEXTCHUNK = 6
CONVCHUNK = 6
SLEEPTIME = 0.05
USAGE = 'python3 bin/ocr.py PATH --sleeptime 0.05 --tesschunk 4'

try:
    # gnu_getopt lets options appear before or after the positional PATH;
    # plain getopt stops parsing at the first non-option argument.
    opts, args = getopt.gnu_getopt(sys.argv[1:], "ht:s:", ["tesschunk=", "sleeptime="])
    for opt, arg in opts:
        if opt == '-h':
            print(USAGE)
            sys.exit()
        elif opt in ("-t", "--tesschunk"):
            # Option values arrive as strings; the chunk size must be an int.
            TESSCHUNK = int(arg)
        elif opt in ("-s", "--sleeptime"):
            SLEEPTIME = float(arg)
except (getopt.GetoptError, ValueError):
    # Unknown option, missing option value, or a non-numeric value.
    print(USAGE)
    sys.exit(2)

if args:
    # The first positional argument is the file or directory to process.
    PATH = args[0]
    logging.info(PATH)
    ohhocr.main(PATH, TESSCHUNK, SLEEPTIME)
else:
    error = "No path provided."
    logging.error(error)
    print(error)
    sys.exit(1)
#!/usr/bin/python3
import os
import sys
import getopt
import subprocess
from datetime import datetime
import time
import pytesseract
import argparse
import cv2
import shutil
import logging
import glob
import logging
import math
import calendar
from subprocess import Popen
from PIL import Image
from time import sleep
import re
import pytz
from PyPDF2 import PdfFileReader
import pikepdf
import getopt
# Timezone used when prepPdf prints the current wall-clock time.
estTZ = pytz.timezone('US/Eastern')

# Run-wide progress state, updated through `global` statements in
# prepFileDirs (documentsTotal) and prepPdf (documentCounter).
documentsTotal = 0
documentCounter = 0
# Epoch seconds captured when this module is loaded.
timeStart = calendar.timegm(time.gmtime())

# FUNCS
# Names of the log directory and of the scratch sub-directories.
LOGSDIR = '_logs'
IMGSDIR = '_imgs'
TEXTDIR = '_text'
DONEDIR = '_done'
def setUpWorkingDirectory(dir):
    """Create the IMGSDIR, TEXTDIR and DONEDIR sub-directories inside `dir`.

    Sub-directories that already exist are left untouched. `dir` itself is
    expected to exist, because os.mkdir does not create parent directories.
    """
    for scratchName in (IMGSDIR, TEXTDIR, DONEDIR):
        scratchPath = dir+'/'+scratchName
        if os.path.exists(scratchPath):
            continue
        os.mkdir(scratchPath)
def doDirectory(dir):
    """Process the contents of directory `dir` by delegating to prepFileDirs."""
    fileType = 'pdf'
    prepFileDirs(dir, fileType)
def doSingleFile(file):
    """Process a single document, using scratch directories beside it.

    The image, text and done directories are the IMGSDIR, TEXTDIR and DONEDIR
    sub-directories of the directory that contains `file`.
    """
    parent = os.path.dirname(file)
    imgsPath, textPath, donePath = (
        parent+'/'+scratchName for scratchName in (IMGSDIR, TEXTDIR, DONEDIR)
    )
    prepPdf(file, parent, imgsPath, textPath, donePath)
def moveEsFiles(dir):
    """Copy the text output of `dir` into its mirror directory with rsync.

    The destination is `dir` with every 'docs_originals/' replaced by 'es/';
    when `dir` does not contain that substring the destination equals `dir`.
    The destination is created if it is missing.
    """
    print('moving dir '+dir)
    es_dir = dir.replace('docs_originals/', 'es/')
    os.makedirs(es_dir, exist_ok=True)
    try:
        # An argument list (no shell) keeps paths containing spaces or shell
        # metacharacters intact.
        subprocess.call(['rsync', '-pruv', dir+'/'+TEXTDIR+'/', es_dir])
    except FileNotFoundError:
        # The shell-based original only printed an error when rsync was
        # missing; stay best-effort instead of aborting the whole run.
        logging.error('rsync executable not found; skipped copying %s', dir)
def moveDoneFiles(dir):
    """Move everything out of each `_done` directory into its parent.

    Covers `dir/_done` and every `_done` directory nested below `dir`.
    Hidden entries (names starting with '.') are not moved, matching the
    shell `*` pattern this function used to rely on.
    """
    # With recursive=True, '**' also matches zero directories, so the
    # top-level dir/_done is normally part of the result already.
    doneDirs = glob.glob(dir+'/**/_done', recursive=True)
    topDone = dir+'/_done'
    if topDone not in doneDirs and os.path.isdir(topDone):
        doneDirs.append(topDone)
    for doneDir in doneDirs:
        print("------------------"+doneDir)
        parent = os.path.dirname(doneDir)
        for entry in glob.glob(os.path.join(doneDir, '*')):
            target = os.path.join(parent, os.path.basename(entry))
            print("renamed '%s' -> '%s'" % (entry, target))
            # shutil.move instead of a shell `mv` string, so paths with
            # spaces or shell metacharacters are handled correctly.
            shutil.move(entry, target)
def cleanUp(dir):
    """Delete the `_imgs`, `_text` and `_done` scratch directories under `dir`.

    The three top-level directories are removed first (shutil.rmtree raises
    if one of them is missing), then any directory with one of those names
    found further down the tree.
    """
    scratchNames = ('_imgs', '_text', '_done')
    for scratchName in scratchNames:
        shutil.rmtree(dir+'/'+scratchName)
    for scratchName in scratchNames:
        leftovers = glob.glob(dir+'/**/'+scratchName, recursive=True)
        for leftover in leftovers:
            shutil.rmtree(leftover)
# time conv https://stackoverflow.com/questions/4048651/python-function-to-convert-seconds-into-minutes-hours-and-days/4048773
def dhmsTime(q):
    """Format a duration of `q` seconds as days, hours, minutes and seconds."""
    wholeDays, remainder = divmod(q, 86400)
    wholeHours, remainder = divmod(remainder, 3600)
    wholeMinutes, leftoverSeconds = divmod(remainder, 60)
    return "%i days, %i hours, %i minutes, %i seconds" % (wholeDays, wholeHours, wholeMinutes, leftoverSeconds)
# https://stackoverflow.com/questions/312443/how-do-you-split-a-list-into-evenly-sized-chunks
def chunks(l, n):
    """Lazily yield consecutive slices of `l`, each at most `n` items long."""
    offsets = range(0, len(l), n)
    yield from (l[offset:offset + n] for offset in offsets)
# loop through files in dir
def listFiles(dir):
    """Print `dir`, a separator line, then the name of each PDF file in it.

    Only regular files directly inside `dir` whose name matches `*.pdf` are
    listed; sub-directories are not searched.
    """
    print(dir)
    print("=========================")
    pdfNames = [
        os.path.basename(candidate)
        for candidate in glob.glob(dir+'/*.pdf')
        if os.path.isfile(candidate)
    ]
    for pdfName in pdfNames:
        print(pdfName)
def prepFileDirs(dir,type):
    """Process every entry of `dir` in natural sort order.

    Regular files are handed to prepPdf using the scratch directories of
    `dir`. Sub-directories, except the scratch directories themselves, get
    their own scratch directories, are processed recursively and then have
    their text output copied by moveEsFiles.

    `type` is accepted for compatibility with existing callers but is not
    used: every regular file is processed, whatever its extension.
    NOTE(review): presumably only '.'+type files were meant to be processed;
    confirm before adding a filter.
    """
    global documentsTotal
    # natural_sort returns a new list, so its result has to be kept for the
    # ordering to take effect.
    files = natural_sort(glob.glob(dir+'/*'))
    # Accumulate rather than overwrite, so recursive calls for sub-directories
    # do not clobber the running total; count only files, because directories
    # never reach prepPdf and so never increment documentCounter.
    documentsTotal += sum(1 for file in files if os.path.isfile(file))
    scratchNames = (IMGSDIR, TEXTDIR, DONEDIR)
    for file in files:
        if os.path.isfile(file):
            prepPdf(file, os.path.dirname(file), dir+'/'+IMGSDIR, dir+'/'+TEXTDIR, dir+'/'+DONEDIR)
        elif os.path.basename(file) not in scratchNames:
            setUpWorkingDirectory(file)
            doDirectory(file)
            moveEsFiles(file)
def appendToFilename(filePath, append):
    """Return `filePath` with `append` inserted just before its extension.

    The directory part is always followed by '/', so a bare file name with no
    directory component comes back with a leading '/'.
    """
    folder, leaf = os.path.split(filePath)
    stem, extension = os.path.splitext(leaf)
    return folder + '/' + stem + append + extension
def prepPdf(pdfPath, dirName, imgPath, textPath, dirDone):
    """Render one PDF to page images, OCR the images, then archive the PDF.

    Steps, in order:
      1. Run Ghostscript (`gs`) to write grayscale PNGs at 300x300 into
         `imgPath`, named `<pdf stem>_<n>.png`.
      2. Run `tesseract` on every PNG found in `imgPath`, TESSCHUNK processes
         at a time, with output base paths `textPath/<image stem>`.
      3. Delete and recreate `dirName/IMGSDIR`.
      4. Move the PDF into `dirDone`.
      5. Update the global document counter and print timing statistics.

    @params:
        pdfPath  - path of the PDF to process
        dirName  - directory whose IMGSDIR sub-directory is reset afterwards
        imgPath  - directory that receives (and is scanned for) the PNGs
        textPath - directory that receives the tesseract output
        dirDone  - directory the processed PDF is moved into
    """
    global documentsTotal
    global documentCounter
    global timeStart
    # Tuning values are read from the module-level SLEEPTIME / TESSCHUNK.
    global SLEEPTIME
    global TESSCHUNK
    # Banner: the PDF path framed by '=' rules of the same length.
    print(makeLayout('=', len(pdfPath)))
    print(pdfPath)
    print(makeLayout('=', len(pdfPath)))
    # convert pdf to png
    # with open(pdfPath, "rb") as pdf_file:
    # pdf = pikepdf.open(pdf_file)
    # pdf.save(pdf)
    # pdf_reader = PdfFileReader(pdf_file, False)
    print(f"Ghostscript running...")
    # subprocess.call blocks until Ghostscript exits; its return code is ignored.
    subprocess.call(['gs', '-dNOPAUSE', '-dBATCH', '-dTextAlphaBits=4', '-dGraphicsAlphaBits=4', '-sDEVICE=pnggray', '-r300x300', '-dNumRenderingThreads=4', '-dBufferSpace=2000000000', '-sCompression=none', '-sOutputFile='+imgPath+'/'+os.path.splitext(os.path.basename(pdfPath))[0]+'_%d.png', pdfPath ])
    # Both timers start after Ghostscript has finished, so the per-document
    # figures below cover the OCR phase only.
    timeStartDoc = calendar.timegm(time.gmtime());
    millitimeStart = time.time()
    imgFiles = glob.glob(imgPath+'/*.png')
    imgFilesCnt = len(imgFiles)
    # --- disabled step (kept for reference) ---
    # remove gray backgrounds
    # imgFilesChunks = chunks(imgFiles, CONVCHUNK)
    # i=0
    # chunksSize = math.ceil(imgFilesCnt / CONVCHUNK);
    # for chunk in imgFilesChunks:
    # print("Chunk %d of %d" % (i+1,chunksSize))
    # ps = {}
    # for file in chunk:
    # baseName = os.path.basename(file)
    # p = subprocess.Popen(['convert', file, '-set', 'colorspace', 'gray', '-contrast-stretch', '4x80%', file], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    # ps[p.pid] = p
    # print("Waiting for %d convert processes..." % len(ps))
    # while ps:
    # pid, status = os.wait()
    # if pid in ps:
    # del ps[pid]
    # print("Waiting for %d convert processes..." % len(ps))
    # i=i+1
    # --- disabled step (kept for reference) ---
    # image enhancement/restoration
    # imgFiles = glob.glob(imgPath+'/*.png')
    # imgFilesChunks = chunks(imgFiles, TEXTCHUNK)
    # i=0
    # chunksSize = math.ceil(imgFilesCnt / TEXTCHUNK)
    # for chunk in imgFilesChunks:
    # print("Chunk %d of %d" % (i+1,chunksSize))
    # ps = {}
    # for file in chunk:
    # baseName = os.path.basename(file)
    # # p = subprocess.Popen(['textcleaner', '-u', '-T', '-p', '20', file, file], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    # p = subprocess.Popen(['textcleaner', '-g', '-e', 'none', '-f', '10', '-o', '5', file, 'x'+file], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    # ps[p.pid] = p
    # sleep(0.05)
    # print("Waiting for %d textcleaner processes..." % len(ps))
    # while ps:
    # pid, status = os.wait()
    # if pid in ps:
    # del ps[pid]
    # print("Waiting for %d textcleaner processes..." % len(ps))
    # i=i+1
    # tesseract (up to TESSCHUNK processes at a time)
    # https://stackoverflow.com/questions/3194018/wait-the-end-of-subprocesses-with-multiple-parallel-jobs
    imgFiles = glob.glob(imgPath+'/*.png')
    imgFilesChunks = chunks(imgFiles, TESSCHUNK)
    # i counts finished tesseract processes and drives the progress bar.
    i=0
    # chunksSize is computed but not used below.
    chunksSize = math.ceil(imgFilesCnt / TESSCHUNK)
    print(makeLayout('=', len(pdfPath)))
    print(pdfPath)
    print(makeLayout('=', len(pdfPath)))
    for chunk in imgFilesChunks:
        # Processes launched for this chunk, keyed by pid.
        ps = {}
        for file in chunk:
            baseName = os.path.basename(file)
            p = subprocess.Popen(['tesseract', file, textPath+'/'+os.path.splitext(baseName)[0], '-v', '-l', 'eng'], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
            ps[p.pid] = p
            # Pause between launches.
            sleep(SLEEPTIME)
        # Do not start the next chunk until every process of this one has exited.
        while ps:
            pid, status = os.wait()
            # Ignore any reaped child that was not started for this chunk.
            if pid in ps:
                i = i+1
                printProgressBar(i, len(imgFiles), prefix = 'Tesseract running, ', suffix = 'complete')
                del ps[pid]
    # Reset the image scratch directory by deleting and recreating it.
    shutil.rmtree(dirName+'/'+IMGSDIR)
    os.mkdir(dirName+'/'+IMGSDIR)
    # move finished pdf
    shutil.move(pdfPath, dirDone+'/');
    documentCounter = documentCounter + 1;
    curTime = calendar.timegm(time.gmtime());
    print(datetime.now(estTZ).strftime("%Y-%m-%d %H:%M:%S"))
    # Elapsed time since the module-level timeStart.
    runningTimeString = dhmsTime(curTime - timeStart)
    print(runningTimeString)
    # Elapsed time for this document (measured from after Ghostscript).
    documentTimeString = dhmsTime(curTime - timeStartDoc)
    print(documentTimeString)
    # Log the average seconds per page image; skipped when no images exist
    # to avoid dividing by zero.
    if len(imgFiles):
        logging.info(float(time.time() - millitimeStart) / len(imgFiles))
    print("Completed %d documents in %s" % (documentCounter, runningTimeString))
    print("Documents remaining: %s" % (documentsTotal - documentCounter))
    print("Last doc took %s\n\n" % documentTimeString)
def natural_sort(l):
    """Return a new list with the strings of `l` in natural order.

    Runs of digits compare as integers and all other text compares
    case-insensitively. The input list is not modified.
    """
    def pieceKey(piece):
        # Digit runs become ints; everything else is lower-cased text.
        if piece.isdigit():
            return int(piece)
        return piece.lower()

    def itemKey(item):
        return [pieceKey(piece) for piece in re.split('([0-9]+)', item)]

    return sorted(l, key=itemKey)
def makeLayout(char, n):
    """Return `char` repeated `n` times, for drawing separator rules.

    Returns an empty string when `n` is zero or negative. The previous
    count-down loop never terminated for a negative `n`.
    """
    return char * n
# Print iterations progress
# https://stackoverflow.com/questions/3173320/text-progress-bar-in-the-console
def printProgressBar (iteration, total, prefix = '', suffix = '', decimals = 1, length = 100, fill = '█', printEnd = "\r"):
    """
    Draw one frame of a terminal progress bar; call once per loop iteration
    @params:
        iteration - Required : current iteration (Int)
        total - Required : total iterations (Int)
        prefix - Optional : text shown before the bar (Str)
        suffix - Optional : text shown after the percentage (Str)
        decimals - Optional : number of decimals in the percentage (Int)
        length - Optional : width of the bar in characters (Int)
        fill - Optional : character used for the completed part (Str)
        printEnd - Optional : line terminator (e.g. "\r", "\r\n") (Str)
    """
    percentTemplate = "{0:." + str(decimals) + "f}"
    percent = percentTemplate.format(100 * (iteration / float(total)))
    doneCells = int(length * iteration // total)
    todoCells = length - doneCells
    bar = fill * doneCells + '-' * todoCells
    print(f'\r{prefix} |{bar}| {percent}% {suffix}', end = printEnd)
    # Finish with a newline once the last iteration has been drawn
    if iteration == total:
        print()
# WORK
# Defaults for the tuning values prepPdf reads; main() overrides them.
TESSCHUNK=4
SLEEPTIME=0.05
def main(PATH, iTESSCHUNK, iSLEEPTIME):
    """Run the OCR pipeline on a directory or on a single file.

    @params:
        PATH       - directory to process, or path of one file
        iTESSCHUNK - number of tesseract processes run at a time (int or
                     numeric string)
        iSLEEPTIME - pause in seconds between tesseract launches (float or
                     numeric string)

    Logs and prints an error, then exits with status 1, when PATH does not
    exist.
    """
    # prepPdf reads these as module globals. Without the `global` statement
    # the assignments only created locals and the caller's values were lost.
    global TESSCHUNK, SLEEPTIME
    # Command-line callers may pass strings; coerce to numbers.
    TESSCHUNK = int(iTESSCHUNK)
    SLEEPTIME = float(iSLEEPTIME)
    if not os.path.exists(PATH):
        error = "Path does not exist."
        logging.error(error)
        print(error)
        sys.exit(1)
    if os.path.isdir(PATH):
        WORKINGDIR = PATH
        setUpWorkingDirectory(WORKINGDIR)
        listFiles(WORKINGDIR)
        doDirectory(WORKINGDIR)
    else:
        if not os.path.dirname(PATH):
            # A bare file name has an empty dirname, which would place the
            # scratch directories at the filesystem root ('/_imgs', ...).
            PATH = os.path.join('.', PATH)
        WORKINGDIR = os.path.dirname(PATH)
        setUpWorkingDirectory(WORKINGDIR)
        doSingleFile(PATH)
    # Put processed documents back beside their scratch directories, then
    # remove the scratch directories.
    moveDoneFiles(WORKINGDIR)
    cleanUp(WORKINGDIR)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment