Skip to content

Instantly share code, notes, and snippets.

@kylebgorman
Created March 19, 2022 19:54
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kylebgorman/3db1265bcae35261a8cd2f6bc7afed01 to your computer and use it in GitHub Desktop.
Save kylebgorman/3db1265bcae35261a8cd2f6bc7afed01 to your computer and use it in GitHub Desktop.
Parallel processing of .xz CoNLL-U files with udpipe
#!/usr/bin/env python
"""Parallelizes UDPipe processing.
This requires `udpipe`, `xz`, and `xzcat` in users' $PATH.
"""
import argparse
import contextlib
import glob
import itertools
import multiprocessing
import subprocess
from typing import List, Optional
def process(
    arguments: Optional[List[str]], model: str, path: str, use_xz: bool
) -> None:
    """Runs UDPipe over a single input file, writing the result next to it.

    Args:
        arguments: extra command-line arguments passed through to `udpipe`.
            May be None, which is what argparse produces for an `append`
            option that was never given.
        model: path to the UDPipe model.
        path: path to the input file.
        use_xz: if True, `path` is decompressed with `xzcat`, and UDPipe's
            output is recompressed with `xz` into `{path}.conllu.xz`;
            otherwise UDPipe reads `path` directly and its output is written
            uncompressed to `{path}.conllu`.

    Raises:
        subprocess.CalledProcessError: if any stage exits with a nonzero
            status.
        OSError: if an executable cannot be started or the output file
            cannot be opened.
    """
    call_args = ["udpipe", "--immediate"]
    # `--arg` is an argparse "append" option, so this is None when unused.
    if arguments:
        call_args.extend(arguments)
    call_args.append(model)
    if use_xz:
        # Equivalent to `xzcat path | udpipe ... | xz > path.conllu.xz`.
        # ExitStack guarantees that every already-started child is waited on
        # (Popen.__exit__ closes its pipes and waits) even if a later stage
        # fails to start.
        with contextlib.ExitStack() as stack:
            sink = stack.enter_context(open(f"{path}.conllu.xz", "wb"))
            xzcat = stack.enter_context(
                subprocess.Popen(["xzcat", path], stdout=subprocess.PIPE)
            )
            udpipe = stack.enter_context(
                subprocess.Popen(
                    call_args,
                    stdin=xzcat.stdout,
                    stdout=subprocess.PIPE,
                )
            )
            # Drops the parent's copy of each read end once the downstream
            # child holds it, so an upstream stage gets SIGPIPE if a
            # downstream one dies instead of blocking on a full pipe.
            xzcat.stdout.close()
            xz = stack.enter_context(
                subprocess.Popen(["xz"], stdin=udpipe.stdout, stdout=sink)
            )
            udpipe.stdout.close()
        # All stages have been waited on; surface any failure.
        for proc in (xzcat, udpipe, xz):
            if proc.returncode:
                raise subprocess.CalledProcessError(proc.returncode, proc.args)
    else:
        call_args.append(path)
        # Redirects stdout rather than relying on a UDPipe output-file flag.
        with open(f"{path}.conllu", "wb") as sink:
            subprocess.run(call_args, stdout=sink, check=True)
def main(args: argparse.Namespace) -> None:
    """Fans `process` out over every path matching the glob `args.path_dir`.

    Each matched path becomes one task. Every task receives the same UDPipe
    arguments (`args.arg`), model (`args.model`) and xz flag (`args.xz`).
    Tasks run on a pool of `args.processes` worker processes. An exception
    raised by any task propagates out of `starmap`.
    """
    jobs = (
        (args.arg, args.model, matched, args.xz)
        for matched in glob.iglob(args.path_dir)
    )
    # An unset (or zero) chunksize defers to Pool.starmap's own default.
    chunk = args.chunksize or None
    with multiprocessing.Pool(args.processes) as workers:
        workers.starmap(process, jobs, chunksize=chunk)
if __name__ == "__main__":
    # Command-line entry point; the module docstring doubles as --help text.
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument(
        "--arg",
        # Defaults to an empty list rather than None so the value can always
        # be extended onto the udpipe command line.
        default=[],
        help="argument to pass to UDPipe (repeatable); since UDPipe options "
        "begin with dashes, write it as --arg=--tag",
        action="append",
    )
    parser.add_argument(
        "--model",
        required=True,
        help="path to UDPipe model",
    )
    parser.add_argument(
        "--path-dir",
        required=True,
        # The value is expanded with glob.iglob in main(), so it is a
        # pattern, not a directory.
        help="glob pattern matching the input files (e.g. 'corpus/*.xz')",
    )
    parser.add_argument(
        "--processes",
        type=int,
        default=multiprocessing.cpu_count(),
        help="max number of workers to use",
    )
    parser.add_argument(
        "--chunksize",
        type=int,
        help="number of input files handed to a worker at a time "
        "(default: chosen by multiprocessing)",
    )
    parser.add_argument("--xz", help="enables xz I/O", action="store_true")
    main(parser.parse_args())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment