Skip to content

Instantly share code, notes, and snippets.

@jdahm
Created February 15, 2019 07:04
Show Gist options
  • Save jdahm/fb17efb9d3593de73a17ae4340514e19 to your computer and use it in GitHub Desktop.
Save jdahm/fb17efb9d3593de73a17ae4340514e19 to your computer and use it in GitHub Desktop.
Process search
#!/usr/bin/env python
#
# psearch
#
# Script used to search for and send signals to processes
#
# AUTHORS
# - Johann Dahm <johann.dahm@gmail.com> (20140209)
#
# LICENSE LGPL-3.0
import sys
import argparse
import subprocess as sp
import datetime
import re
import os, signal
def _entry_dict(l, field):
def _parse_etime(tstr):
"""Parse the elapsed time string and return in seconds"""
if '-' in tstr:
# has a day field
m = re.match(r"(?P<days>\d+)[-](?P<hrs>\d+)[:](?P<min>\d+)[:](?P<sec>\d+)", tstr)
if m is None: raise ValueError("Parse error in string: {0}".format(tstr))
days, hours, minutes, seconds = (int(x) for x in (m.group("days"), m.group("hrs"), m.group("min"), m.group("sec")))
t = datetime.timedelta(days=days, hours=hours, minutes=minutes, seconds=seconds)
elif len(tstr.split(":")) == 3:
# HH:MM:SS
m = re.match(r"(?P<hrs>\d+)[:](?P<min>\d+)[:](?P<sec>\d+)", tstr)
if m is None: raise ValueError("Parse error in string: {0}".format(tstr))
hours, minutes, seconds = (int(x) for x in (m.group("hrs"), m.group("min"), m.group("sec")))
t = datetime.timedelta(hours=hours, minutes=minutes, seconds=seconds)
elif len(tstr.split(":")) == 2:
# MM:SS
m = re.match(r"(?P<min>\d+)[:](?P<sec>\d+)", tstr)
if m is None: raise ValueError("Parse error in string: {0}".format(tstr))
minutes, seconds = (int(x) for x in (m.group("min"), m.group("sec")))
t = datetime.timedelta(minutes=minutes, seconds=seconds)
else:
raise ValueError("Parse error in string: {0}".format(tstr))
return t.total_seconds()
entry = filter(None, l.strip().split(' '))
d = {}
if 'etime' in field:
d['etime'] = _parse_etime(entry[field.index('etime')])
if 'pid' in field:
d['pid'] = int(entry[field.index('pid')])
if 'args' in field:
d['args'] = ' '.join(entry[field.index('args'):])
return d
def _time_from_etime(dt):
time = datetime.datetime.now() - datetime.timedelta(seconds=dt)
return time.strftime("%H:%M:%S")
class process_list:
def __init__(self, all=False, field=['etime', 'pid']):
"""Creates a process list, calls 'ps'"""
self.field = field
opts = '-ao'
if args.all: opts = '-Ao'
# Get output of ps as a list of strings
ps_output = sp.check_output(['ps', opts, ','.join(self.field)]).split('\n')[1:-1]
# Convert to list of dicts
self._ps = [_entry_dict(l, field) for l in ps_output]
def __iter__(self):
for p in self._ps: yield p
def sort(self, field='etime', reverse=False):
"""Sorts the processes by a given field"""
self._ps.sort(key=lambda k: k[field], reverse=reverse)
def filter(self, pattern, negate=False):
def make_matches(pattern, negate=False):
def matches(d):
m = any([re.search(pattern, str(item)) is not None for item in d.items()])
if negate: m = not m
return m
return matches
matches = make_matches(pattern, negate=negate)
self._ps = filter(matches, self._ps)
def truncate(self, n, negate=False):
if negate:
self._ps = self._ps[n:]
else:
self._ps = self._ps[:n]
def display(self, delim='|', p=True, max_size=30):
maxsize = {f : 0 for f in self.field}
for p in self:
for f in self.field:
maxsize[f] = max(len(str(p[f])), maxsize[f])
for f in self.field:
maxsize[f] = min(max_size, maxsize[f])
sl = []
for p in self:
e = []
for f in self.field:
if f == 'etime': rep = _time_from_etime(p[f])
else: rep = p[f]
e.append("{0:{1}}".format(rep, maxsize[f]))
sl.append(delim.join(e))
s = '\n'.join(sl)
if p: print s
return s
def __str__(self):
return self.display(p=False)
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Search for and send signals to running processes')
parser.add_argument('pattern', type=str, help='Regular expression search')
parser.add_argument('--nselect', metavar='n', type=int, default=0, help='Number of matches to select. If this flag is not passed, all processes matching pattern are chosen, and the negate flag has no effect.')
parser.add_argument('--negate', action='store_true', help='Negates the match ordering')
parser.add_argument('--newest', action='store_true', help='Sorts matches by most recently started')
parser.add_argument('--all', '-a', action='store_true', help='Search all running processes, not just processes attached to parent')
parser.add_argument('--delimiter', '-d', nargs='?', const=' | ', default=' | ')
parser.add_argument('--signal', '-s', type=int, default=0, help='Send signal to matched processes')
parser.add_argument('--hup', action='store_true', help='Send SIGHUP signal to matched processes')
parser.add_argument('--kill', '-k', action='store_true', help='Send SIGKILL signal to matched processes')
parser.add_argument('--verbose', '-v', action='count', help='Verbose output')
args = parser.parse_args()
sig = None
if args.signal > 0: sig = args.signal
if args.hup: sig = signal.SIGHUP
if args.kill: sig = signal.SIGKILL
# TODO Could pass fields as option
ps = process_list(all=args.all, field=['etime', 'pid', 'args'])
# Filter
ps.filter(args.pattern)
ps.filter('psrch', negate=True)
# Sort
ps.sort(field='etime', reverse=(not args.newest))
# Truncate
if args.nselect > 0: ps.truncate(args.nselect)
# List processes
ps.display(delim=args.delimiter)
# Send signal to processes
if sig is not None:
for p in ps:
if args.verbose > 0:
print "Sending signal {0} to process '{1}'".format(sig, process_name(t))
os.kill(p['pid'], sig)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment