Skip to content

Instantly share code, notes, and snippets.

@bnorick
Last active December 8, 2023 10:03
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save bnorick/44af7676f8e1fb853e50afab0aa5f292 to your computer and use it in GitHub Desktop.
Save bnorick/44af7676f8e1fb853e50afab0aa5f292 to your computer and use it in GitHub Desktop.
Pythoni: a Python REPL you can pipe data into for processing
#!/usr/bin/env python3
import argparse
import code
import functools
import inspect
import logging
import os
import pathlib
import shlex
import stat
import sys
import traceback
# Module-level logger; DEBUG output is only configured when -d/--debug is passed.
logger = logging.getLogger(__name__)
class ParserExited(Exception):
    """Raised by the command parser's ``exit`` instead of terminating the process."""
class CommandError(Exception):
    """Signals that a ``%`` command could not be carried out as requested."""
class ArgumentParser(argparse.ArgumentParser):
    """``argparse`` parser adapted for use inside a running REPL.

    Help always goes to stderr, and any attempt to exit (``--help``, or
    ``error()`` after a usage problem) raises ``ParserExited`` instead of
    terminating the interpreter.
    """

    def print_help(self, *args, **kwargs):
        """Write the help text to stderr; any ``file`` argument is ignored."""
        logger.debug("print_help prog=%s args=%s kwargs=%s", self.prog, args, kwargs)
        super().print_help(file=sys.stderr)

    def exit(self, status=0, message=None):
        """Report ``message`` (if any) on stderr and raise ``ParserExited``.

        The signature mirrors ``argparse.ArgumentParser.exit``. ``status`` is
        accepted for compatibility but unused because the process keeps
        running.

        Raises:
            ParserExited: always.
        """
        logger.debug("exit prog=%s", self.prog)
        if message:
            # argparse's error() passes the actual diagnostic here; dropping
            # it would leave the user with only the usage line.
            sys.stderr.write(message)
        raise ParserExited
def _argparse_export(pythoni, args):
    """Forward a parsed ``%export`` namespace to ``pythoni.export``."""
    logger.debug("%s", args)
    # Guarantee a sequence so it can be unpacked as positional arguments.
    if args.indices is None:
        args.indices = []
    options = {
        "as_python": args.as_python,
        "input_command": args.input_command,
        "overwrite": args.overwrite,
        "all": args.all,
    }
    pythoni.export(args.path, *args.indices, **options)
def _argparse_help(pythoni, args):
pythoni.command_parser.print_help(file=sys.stderr)
def _int_or_range(str):
try:
start, end = str.split("-")
return range(int(start), int(end) + 1)
except ValueError:
try:
val = int(str)
return range(val, val + 1)
except ValueError:
raise argparse.ArgumentError(f"indices must be integers or (inclusive) integer ranges, e.g., N-M")
class Pythoni(code.InteractiveConsole):
    """Interactive console with a numbered prompt, history and ``%`` commands.

    Each statement that finishes (``runsource`` returns a false value) is
    appended to ``self._history``; the prompt shows the index the next
    statement will get. Lines starting with ``%`` are parsed by
    ``self.command_parser`` and dispatched to the handler stored as ``func``
    on the parsed namespace.
    """

    def __init__(self, *args, **kwargs):
        """Create the console and build the ``%`` command parser.

        Arguments are passed through to ``code.InteractiveConsole``.
        """
        super().__init__(*args, **kwargs)
        self._history = []

        # NOTE: requires python 3.9 (exit_on_error)
        self.command_parser = ArgumentParser(
            usage="%%command ...",
            description="Use commands prefixed with % to perform magic!\nFor help with a specific command, use %command --help.",
            exit_on_error=False,
            add_help=False,
            formatter_class=argparse.RawDescriptionHelpFormatter,
        )
        subparsers = self.command_parser.add_subparsers(title="command", help=argparse.SUPPRESS)

        export_parser = subparsers.add_parser("export", prog="%export", exit_on_error=False)
        export_parser.add_argument("path", type=pathlib.Path)
        export_parser.add_argument("indices", type=_int_or_range, nargs="*", help="indices from history to export, integers or (inclusive) integer ranges, e.g., N-M")
        export_parser.add_argument("-p", "--as-python", action="store_true")
        export_parser.add_argument("-i", "--input-command", type=str, required=False, help="command to run which is piped into script")
        export_parser.add_argument("-o", "--overwrite", action="store_true")
        export_parser.add_argument("-a", "--all", action="store_true")
        export_parser.set_defaults(func=functools.partial(_argparse_export, self))

        help_parser = subparsers.add_parser("help", prog="%help", exit_on_error=False)
        help_parser.set_defaults(func=functools.partial(_argparse_help, self))

        self.command_parser.epilog = "Available commands:\n " + "\n ".join(subparsers.choices.keys())
        self._update_prompts()

    def _update_prompts(self):
        """Put the next history index in ``sys.ps1`` and align ``sys.ps2`` with it."""
        sys.ps1 = f"[{len(self._history)}] >>> "
        sys.ps2 = " " * sys.ps1.index(">") + "... "

    def _run_command(self, command_line):
        """Parse and dispatch one ``%`` command (text after the ``%``).

        Failures are reported on stderr rather than raised, so a bad command
        never takes the console down.
        """
        try:
            args = self.command_parser.parse_args(shlex.split(command_line))
            handler = getattr(args, "func", None)
            if handler is None:
                # A bare "%" selects no sub-command, so no handler was set.
                self.command_parser.print_help()
            else:
                handler(args)
        except ParserExited:
            # The parser already printed help or its error message.
            pass
        except (CommandError, argparse.ArgumentError) as e:
            self.error(str(e))
        except Exception as e:
            self.error(str(e))
            traceback.print_exc(file=sys.stderr)

    def runsource(self, source, filename="<input>", symbol="single"):
        """Run ``source``, intercepting ``%`` commands and recording history.

        Returns:
            ``False`` for ``%`` commands (the input is complete); otherwise
            whatever ``code.InteractiveConsole.runsource`` returns.
        """
        logger.debug("runsource %s", source)
        if source.startswith("%"):
            self._run_command(source[1:].strip())
            return False

        result = super().runsource(source, filename=filename, symbol=symbol)
        if not result:
            # The statement is finished: record the buffered lines as one
            # history entry and advance the prompt counter.
            self._history.append("\n".join(self.buffer))
            self._update_prompts()
        return result

    def _get_export_code(self, *indices, all=False):
        """Join the selected history entries with blank lines between them.

        Args:
            *indices: iterables of history indices (e.g. ``range`` objects).
            all: select the whole history instead of ``indices``.

        Returns:
            The joined source text, or ``None`` (after printing an error) when
            both ``indices`` and ``all`` are given.

        Raises:
            CommandError: if an index is outside the history.
        """
        if indices and all:
            self.error("Both indices and all=True passed, only use one or the other.")
            return None
        if all:
            return "\n\n".join(self._history)

        selected = []
        try:
            for index_range in indices:
                for index in index_range:
                    selected.append(self._history[index])
        except IndexError:
            raise CommandError(f"Nonexistent history index requested, {index=}") from None
        return "\n\n".join(selected)

    def export(self, path, *indices, as_python=False, input_command=None, overwrite=False, all=False):
        """Write selected history to ``path`` as a Python or bash script.

        Args:
            path: destination file.
            *indices: iterables of history indices to export.
            as_python: write a plain Python script instead of an executable
                bash wrapper that runs the code with ``python -c``.
            input_command: shell command whose output is piped into the code
                (bash export only). It is inserted into the script verbatim.
            overwrite: allow replacing an existing file.
            all: export the whole history instead of ``indices``.

        Raises:
            CommandError: if an index is outside the history.
        """
        path = pathlib.Path(path)
        if path.exists() and not overwrite:
            self.error(f"path exists, pass overwrite=True to overwrite: {path}")
            return

        source = self._get_export_code(*indices, all=all)
        if source is None:
            # Conflicting selection; the error has already been printed.
            return

        # Code that refers to ``stdin`` needs it defined in the exported
        # script, whichever format is written.
        if "stdin" in source:
            source = "import sys\nstdin = sys.stdin.readlines()\n\n" + source

        if as_python:
            path.write_text(source, encoding="utf8")
            print(f"Exported python script to {path}")
            return

        pipeline = 'python -c "$CODE"'
        if input_command:
            pipeline = f"{input_command} | {pipeline}"
        script = (
            "#!/usr/bin/env bash\n"
            "set -Eeuo pipefail\n"
            "trap exit SIGINT SIGTERM ERR EXIT\n"
            f"CODE={shlex.quote(source)}\n"
            f"{pipeline}"
        )
        path.write_text(script, encoding="utf8")
        # Add owner read/write/execute so the script can be run directly.
        path.chmod(mode=path.stat().st_mode | stat.S_IRWXU)
        print(f"Exported executable bash script to {path}")

    def error(self, message):
        """Print ``message`` to stderr with an ``ERROR:`` prefix."""
        print(f"ERROR: {message}", file=sys.stderr)
# Command-line entry point: parse options, optionally read piped stdin into
# ``stdin``, then either execute the requested code or start the REPL.
parser = argparse.ArgumentParser("pythoni")
group = parser.add_mutually_exclusive_group()
group.add_argument("-c", "--code", help='code to execute, may use "stdin" local which is a list of lines read from stdin')
group.add_argument("-p", "--print", action="append", nargs="+", help="-p '[CODE]' is a shortcut to execute print([CODE])")
group.add_argument("-pf", "--print-fstring", action="append", nargs="+", help="-pf '[INPUT]' is a shortcut to execute print(f'[INPUT]'), no automatic escaping")
group.add_argument("-l", "--lambda", help="-l 'lambda line: line.replace(\"foobar\", \"\")' is a shortcut to execute the lambda func for each line and print results which evaluate to True, i.e., returning None will hide a line", dest="lambda_")
parser.add_argument("-d", "--debug", action="store_true", help="print debug information useful when developing pythoni")
args = parser.parse_args()

if args.debug:
    logging.basicConfig(level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(funcName)s [%(lineno)d] - %(message)s")

redirected_out = not sys.stdout.isatty()
redirected_in = not sys.stdin.isatty()

# Every shortcut option is rewritten into plain source text in ``args.code``.
if args.print:
    # action="append" + nargs="+" yields a list of lists; flatten it.
    args.print = [item for items in args.print for item in items]
    args.code = "; ".join(f"print({p})" for p in args.print)
elif args.print_fstring:
    args.print_fstring = [item for items in args.print_fstring for item in items]
    args.code = "; ".join(f"print(f'{p}')" for p in args.print_fstring)
elif args.lambda_:
    try:
        if not args.lambda_.strip().startswith("lambda"):
            raise SyntaxError
        # NOTE(review): eval/exec below run text taken from this process's own
        # command line; do not feed this tool arguments from untrusted sources.
        result = eval(args.lambda_)
        if not callable(result):
            raise SyntaxError
        if len(inspect.signature(result).parameters) != 1:
            raise RuntimeError(f"invalid --lambda, function must accept a single argument \"{args.lambda_}\" does not")
    except SyntaxError:
        # sys.exit(str) prints the text to stderr and exits with status 1.
        sys.exit(f"ERROR: invalid --lambda, must be a valid lambda function but \"{args.lambda_}\" is not")
    except Exception as e:
        sys.exit(f"ERROR: {e}")

    args.code = f"""\
fn = {args.lambda_}
for line in stdin:
    result = fn(line)
    if result:
        print(result, end='')
"""

if redirected_in:
    # Piped input is consumed eagerly and exposed to user code as ``stdin``.
    stdin = sys.stdin.readlines()
    if not args.code:
        print('stdin read to "stdin" variable\n')

if args.code:
    exec(args.code)
elif not redirected_out:
    # Interactive mode: the pipe on fd 0 is exhausted, so point fd 0 at the
    # controlling terminal to let the REPL read keyboard input.
    # ref: https://github.com/python/cpython/issues/36029#issuecomment-1093968541
    _stdin = os.dup(0)
    tty = os.open("/dev/tty", os.O_RDONLY)
    if tty != 0:
        # dup2 replaces fd 0 directly instead of relying on close(0)+open()
        # returning 0 (previously checked with an assert, which -O removes).
        os.dup2(tty, 0)
        os.close(tty)

    import readline
    import rlcompleter

    variables = globals().copy()
    variables.update(locals())
    readline.set_completer(rlcompleter.Completer(variables).complete)
    readline.parse_and_bind("tab: complete")
    pythoni = Pythoni(variables)
    # Expose the console itself so methods such as export() can be called
    # from inside the session.
    variables.update(pythoni=pythoni)
    pythoni.interact()
@bnorick
Copy link
Author

bnorick commented Feb 16, 2023

For this version, I added some magic (like IPython) to enable an export capability, %export.

Say you start with your | long | piped | sequence | pythoni and then interactively process the piped input. Now, you can export your interactive session to either a python script or a fully contained bash script.

You can do the following:
%export process.py --as-python --all
to enable
your | long | piped | sequence | python process.py

Even better, especially for repetitive tasks, is the shell script export, e.g.,
%export go.sh --input-command 'your | long | piped | sequence' --all
and then simply running with ./go.sh.

As a contrived example,

$ ls | pythoni
stdin read to "stdin" variable
Python 3.10.9 (main, Feb  6 2023, 16:21:20) [GCC 11.3.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
(Pythoni)
[0] >>> for line in stdin:
    ...   if line.startswith('s'):
    ...     print(line.strip())
    ... 
scratch
scripts
src
[1] >>> %export ls-s --input-command ls --all
Exported executable bash script to ls-s
[1] >>> exit()
$ ./ls-s 
scratch
scripts
src

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment