Create a gist now

Instantly share code, notes, and snippets.

What would you like to do?
pyrocfile - 100 line Procfile manager in Python
# Pyrocfile - Simple Python impementation of Procfile manager
# Written by Chris Testa ( in 2011
# Released in the Public Domain
import argparse, logging, os.path, random, re, select, signal, subprocess
def _new_logger(name, color=None):
logger = logging.getLogger(name)
hdlr = logging.StreamHandler()
color, end_color = '\033[9%dm' % (color or random.randint(1, 6)), '\033[0m'
formatter = logging.Formatter(color + '%(asctime)s %(name)20s |' + end_color + ' %(message)s')
return logger
class ProcessManager():
def __init__(self, procfile, concurrencies, env, cwd):
self.procfile = procfile
self.concurrencies = concurrencies
self.env = env
self.cwd = cwd
self.processes = []
self.loggers = {}
self.running = False
self.loggers['system'] = _new_logger('system', color=9)
def start_all(self):
for id, command in self.procfile.iteritems():
for i in xrange(int(self.concurrencies.get(id, 1))):
p = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, env=self.env, cwd=self.cwd)
self.loggers[] = logger = _new_logger('%s.%d' % (id, i+1))'started with pid %d',
def watch(self):
fp_to_p = {}
rlist = []
for p in self.processes:
fp_to_p[p.stdout] = p
fp_to_p[p.stderr] = p
rlist.extend([p.stdout, p.stderr])
self.running = True
while self.running:
rready, _, _ =, [], [], 1)
for r in rready:
p = fp_to_p[r]
logger = self.loggers[]
except select.error:
logger = self.loggers['system']'sending SIGTERM to all processes')
for p in self.processes:
def interrupt(self):
self.running = False
def main():
parser = argparse.ArgumentParser(description='Work with Procfiles.')
parser.add_argument('--concurrency', '-c',
help='Specify the number of each process type to run. The value passed in should be in the format process=num,process=num')
parser.add_argument('--env', '-e',
help='Specify an alternate environment file. You can specify more than one file by using: --env file1,file2.')
parser.add_argument('--procfile', '-f', default='Procfile',
help='Specify an alternate location for the application\'s Procfile. This file\'s containing directory will be assumed to be the root directory of the application.')
args = parser.parse_args()
procfile = {}
with open(args.procfile) as f:
for line in f.readlines():
match ='([a-zA-Z0-9_-]+):(.*)', line)
if not match:
raise Exception('Bad Procfile line')
procfile[] =
cwd = os.path.dirname(os.path.realpath(args.procfile))
concurrencies = dict([kv.split('=') for kv in args.concurrency.split(',')]) if args.concurrency else {}
env = None
if args.env:
env = {}
for envfname in args.env.split(',')
with open(envfname) as f:
env.update(dict([ l.split('=') for l in f ]))
process_manager = ProcessManager(procfile, concurrencies, env, cwd)
def _interrupt(signum, frame):
print "SIGINT received"
signal.signal(signal.SIGINT, _interrupt)
if __name__ == '__main__':
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment