Skip to content

Instantly share code, notes, and snippets.

@lahwran
Created August 19, 2019 22:07
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save lahwran/5d3f2de82ba2dde3f4dcac3c07a054b5 to your computer and use it in GitHub Desktop.
Save lahwran/5d3f2de82ba2dde3f4dcac3c07a054b5 to your computer and use it in GitHub Desktop.
# ARGPARSE_WRAP: simple form of command registration for argparse
# Open sourced from Vast.ai internal tools
# MIT license
import argparse
import sys
import os
class argument(object):
def __init__(self, *args, **kwargs):
self.args = args
self.kwargs = kwargs
REMAINDER=argparse.REMAINDER
class apwrap(object):
REMAINDER=argparse.REMAINDER
def __init__(self, *args, **kwargs):
self.parser = argparse.ArgumentParser(*args, **kwargs)
self.parser.set_defaults(func=self.fail_with_help)
self.subparsers_ = None
self.added_help_cmd = False
self.post_setup = []
def fail_with_help(self, *a, **kw):
self.parser.print_help(sys.stderr)
raise SystemExit
def add_argument(self, *a, **kw):
return self.parser.add_argument(*a, **kw)
def subparsers(self, *a, **kw):
if self.subparsers_ is None:
kw["metavar"] = "command"
kw["help"] = "command to run. one of:"
self.subparsers_ = self.parser.add_subparsers(*a, **kw)
return self.subparsers_
def command(self, *arguments, aliases=(), help=None):
help_ = help
if not self.added_help_cmd:
self.added_help_cmd = True
@self.command(argument("subcommand", default=None, nargs="?"), help="print this help message")
def help(*a, **kw):
self.fail_with_help()
def inner(func):
name = func.__name__.replace("_", "-")
sp = self.subparsers().add_parser(name, aliases=aliases, help=help_)
for arg in arguments:
sp.add_argument(*arg.args, **arg.kwargs)
sp.set_defaults(func=func)
return func
if len(arguments) == 1 and type(arguments[0]) != argument:
func = arguments[0]
arguments = []
return inner(func)
return inner
def parse_args(self, *a, **kw):
args = self.parser.parse_args(*a, **kw)
for func in self.post_setup:
func(args)
return args
blessed
watchdog
psutil
# process restarter, for development.
# especially relevant for deep process trees,
# see reap_children().
# Open sourced from Vast.ai internal tools
# MIT license
# examples:
# python watch.py watch 'src/**.jsx' 'src/**.scss' --command yarn build
# python watch.py autorestart 'src/**.jsx' 'src/**.scss' --command yarn run
# python watch.py watch 'src/**.cpp' 'src/**.h' --command make all
# python watch.py autorestart 'src/**.cpp' 'src/**.h' --command bash -c 'make all; ./target'
from argparse_wrap import apwrap, argument
import textwrap
import sys
import collections
import shlex
import subprocess
import signal
import os
import time
import blessed
from watchdog.observers import Observer
from watchdog.tricks import Trick
import psutil
import argparse
term = blessed.Terminal()
apw = apwrap()
os.environ.pop("MAKEFLAGS", None)
os.environ.pop("MAKELEVEL", None)
os.environ.pop("MAKE_TERMERR", None)
os.environ.pop("MAKE_TERMOUT", None)
os.environ.pop("MFLAGS", None)
os.environ["_"] = "watch"
quiet = False
from watchdog.events import FileSystemEvent
real_init = FileSystemEvent.__init__
def event_init(self, *a, **kw):
real_init(self, *a, **kw)
self.when = time.time()
FileSystemEvent.__init__ = event_init
def print_(*a, **kw):
if quiet:
return
print(*a, **kw)
def handle_sigterm(_signum, _frame):
raise KeyboardInterrupt()
signal.signal(signal.SIGTERM, handle_sigterm)
class CommandTrick(Trick):
"""Executes shell commands in response to matched events."""
def __init__(self, cmd, patterns=None, ignore_patterns=None,
ignore_directories=False):
super(CommandTrick, self).__init__(patterns, ignore_patterns,
ignore_directories)
self.process = None
self.command = cmd
self.on_any_event(None)
def on_any_event(self, event):
time.sleep(0.1)
subprocess.call(self.command)
procname_cache = {}
class ProcName:
def __init__(self, p):
self.depth = 0
if p is None:
self.name = "[missing]"
self.cmdline = ["[missing]"]
else:
try:
self.name = p.name()
except psutil.Error:
self.name = "[error]"
try:
self.cmdline = p.cmdline()
except psutil.Error:
self.cmdline = ["[error]"]
self.cmdline_fmt = " ".join(shlex.quote(x) for x in self.cmdline)
self.trimmed = self.cmdline_fmt
if len(self.trimmed) > 40:
self.trimmed = self.trimmed[:36] + " ..."
procname_missing = ProcName(None)
def format_process(p, pid=None, indent=">", box=True):
if p is None and pid is None:
return "[<< totally missing wtf >>]"
if pid is None:
pid = p.pid
procname = procname_cache.get(pid, procname_missing)
if p is None:
status = "missing"
else:
try:
status = p.status()
except psutil.NoSuchProcess:
status = "gone"
except psutil.ZombieProcess:
status = "zombie"
except psutil.Error:
status = "error"
indented = indent*procname.depth
if box:
prefix = "{}{}".format(indented, str(pid)).ljust(12)
res = "[{}: {} ({})]".format(prefix, procname.trimmed, status)
res = res.ljust(67)
return term.blue+term.bold+res+term.bold+term.white
else:
res = "{}: {} ({})".format(str(pid), procname.cmdline_fmt, status)
wrapped = textwrap.wrap(res, width=term.width-len(indented)*2)
final = [indented + wrapped[0]]
for line in wrapped[1:]:
final.append(" "*(len(indented)+4)+"... "+line)
return term.blue+term.bold+"\n".join(final)+term.bold+term.white
def print_tree(rootpid, tree, info, depth=0):
proc = info.get(rootpid, None)
procname = procname_cache.get(rootpid, None)
if procname:
procname.depth = depth
print_(term.normal+format_process(proc, rootpid, indent=" > ", box=False)+term.normal)
if rootpid not in tree:
return
for child in tree[rootpid]:
print_tree(child, tree, info, depth+1)
def reap_children():
print_(term.bold+term.white+"WATCH: killing process subtree..."+term.normal)
term_printed = set()
def on_terminate(proc):
if proc in term_printed:
return
term_printed.add(proc)
print_(term.bold+term.white+"WATCH: {}: terminated with exit code {}".format(format_process(proc), proc.returncode)+term.normal)
me = psutil.Process()
procs = me.children(recursive=True)
procname_cache.clear()
try:
tree = collections.defaultdict(list)
info = collections.defaultdict(lambda: None)
info[me.pid] = me
procname_cache[me.pid] = ProcName(me)
nonroots = set()
for p in procs:
try:
info[p.pid] = p
nonroots.add(p.pid)
tree[p.ppid()].append(p.pid)
procname_cache[p.pid] = ProcName(p)
except (psutil.NoSuchProcess, psutil.ZombieProcess):
pass
for root in (tree.keys() - nonroots):
print_tree(root, tree, info)
except:
import traceback
traceback.print_exc()
print_(term.bold+term.white+'^^^^ error printing tree ^^^^'+term.normal)
permission_denied = set()
for p in procs:
try:
p.send_signal(signal.SIGINT)
print_(term.bold+term.white+"WATCH: {}: sent sigint".format(format_process(p))+term.normal)
except (OSError, psutil.NoSuchProcess):
print_(term.bold+term.white+"WATCH: {}: not running by the time we tried to interrupt it".format(format_process(p))+term.normal)
except psutil.AccessDenied:
permission_denied.add(p)
print_(term.bold+term.white+"WATCH: {}: can't kill, got a permission error. won't wait long for it.".format(format_process(p))+term.normal)
print_(term.bold+term.white+"WATCH: Waiting for processes to die..."+term.normal)
gone, alive = psutil.wait_procs(procs, timeout=1, callback=on_terminate)
for p in permission_denied:
print_(term.bold+term.white+"WATCH: {}: didn't die, not waiting any longer for it".format(format_process(p))+term.normal)
procs.remove(p)
gone, alive = psutil.wait_procs(procs, timeout=0.01, callback=on_terminate)
if not alive:
print_(term.bold+term.white+"WATCH: done killing subtree"+term.normal)
return
# send SIGTERM
for p in alive:
try:
p.terminate()
print_(term.bold+term.white+"WATCH: {}: sent sigterm".format(format_process(p))+term.normal)
except (OSError, psutil.NoSuchProcess):
print_(term.bold+term.white+"WATCH: {}: not running by the time we tried to terminate it".format(format_process(p))+term.normal)
print_(term.bold+term.white+"WATCH: Waiting for processes to die..."+term.normal)
gone, alive = psutil.wait_procs(alive, timeout=1, callback=on_terminate)
if not alive:
print_(term.bold+term.white+"WATCH: done killing subtree"+term.normal)
return
# send SIGKILL
for p in alive:
print_(term.bold+term.white+"WATCH: {}: survived SIGTERM; trying SIGKILL".format(format_process(p))+term.normal)
try:
p.kill()
except (OSError, psutil.NoSuchProcess):
print_(term.bold+term.white+"WATCH: {}: not running by the time we tried to kill it".format(format_process(p))+term.normal)
print_(term.bold+term.white+"WATCH: Waiting for processes to die..."+term.normal)
gone, alive = psutil.wait_procs(alive, timeout=4, callback=on_terminate)
if not alive:
print_(term.bold+term.white+"WATCH: done killing subtree"+term.normal)
return
# give up
for p in alive:
print_(term.bold+term.white+"WATCH: {}: survived SIGKILL; giving up".format(format_process(p))+term.normal)
class AutoRestartTrick(Trick):
def __init__(self, command, patterns, ignore_patterns, ignore_directories):
super(AutoRestartTrick, self).__init__(
patterns, ignore_patterns, ignore_directories)
self.command = command
self.process = None
self.ratelimit = 0.0
self.last_event = time.time()
self.wait_for_reemit = False
self.unseen_events = 0
def start(self):
print_(term.bold+term.white+"WATCH: STARTING PROCESS..."+term.normal)
if self.process is not None:
print_("\033[31mWATCH: WARNING: TRIED TO START PROCESS TWICE\033[m")
self.process = subprocess.Popen(self.command, preexec_fn=os.setsid)
print_(term.bold+term.green+"WATCH: PROCESS SHOULD NOW BE RUNNING"+term.normal)
def stop(self):
print_(term.bold+term.yellow+"WATCH: STOPPING PROCESS..."+term.normal)
if self.process is None:
print_(term.bold+term.white+"WATCH: HAVE NOT LAUNCHED PROCESS"+term.normal)
return
reap_children()
self.process = None
print_(term.bold+term.white+"WATCH: MARKED AS NO PROCESS"+term.normal)
def on_any_event(self, event):
min_wait = 0.2
thresh_wait = 0.02
self.stop()
now = event.when
delta = max(now - self.last_event, 1e-3)
self.last_event = now
self.ratelimit = min(2, max(0, (0.96 ** delta)*self.ratelimit + 3*(1.0/delta + 0.125)))
time_to_wait = max(min_wait, self.ratelimit - delta)
if time_to_wait > thresh_wait:
print_(term.bold+term.orange+"WATCH: got event after ({:0.3f}s out of {:0.3f}s), ignore duration {:0.3f}s".format(delta, self.ratelimit, time_to_wait)+term.normal)
time.sleep(time_to_wait)
print_(term.bold+term.orange+"WATCH: Skipping {} events".format(self.observer.event_queue.qsize())+term.normal)
self.observer.event_queue.queue.clear()
self.start()
@apw.command(
argument('paths', nargs="+"),
argument('-q', '--quiet', action="store_true"),
argument('--command', nargs=argparse.REMAINDER, required=True),
)
def autorestart(args):
global quiet
quiet = args.quiet
import signal
patterns, ignores, dirnames = parse_patterns(args.paths)
handler = AutoRestartTrick(command=args.command,
patterns=patterns,
ignore_patterns=ignores,
ignore_directories=True)
handler.start()
observer = Observer(timeout=1)
run(handler, dirnames)
handler.stop()
@apw.command(
argument('paths', nargs="+"),
argument('--command', nargs=argparse.REMAINDER, required=True),
)
def watch(args):
patterns, ignores, dirnames = parse_patterns(args.paths)
handler = CommandTrick(args.command, patterns=patterns,
ignore_patterns=ignores, #ignore_patterns,
ignore_directories=True)
run(handler, dirnames)
def parse_patterns(paths):
patterns = []
dirnames = {}
ignores = []
for pathpattern in paths:
if pathpattern.startswith("-"):
pathpattern = pathpattern[1:]
assert "/" not in pathpattern
ignores.append(pathpattern)
elif "**" in pathpattern:
before, stars, after = pathpattern.partition("**")
dirname = os.path.abspath(os.path.expanduser(os.path.dirname(before)))
base = os.path.basename(before)
dirnames[dirname] = True
patterns.append(base+stars+after)
else:
dirname = os.path.dirname(pathpattern)
basename = os.path.basename(pathpattern)
assert "*" not in dirname
dirnames.setdefault(dirname, False)
patterns.append(basename)
return patterns, ignores, dirnames
def run(handler, dirnames):
observer = Observer(timeout=1.0)
handler.observer = observer
watches = []
handler.watches = watches
for dirname, recursive in dirnames.items():
watches.append(observer.schedule(handler, os.path.abspath(os.path.expanduser(dirname)), recursive))
try:
observer.start()
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
print_("Stopping")
observer.stop()
print_("Done stopping")
observer.join()
except KeyboardInterrupt:
print_("Shutting down forcefully?")
pass
print_("Stuff may or may not actually happen")
def main():
args = apw.parse_args()
args.func(args)
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment