Skip to content

Instantly share code, notes, and snippets.

@adelaide01
Created October 21, 2019 18:17
Show Gist options
  • Save adelaide01/cea6a3ecaf700cbe9422bed60f59603a to your computer and use it in GitHub Desktop.
Save adelaide01/cea6a3ecaf700cbe9422bed60f59603a to your computer and use it in GitHub Desktop.
Batch processing in Python for ABBYY FineReader
#!/usr/bin/python
# Usage: process.py <input file> <output file> [-l <Language>] [-pdf|-txt|-rtf|-docx|-xml]
import argparse
import os
import time
from SimpleFolderProcess import *
processor = None


def setup_processor():
    """Configure the module-level ``processor`` from environment variables.

    ``ABBYY_APPID`` and ``ABBYY_PWD`` override the application id and
    password; ``http_proxy`` and ``https_proxy`` add proxy URLs. Variables
    that are not set leave the corresponding setting untouched.
    """
    if "ABBYY_APPID" in os.environ:
        processor.ApplicationId = os.environ["ABBYY_APPID"]
    if "ABBYY_PWD" in os.environ:
        processor.Password = os.environ["ABBYY_PWD"]

    # Proxy settings
    # Build a fresh dict instead of mutating processor.Proxies in place: the
    # attribute may be a dict declared at class level, in which case in-place
    # changes would be visible to every other instance of that class.
    proxies = dict(processor.Proxies)
    for scheme in ("http", "https"):
        variable = scheme + "_proxy"
        if variable in os.environ:
            proxy_string = os.environ[variable]
            print("Using {} proxy at {}".format(scheme, proxy_string))
            proxies[scheme] = proxy_string
    processor.Proxies = proxies
# Recognize a file at file_path and save result to result_file_path
def recognize_file(file_path, result_file_path, language, output_format):
    """Upload one file for recognition, wait for the task and save the result.

    Args:
        file_path: path of the image/document to upload.
        result_file_path: path the downloaded result is written to.
        language: recognition language stored in the processing settings.
        output_format: export format stored in the processing settings.

    Progress and errors are reported on stdout; nothing is returned.
    """
    print("Uploading..")
    settings = ProcessingSettings()
    settings.Language = language
    settings.OutputFormat = output_format
    task = processor.process_image(file_path, settings)
    if task is None:
        print("Error")
        return
    if task.Status == "NotEnoughCredits":
        print("Not enough credits to process the document. Please add more pages to your application's account.")
        return

    print("Id = {}".format(task.Id))
    print("Status = {}".format(task.Status))

    # Wait for the task to be completed
    print("Waiting..")
    # Note: it's recommended that your application waits at least 2 seconds
    # before making the first getTaskStatus request and also between such requests
    # for the same task. Making requests more often will not improve your
    # application performance.
    # Note: if your application queues several files and waits for them
    # it's recommended that you use listFinishedTasks instead (which is described
    # at http://ocrsdk.com/documentation/apireference/listFinishedTasks/).
    while task.is_active():
        time.sleep(5)
        print(".")
        task = processor.get_task_status(task)
        if task is None:
            # get_task_status() returns None when it rejects the task id;
            # without this check the loop condition would raise AttributeError.
            print("Error")
            return

    print("Status = {}".format(task.Status))

    if task.Status != "Completed":
        print("Error processing task")
        return
    # The URL is read from an XML attribute, so a missing value shows up as an
    # empty string rather than None; treat both as "no URL".
    if not task.DownloadUrl:
        print("No download URL found")
        return
    processor.download_result(task, result_file_path)
    print("Result was written to {}".format(result_file_path))
def create_parser():
    """Build the command-line parser for the recognition script.

    Returns:
        argparse.ArgumentParser: parser whose namespace carries
        ``source_file``, ``target_file``, ``language`` and ``format``.
    """
    parser = argparse.ArgumentParser(description="Recognize a file via web service")
    parser.add_argument('source_file', help='File to recognize')
    parser.add_argument('target_file', help='File the recognition result is written to')
    parser.add_argument('-l', '--language', default='English', help='Recognition language (default: %(default)s)')

    # The output-format flags all store into "format"; at most one may be given.
    # Only the first flag carries the default, which applies when none is used.
    group = parser.add_mutually_exclusive_group()
    group.add_argument('-txt', action='store_const', const='txt', dest='format', default='txt',
                       help='Write the result as plain text (default)')
    group.add_argument('-pdf', action='store_const', const='pdfSearchable', dest='format',
                       help='Write the result as a searchable PDF')
    group.add_argument('-rtf', action='store_const', const='rtf', dest='format',
                       help='Write the result as RTF')
    group.add_argument('-docx', action='store_const', const='docx', dest='format',
                       help='Write the result as DOCX')
    group.add_argument('-xml', action='store_const', const='xml', dest='format',
                       help='Write the result as XML')
    return parser
def main():
    """Script entry point: set up the processor, parse arguments, recognize.

    Creates the module-level ``processor``, applies environment settings,
    then recognizes the source file if it exists; otherwise reports that the
    file is missing.
    """
    global processor
    processor = AbbyyOnlineSdk()
    setup_processor()

    options = create_parser().parse_args()

    # Guard clause: nothing to upload when the source path is not a file.
    if not os.path.isfile(options.source_file):
        print("No such file: {}".format(options.source_file))
        return

    recognize_file(
        options.source_file,
        options.target_file,
        options.language,
        options.format,
    )


if __name__ == "__main__":
    main()
#!/usr/bin/python
# Usage: run this script without arguments; it recognizes every file in ./folder-in
# and writes the text results to ./folder-out
import shutil
import time
from os import listdir
from os.path import isfile, join
import xml.dom.minidom
try:
import requests
except ImportError:
print("You need the requests library to be installed in order to use this sample.")
print("Run 'pip install requests' to fix it.")
exit()
class ProcessingSettings:
    """Recognition options sent along with an uploaded image."""

    # Export format requested for the result (sent as "exportFormat").
    OutputFormat = "txt"
    # Recognition language (sent as "language").
    Language = "English"
class Task:
    """State of one recognition task as reported by the server."""

    # Status string taken from the server response; "Unknown" until set.
    Status = "Unknown"
    # Task identifier taken from the server response.
    Id = None
    # URL of the result; only filled in once the task is completed.
    DownloadUrl = None

    def is_active(self):
        """Return True while the task is still queued or being processed."""
        return self.Status in ("InProgress", "Queued")
class AbbyyOnlineSdk:
    """Minimal client for the cloud OCR web service.

    Uploads an image for processing, polls the task status and downloads the
    result. Connection details are plain attributes that callers may override
    on an instance.
    """

    # Warning! This is for easier out-of-the box usage of the sample only. Change to https:// for production use
    ServerUrl = "http://cloud.ocrsdk.com/"
    # To create an application and obtain a password,
    # register at http://cloud.ocrsdk.com/Account/Register
    # More info on getting your application id and password at
    # http://ocrsdk.com/documentation/faq/#faq3
    # NOTE(review): credentials are hard-coded here; override these attributes
    # at runtime rather than committing real credentials to source.
    ApplicationId = "testing3-2-1"
    Password = "9BQgDQpuF0MFTMKIvSwVBgsx"
    # Class-level default only; every instance gets its own copy in __init__.
    Proxies = {}

    def __init__(self):
        # Copy the class-level default so that adding a proxy on one instance
        # does not silently change every other instance.
        self.Proxies = dict(self.Proxies)

    def process_image(self, file_path, settings):
        """Upload the file at ``file_path`` and return the created Task.

        ``settings`` supplies ``Language`` and ``OutputFormat``. Raises the
        HTTP error from ``requests`` when the server does not answer 200.
        """
        url_params = {
            "language": settings.Language,
            "exportFormat": settings.OutputFormat
        }
        request_url = self.get_request_url("processImage")

        with open(file_path, 'rb') as image_file:
            image_data = image_file.read()

        response = requests.post(request_url, data=image_data, params=url_params,
                                 auth=(self.ApplicationId, self.Password), proxies=self.Proxies)

        # Any response other than HTTP 200 means error - in this case exception will be thrown
        response.raise_for_status()

        # parse response xml and extract task ID
        return self.decode_response(response.text)

    def get_task_status(self, task):
        """Fetch the current state of ``task`` and return it as a new Task.

        Returns None (after printing a message) when the task id is missing
        or is the null GUID. Raises the HTTP error from ``requests`` when the
        server reports a failure.
        """
        if not task.Id or '00000000-0' in task.Id:
            # GUID_NULL is being passed. This may be caused by a logical error in the calling code
            print("Null task id passed")
            return None

        url_params = {"taskId": task.Id}
        status_url = self.get_request_url("getTaskStatus")

        response = requests.get(status_url, params=url_params,
                                auth=(self.ApplicationId, self.Password), proxies=self.Proxies)
        # Same policy as process_image: fail loudly on an HTTP error instead of
        # trying to parse an error page as a task description.
        response.raise_for_status()
        return self.decode_response(response.text)

    def download_result(self, task, output_path):
        """Download the result of a completed ``task`` into ``output_path``.

        Prints a message and returns without writing when the task carries no
        download URL. Raises the HTTP error from ``requests`` when the
        download fails, before the output file is created.
        """
        get_result_url = task.DownloadUrl
        # The URL comes from an XML attribute, so "missing" may be "" as well as None.
        if not get_result_url:
            print("No download URL found")
            return

        file_response = requests.get(get_result_url, stream=True, proxies=self.Proxies)
        try:
            file_response.raise_for_status()
            with open(output_path, 'wb') as output_file:
                # iter_content() undoes any transfer encoding (gzip/deflate);
                # copying from .raw would write the encoded bytes as-is.
                for chunk in file_response.iter_content(chunk_size=65536):
                    output_file.write(chunk)
        finally:
            # Streamed responses keep the connection open until closed.
            file_response.close()

    def decode_response(self, xml_response):
        """ Decode xml response of the server. Return Task object """
        dom = xml.dom.minidom.parseString(xml_response)
        task_node = dom.getElementsByTagName("task")[0]
        task = Task()
        task.Id = task_node.getAttribute("id")
        task.Status = task_node.getAttribute("status")
        if task.Status == "Completed":
            task.DownloadUrl = task_node.getAttribute("resultUrl")
        return task

    def get_request_url(self, url):
        """Join ``ServerUrl`` and the method name ``url`` with a single slash."""
        return self.ServerUrl.strip('/') + '/' + url.strip('/')
def processOneFile(aos_, source_, destination_):
    """Recognize one file with default settings and save the result.

    Args:
        aos_: client object providing process_image, get_task_status and
            download_result.
        source_: path of the file to upload.
        destination_: path the result is written to.

    Polls every 5 seconds until the task is no longer queued or in progress.
    A task that ends in any state other than "Completed" is reported on
    stdout and nothing is downloaded.
    """
    print("input: " + source_)
    print("sending image ...")
    settings = ProcessingSettings()
    task = aos_.process_image(source_, settings)

    while True:
        time.sleep(5)
        print("checking...")
        task = aos_.get_task_status(task)
        if task is None:
            # get_task_status() returns None when it rejects the task id.
            print("task status could not be retrieved for: " + source_)
            return
        print("task status: " + task.Status)
        # Stop on every final state, not only "Completed"; waiting for
        # "Completed" alone would spin forever on a failed task.
        if not task.is_active():
            break

    if task.Status != "Completed":
        print("processing failed for: " + source_)
        return

    print("save output to: " + destination_)
    aos_.download_result(task, destination_)
def processFiles(aos_, sourceFolder_, destinationFolder_):
    """Process every file in one folder and put the results in the other.

    Args:
        aos_: client object handed through to processOneFile.
        sourceFolder_: folder whose regular files are processed
            (sub-folders are skipped).
        destinationFolder_: folder receiving one "<source name>.txt" per
            input file; created if it does not exist.
    """
    import os

    # Create the output folder up front so a missing folder is not discovered
    # only after a file has already been sent for recognition.
    if not os.path.isdir(destinationFolder_):
        os.makedirs(destinationFolder_)

    # Sorted so the processing order does not depend on directory listing order.
    sourceFiles = sorted(f for f in listdir(sourceFolder_) if isfile(join(sourceFolder_, f)))
    for sourceFile in sourceFiles:
        fullSourcePath = join(sourceFolder_, sourceFile)
        # Keep the full source name and append a real ".txt" extension.
        fullDestinationPath = join(destinationFolder_, sourceFile + ".txt")
        processOneFile(aos_, fullSourcePath, fullDestinationPath)
if __name__ == "__main__":
    # Run the batch only when executed as a script, so that importing this
    # module for its classes does not start processing "folder-in".
    print("We start")
    aos = AbbyyOnlineSdk()
    processFiles(aos, "folder-in", "folder-out")
    print("Done")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment