Skip to content

Instantly share code, notes, and snippets.

@bertsky
Last active December 1, 2020 00:41
Show Gist options
  • Save bertsky/9cd1a6ed9c2f3925a5339ed52f9be458 to your computer and use it in GitHub Desktop.
Save bertsky/9cd1a6ed9c2f3925a5339ed52f9be458 to your computer and use it in GitHub Desktop.
Proof of concept for an OCR-D workflow engine that uses strong (API instead of CLI) integration of processors and acts as a server.
import os
import sys
import re
import click
import json
from time import time
import flask
from distutils.spawn import find_executable as which
import ocrd
import ocrd.decorators
from ocrd_utils import getLogger, initLogging
from ocrd.processor.base import run_cli
from ocrd.resolver import Resolver
import ocrd.task_sequence
from ocrd_utils.logging import setOverrideLogLevel
# workaround venvs created for Python>=3.8
from pkg_resources import load_entry_point
from ocrd.decorators import ocrd_loglevel
# :::stolen from ocrd.cli.process:::
# ----------------------------------------------------------------------
# ocrd workflow
# ----------------------------------------------------------------------
@click.command('workflow')
@ocrd_loglevel
# NOTE: the METS path, the page IDs and the overwrite flag are not CLI options;
# they are passed per request as query parameters of the /process route below.
@click.option('-h', '--host', help="host name/IP to listen at", default='127.0.0.1')
@click.option('-p', '--port', help="TCP port to listen at", default=5000, type=click.IntRange(min=1024))
@click.argument('tasks', nargs=-1, required=True)
def workflow_cli(log_level, host, port, tasks):
    """
    Start a server processing a series of tasks with strong Python integration
    """
    # (The docstring above doubles as the click help text, so details go here.)
    #
    # log_level: override log level, forwarded to run_tasks() for every request
    # host/port: address the Flask server listens at
    # tasks:     task description strings; parsed and instantiated once, before
    #            the server starts, and then re-used for every request
    initLogging()
    log = getLogger('ocrd.workflow')
    log.debug("Parsing and instantiating %d tasks", len(tasks))
    tasks = prepare_tasks(tasks)
    app = flask.Flask(__name__)

    @app.route('/process')
    def process():
        # Query parameters:
        #   mets      (required) passed on to run_tasks() as the METS location
        #   page_id   (optional) defaults to the empty string
        #   overwrite (optional) only "True", "true" and "1" enable it
        args = flask.request.args
        mets = args.get("mets")
        if not mets:
            # signal the client error via the status code, not only the body
            return 'Error: No METS', 400
        page_id = args.get('page_id') or ''
        overwrite = args.get('overwrite') in ("True", "true", "1")
        try:
            run_tasks(mets, log_level, page_id, tasks, overwrite)
        except Exception as e:
            # top-level request boundary: log the traceback and report the
            # failure to the client with a server error status
            log.exception("Request '%s' failed", str(flask.request.args))
            return 'Failed: %s' % str(e), 500
        return 'Finished'

    log.debug("Running server on http://%s:%d", host, port)
    app.run(host=host, port=port)
# :::stolen from ocrd.task_sequence:::
def prepare_tasks(task_strs):
    """
    Parse task description strings and try to instantiate each task.

    Args:
        task_strs (iterable of str): one task description per entry,
            each handed to ``ProcessorTask.parse``

    Returns:
        list: the parsed ``ProcessorTask`` objects, in input order, after
        ``instantiate()`` has been called on each of them
    """
    # parse everything first, so that a malformed task string is reported
    # before any processor gets instantiated
    tasks = [ProcessorTask.parse(task_str) for task_str in task_strs]
    # instantiate() is called for its side effect on the task only;
    # its return value is not needed here
    for task in tasks:
        task.instantiate()
    return tasks
def run_tasks(mets, log_level, page_id, tasks, overwrite=False):
    """
    Validate and run a sequence of tasks on the workspace behind ``mets``.

    Tasks that carry a processor instance are run in-process via run_api(),
    all others via run_cli(). The METS is reloaded or saved between tasks
    depending on how the previous and the current task are executed.

    Args:
        mets (str): METS location, resolved into a workspace
        log_level: if set, installed as override log level and passed to run_cli()
        page_id (str): page ID(s) passed on to validation and to every task
        tasks (list): task objects providing ``instance``, ``executable``,
            ``input_file_grps``, ``output_file_grps`` and ``parameters``
        overwrite (bool): switch the workspace into overwrite mode

    Raises:
        Exception: if a task returns a non-zero return code
    """
    logger = getLogger('ocrd.task_sequence.run_tasks')
    resolver = Resolver()
    workspace = resolver.workspace_from_url(mets)
    if overwrite:
        workspace.overwrite_mode = True
    if log_level:
        setOverrideLogLevel(log_level)
    ocrd.task_sequence.validate_tasks(tasks, workspace, page_id, overwrite)

    # Run the tasks
    previous_was_api = False
    for position, task in enumerate(tasks):
        runs_via_api = bool(task.instance)
        mode = "API" if runs_via_api else "CLI"
        logger.info("Start processing %s task '%s'", mode, task)
        if position > 0:
            if not previous_was_api:
                # previous task ran as CLI: re-read the METS
                # (presumably the CLI processor changed the file on disk)
                workspace.reload_mets()
            elif not runs_via_api:
                # switching from API to CLI: write the METS out first
                workspace.save_mets()
        input_grps = ','.join(task.input_file_grps)
        output_grps = ','.join(task.output_file_grps)
        if runs_via_api:
            # execute API
            returncode = run_api(
                task.instance,
                workspace,
                page_id=page_id,
                input_file_grp=input_grps,
                output_file_grp=output_grps
            )
        else:
            # execute CLI
            returncode = run_cli(
                task.executable,
                mets,
                resolver,
                workspace,
                log_level=log_level,
                page_id=page_id,
                overwrite=overwrite,
                input_file_grp=input_grps,
                output_file_grp=output_grps,
                parameter=json.dumps(task.parameters)
            )
        # check return code
        if returncode != 0:
            raise Exception("%s exited with non-zero return value %s." % (task.executable, returncode))
        logger.info("Finished processing task '%s'", task)
        previous_was_api = runs_via_api

    # run_api() does not save the METS itself, so do it after a final API task
    if previous_was_api:
        workspace.save_mets()
# Module-level slot through which the patched ocrd_cli_wrap_processor
# (see ProcessorTask.instantiate) hands over the processor class.
processor_class = None
# amend ProcessorTask by object instance if possible
class ProcessorTask(ocrd.task_sequence.ProcessorTask):
    """
    Task that additionally holds an instance of its processor class, if the
    processor executable is a Python script from which the class can be fetched.

    Attributes set here:
        instance: the processor object, or None if the task has to be run via CLI
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.instance = None

    # override class method to give us our objects
    @classmethod
    def parse(cls, argstr):
        """Parse ``argstr`` with the parent class parser, bound to this class."""
        # Zero-argument super() starts the lookup after *this* class;
        # super(cls, cls) would recurse endlessly as soon as this class
        # is subclassed.
        return super().parse(argstr)

    def instantiate(self):
        """
        Try to create a processor instance for this task.

        The executable is looked up in PATH. If its first line is a Python
        shebang, the rest of the script is executed with
        ``ocrd.decorators.ocrd_cli_wrap_processor`` temporarily replaced by a
        function that merely records the processor class it is called with.
        That class is then instantiated with this task's parameters.

        Returns:
            bool: True if ``self.instance`` was set, False if the task has
            to be run via its CLI
        """
        global processor_class
        logger = getLogger('ocrd.task_sequence.ProcessorTask')
        program = which(self.executable)
        if not program:
            logger.warning("Cannot find processor '%s' in PATH", self.executable)
            return False

        # errors='replace' keeps binary executables from raising a decoding
        # error here; they simply fail the shebang test below
        with open(program, encoding='utf-8', errors='replace') as f:
            line = f.readline().strip()
            if not re.fullmatch('[#][!].*/python[0-9.]*', line):
                logger.info("Non-Pythonic processor '%s' breaks the chain", self.executable)
                return False
            # the shebang line has been consumed already; compile the rest
            code = compile(f.read(), program, 'exec')

        # override modules
        def ignore(*args):
            # stand-in for sys.exit, so the script cannot terminate the server
            return

        def get_processor_class(cls, **kwargs):
            # stand-in for ocrd_cli_wrap_processor: record the class only
            global processor_class
            processor_class = cls

        processor_class = None
        wrap_processor = ocrd.decorators.ocrd_cli_wrap_processor
        sys_exit = sys.exit
        sys_argv = sys.argv
        ocrd.decorators.ocrd_cli_wrap_processor = get_processor_class
        sys.exit = ignore
        sys.argv = [self.executable]
        try:
            # run CLI merely to fetch class
            # Same namespaces as a bare exec() at this point would use (module
            # globals plus a separate local mapping), but spelled out: only
            # __name__ is provided locally, so the script takes its
            # `if __name__ == '__main__'` branch.
            exec(code, globals(), {'__name__': '__main__'})
        finally:
            # reset modules, even if the script raised
            sys.argv = sys_argv
            sys.exit = sys_exit
            ocrd.decorators.ocrd_cli_wrap_processor = wrap_processor

        if processor_class is None:
            # the script never called ocrd_cli_wrap_processor
            logger.warning("Cannot fetch processor class from '%s'", self.executable)
            return False
        logger.info("Instantiating %s for processor '%s'",
                    processor_class.__name__, self.executable)
        self.instance = processor_class(None, parameter=self.parameters)
        # circumvent calling CLI to get .ocrd_tool_json
        self._ocrd_tool_json = self.instance.ocrd_tool
        return bool(self.instance)
# :::partly stolen from ocrd.processor.run_processor:::
# :::partly stolen from ocrd.decorators.ocrd_cli_wrap_processor:::
def run_api(processor,
            workspace,
            page_id=None,
            input_file_grp=None,
            output_file_grp=None
):  # pylint: disable=too-many-locals
    """
    Set workspace in processor and run through it

    The processor is run with the workspace directory as current working
    directory; the previous working directory is restored afterwards.
    On success, an agent entry for the processor is added to the workspace
    METS. The METS is not saved here.

    Args:
        processor (object): Processor instance
        workspace (object): Workspace instance
        page_id (str): page ID(s) to process; any falsy value is stored as None
        input_file_grp (str): set as ``processor.input_file_grp``
        output_file_grp (str): set as ``processor.output_file_grp``

    Returns:
        int: 0 on success, 1 if ``processor.process()`` raised an exception
    """
    log = getLogger('ocrd.processor.helpers.run_api')
    log.debug("Running processor %s", processor.__class__.__name__)
    processor.page_id = page_id if page_id else None
    processor.workspace = workspace
    processor.input_file_grp = input_file_grp
    processor.output_file_grp = output_file_grp
    ocrd_tool = processor.ocrd_tool
    parameter = processor.parameter
    name = '%s v%s' % (ocrd_tool['executable'], processor.version)
    otherrole = ocrd_tool['steps'][0]
    logProfile = getLogger('ocrd.process.profile')
    log.debug("Processor instance %s (%s doing %s)", processor, name, otherrole)
    t0 = time()
    # Remember the working directory *before* entering the try block, so the
    # finally clause can never hit an unassigned name.
    oldcwd = os.getcwd()
    try:
        os.chdir(processor.workspace.directory)
        processor.process()
    except Exception:
        log.exception("Failure in processor '%s'", ocrd_tool['executable'])
        return 1
    finally:
        os.chdir(oldcwd)
    t1 = time() - t0
    logProfile.info(
        "Executing processor '%s' took %fs [--input-file-grp='%s' --output-file-grp='%s' --parameter='%s']",
        ocrd_tool['executable'],
        t1,
        input_file_grp if input_file_grp else '',
        output_file_grp if output_file_grp else '',
        json.dumps(parameter) if parameter else '{}'
    )
    workspace.mets.add_agent(
        name=name,
        _type='OTHER',
        othertype='SOFTWARE',
        role='OTHER',
        otherrole=otherrole
    )
    # we don't workspace.save_mets() here
    return 0
# Script entry point: start the workflow server CLI when run directly.
if __name__ == '__main__':
    workflow_cli()
@bertsky
Copy link
Author

bertsky commented Dec 1, 2020

Superseded by OCR-D/core#652

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment