Skip to content

Instantly share code, notes, and snippets.

@turtlemonvh
Last active May 1, 2020 12:53
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save turtlemonvh/4558b8bc4377b6758e289316c0141d15 to your computer and use it in GitHub Desktop.
Save turtlemonvh/4558b8bc4377b6758e289316c0141d15 to your computer and use it in GitHub Desktop.
Python script to run arbitrary python code to filter or transform lines in bash pipes.

Pype

A simple python utility for filtering and transforming lines in bash pipes.

I created this because jq syntax is a bit hard for me to remember and awk can be annoying for JSON and other non-trivial formats.

You just write simple inline python and either return a string (to transform) or a truthy value (to filter). The line being evaluated is available as the variable "line", and the line number is available as "nline".

Example use

# Print out the "id" field of every JSON object in the file where the "enabled" field is set to a truthy value
cat my-file-full-of-json.txt \
  | pype -a filter -i "json" "json.loads(line).get('enabled')" \
  | pype -i "json" "json.loads(line).get('id')"

# Sleep a random amount of time for each line and print out the time slept next to the line number
# Run with 3 processes
cat my-file-full-of-json.txt \
  | pype -i "time;random" "sleep_duration = random.random()*3; time.sleep(sleep_duration); ret=(nline, sleep_duration)" -P 3

Installation

curl https://gist.githubusercontent.com/turtlemonvh/4558b8bc4377b6758e289316c0141d15/raw/98eb95ecbadd067fff2cc7b18fb6ef84d9e61147/pype.py -o pype
chmod +x pype
mv pype /usr/local/bin/

Confirm setup.

$ pype -h
usage: pype [-h] [-i IMPORTS] [-a {transform,filter}] [-ns] [-P PARALLELISM]
            cmd

Tool for filtering and transforming data in bash pipes using python.

positional arguments:
  cmd                   Python code to run. Should return a string in the case
                        of 'transform' or a truthy object in the case of
                        'filter'. The current line and current line number are
                        available as the variables 'line' and 'nline',
                        respectively. Responses from multi-statement code are
                        supported via setting the variable 'ret'. Single
                        statement code does not need to explicitly set 'ret'.

optional arguments:
  -h, --help            show this help message and exit
  -i IMPORTS, --imports IMPORTS
                        Imports to add, ; separated.
  -a {transform,filter}, --action {transform,filter}
                        Type of action to take in lines. Either filter or
                        transform.
  -ns, --strip-trailing-newlines
                        Set this flag to strip trailing newlines on each line.
                        Only relevant when calling 'transform'.
  -P PARALLELISM, --parallelism PARALLELISM
                        The number of processes to spin up to process results.
                        Similar to 'xargs -P' flag. Transformed or filtered
                        lines are still returned in order.
#!/usr/bin/env python
import sys
import argparse
import multiprocessing
import itertools
## PY2 vs PY3
if (sys.version_info > (3, 0)):
# Python 3
zipper = zip
else:
# Python 2
zipper = itertools.izip
## Transform and filter
def is_multi_statement(code):
if "ret=" in code or "ret =" in code:
return True
return False
def transform_line(nline, line, code, strip_trailing_newlines):
"""
Run the transformer on a single line
"""
ret = None
if is_multi_statement(code):
exec(code)
else:
ret = eval(code)
ret = str(ret)
if not strip_trailing_newlines:
ret = ret + "\n"
return ret
def transform_line_wrapper(wrapped_args):
try:
(nline, line), code, strip_trailing_newlines = wrapped_args
return transform_line(nline, line, code, strip_trailing_newlines)
except KeyboardInterrupt:
# Wait to be killed
pass
def filter_line(nline, line, code):
ret = None
if is_multi_statement(code):
exec(code)
else:
ret = eval(code)
if ret:
sys.stdout.write(line)
def filter_line_wrapper(wrapped_args):
try:
(nline, line), code = wrapped_args
return filter_line(nline, line, code)
except KeyboardInterrupt:
# Wait to be killed
pass
def transform_lines_async(lines, code, strip_trailing_newlines, parallelism=1):
"""
Transform each line using bounded async evaluation, still printing results in order.
"""
pool = multiprocessing.Pool(parallelism)
wrapped_args = zipper(enumerate(lines), itertools.repeat(code), itertools.repeat(strip_trailing_newlines))
try:
for r in pool.imap(transform_line_wrapper, wrapped_args, 1):
sys.stdout.write(r)
except KeyboardInterrupt:
pool.terminate()
def transform_lines(lines, code, strip_trailing_newlines):
"""
Filter lines by the output of eval(code).
The contents of the line is available as "line".
"""
for nline, line in enumerate(lines):
r = transform_line(nline, line, code, strip_trailing_newlines)
sys.stdout.write(r)
def filter_lines_async(lines, code, parallelism=1):
"""
Filter each line using bounded async evaluation, still printing results in order.
"""
pool = multiprocessing.Pool(parallelism)
wrapped_args = zipper(enumerate(lines), itertools.repeat(code))
try:
for r in pool.imap(filter_line_wrapper, wrapped_args, 1):
sys.stdout.write(r)
except KeyboardInterrupt:
pool.terminate()
def filter_lines(lines, code):
"""
Filter lines by the output of eval(code).
The contents of the line is available as "line".
"""
for nline, line in enumerate(lines):
filter_line(nline, line, code)
## Argument parsing
def get_parser():
p = argparse.ArgumentParser(description="Tool for filtering and transforming data in bash pipes using python.")
p.add_argument(
"-i", "--imports",
help="Imports to add, ; separated."
)
p.add_argument(
"-a", "--action",
action="store",
default="transform",
choices=["transform", "filter"],
help="Type of action to take in lines. Either filter or transform."
)
p.add_argument(
"-ns", "--strip-trailing-newlines",
action="store_true",
dest="strip_trailing_newlines",
help="Set this flag to strip trailing newlines on each line. Only relevant when calling 'transform'."
)
p.add_argument(
"-P", "--parallelism",
action="store",
type=int,
default=None,
dest="parallelism",
help="The number of processes to spin up to process results. Similar to 'xargs -P' flag. Transformed or filtered lines are still returned in order."
)
cmd_help = """
Python code to run.
Should return a string in the case of 'transform' or a truthy object in the case of 'filter'.
The current line and current line number are available as the variables 'line' and 'nline', respectively.
Responses from multi-statement code are supported via setting the variable 'ret'. Single statement code does not need to explicitly set 'ret'.
"""
p.add_argument("cmd", help=cmd_help)
return p
if __name__ == "__main__":
options = get_parser().parse_args()
# Import
if options.imports is not None:
for lib in options.imports.split(";"):
exec("import {}".format(lib))
# Lines as a generator
lines = (line for line in sys.stdin)
# Run action per each line
if options.action == "transform":
if options.parallelism is not None:
transform_lines_async(lines, options.cmd, options.strip_trailing_newlines, parallelism=options.parallelism)
else:
transform_lines(lines, options.cmd, options.strip_trailing_newlines)
elif options.action == "filter":
if options.parallelism is not None:
filter_lines_async(lines, options.cmd, parallelism=options.parallelism)
else:
filter_lines(lines, options.cmd)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment