Skip to content

Instantly share code, notes, and snippets.

@jfburkhart
Created May 7, 2012 12:49
Show Gist options
Save jfburkhart/2627589 to your computer and use it in GitHub Desktop.
a multiprocess script
#!/usr/bin/env python
"""
jfb, 2010.10.20
"""
from copy import copy
import shutil
import datetime as dt
import os, time
import os.path as osp
import glob
from subprocess import PIPE, call, Popen
import multiprocessing
import logging
def now():
    """Return the current local date and time as a naive ``datetime``."""
    stamp = dt.datetime.now()
    return stamp
# Usage warning printed once, when the module is imported.
# The single-argument call form prints identically on Python 2 and 3.
print("""
WARNING:
This module is under heavy development, and is not well tested.
It creates, modifies, and deletes file system files. USE AT YOUR
OWN RISK. ~jfb
""")
class Runner:
    """ The 'Runner' class is a queue and option parser for programs
    that can be run from the command line.

    It is a parent class for subclasses which are specific to the
    programs that will be called. See the doc strings of the child
    classes for more details.

    Each queue entry is keyed by run_id and holds a dict of the form
    {'options': {option_name: value_string, ...}} (see add_run).

    NOTE::
        Alone this class will not work, child classes are required
        and should have gen_input and run methods at a minimum.
        Only option names listed in self.valid_options are accepted,
        and that list is empty in this base class.
    """
    # Incremented by every Runner (or subclass) instance; used to build
    # unique default run names.
    num_runners = 0

    class OptionError(Exception):
        """ Raised when an option name is not in valid_options. """

        def __init__(self, option=''):
            Exception.__init__(self,
                               "Not Valid input option: {0}".format(option))
            self.option = option

    def __init__(self, **kwargs):
        """ Set up an empty run queue.

        Keyword arguments:
        options  -- initial default options. Every key must be in
                    valid_options, which is still empty at this point,
                    so any key raises OptionError here.
        user_dir -- stored as self.user_dir; defaults to '.'.
        """
        Runner.num_runners += 1
        # First three letters of the concrete class name plus the shared
        # counter, e.g. 'Tes2'.
        self.default_name = (self.__class__.__name__[:3] +
                             str(Runner.num_runners))
        self.valid_options = []
        self.default_options = {}
        if 'options' in kwargs:
            self._op_check(kwargs['options'])
            self.options = kwargs['options']
        else:
            # Copy so that set_option() never mutates default_options.
            self.options = dict(self.default_options)
        self.user_dir = kwargs.get('user_dir', '.')
        self.queue = {}
        self.qcnt = 0
        # multiprocessing.Process objects started by run_queue.
        self.alljobs = []

    def _op_check(self, options):
        """ Raise OptionError for the first key not in valid_options. """
        for k in options:
            if k not in self.valid_options:
                raise self.OptionError(k)

    def set_option(self, option, value='', run_id=None):
        """ Set a single option.

        option -- option name (str); must be in valid_options.
        value  -- str, int or float, or an iterable of those. Iterables
                  (other than strings) are stored as their elements'
                  str() values joined by single spaces; dicts are
                  rejected with ValueError.
        run_id -- key of a queued run to modify; None (default) modifies
                  the default options. An unknown run_id raises KeyError.
        """
        assert isinstance(option, str), "option must be a string"
        if option not in self.valid_options:
            raise self.OptionError(option)
        if isinstance(value, dict):
            raise ValueError('value type must be one of: str, int, float, '
                             'or a list of those.')
        # Strings are iterable on Python 3; they must not be split into
        # characters.
        if not isinstance(value, str) and hasattr(value, '__iter__'):
            value = ' '.join([str(v) for v in value])
        else:
            value = str(value)
        options = self._get_options_from_queue(run_id)
        options[option] = value

    def _get_options_from_queue(self, run_id):
        """ Return the options dict for run_id.

        If run_id is None, return the default options (self.options).
        Raises KeyError if run_id is not in the queue.
        """
        if run_id is None:
            return self.options
        if run_id not in self.queue:
            raise KeyError('run_id: {0} not in queue'.format(run_id))
        return self.queue[run_id]['options']

    def overide_options(self, options, run_id=None):
        """ Completely replace an options dictionary.

        run_id -- key of a queued run whose options are replaced; None
                  (default) replaces the default options. An unknown
                  run_id raises KeyError.
        Every key must be in valid_options (OptionError otherwise).
        """
        assert isinstance(options, dict), "override options requires a dict"
        self._op_check(options)
        if run_id is None:
            self.options = options
        else:
            if run_id not in self.queue:
                raise KeyError('run_id: {0} not in queue'.format(run_id))
            self.queue[run_id]['options'] = options

    def update_options(self, options, run_id=None):
        """ Update an options dictionary in place from another dict.

        run_id -- key of a queued run to update; None (default) updates
                  the default options. An unknown run_id raises KeyError.
        Every key must be in valid_options (OptionError otherwise).
        """
        assert isinstance(options, dict), "update options requires a dict"
        self._op_check(options)
        self._get_options_from_queue(run_id).update(options)

    def add_run(self, run_id=None, options=None):
        """ Add a run to the queue.

        Without options, a shallow copy of the current defaults is used.
        Settings can be passed as an optional dictionary::

            add_run('run_id', options=MySettings)

        Without run_id, the name '<default_name>_<qcnt>' is generated.
        """
        if options is None:
            options = copy(self.options)
        if run_id is None:
            run_id = self.default_name + '_' + str(self.qcnt)
        self.queue[run_id] = {'options': options}
        self.qcnt += 1

    def clear_queue(self):
        """ Ask for confirmation, then empty the queue and reset qcnt. """
        try:
            ask = raw_input  # Python 2
        except NameError:
            ask = input  # Python 3
        proceed = ask('WARNING: Delete all _run information in the queue?[y]')
        if proceed.lower() in ['', 'y', 'yes']:
            self.queue = {}
            self.qcnt = 0

    def print_queue(self):
        """ Print every queued run's options, then the current defaults. """
        for run_id in self.queue:
            options = self.queue[run_id]['options']
            print('### File: {0} ###'.format(run_id))
            for option in options:
                print('{0} : {1}'.format(option, options[option]))
            print('')
        print('Current defaults: {0}'.format(self.options))

    def _call(self, cmd, run_id, *args, **kwargs):
        """ Start cmd with subprocess.Popen without waiting for it.

        stdout goes to BASE_DIR/run_<run_id>.nh. stderr is appended to
        logfile.log in the current working directory. self.BASE_DIR must
        be defined by the subclass.
        """
        ofile = osp.join(self.BASE_DIR, 'run_{0}.nh'.format(run_id))
        # The child gets its own copies of these descriptors, so the
        # parent's handles can be closed as soon as Popen returns.
        with open(ofile, 'w') as out:
            with open('logfile.log', 'a') as err:
                pid = Popen(cmd, stdout=out, stderr=err).pid
        print('Run ID: {0}, pid: {1}'.format(run_id, pid))

    def _worker(self, run_id, verbose=0):
        """ Target of each multiprocessing.Process: run one queued job. """
        self.run(run_id, verbose)

    def run_queue(self, runs=None, verbose=0):
        """ Run queued jobs in batches of cpu_count() processes.

        runs -- optional iterable of run_ids; defaults to every queued run.
        Each batch is joined before the next one starts, including the
        final partial batch. Started processes are kept in self.alljobs.
        The option parsing is specific to each program, and is not done
        here but in the child class's run method.
        """
        assert self.queue, "Must generate at least one input file"
        run_ids = list(runs) if runs else list(self.queue)
        batch_size = multiprocessing.cpu_count()
        self.alljobs = []
        while run_ids:
            jobs = []
            while run_ids and len(jobs) < batch_size:
                run_id = run_ids.pop()
                proc = multiprocessing.Process(target=self._worker,
                                               args=(run_id, verbose))
                jobs.append(proc)
                self.alljobs.append(proc)
                proc.start()
                # Stagger process start-up.
                time.sleep(0.25)
            for job in jobs:
                job.join()

    def wait_for_queue(self):
        """ Join every process started by run_queue (none yet is fine). """
        for job in getattr(self, 'alljobs', []):
            job.join()
class Test_nohup(Runner):
    """
    Test Runner for nohup.

    run() starts ``nohup ./a.out`` from BASE_DIR/run_dir without waiting
    for it to finish. The program's stdout is written to run_<run_id>.nh
    and its stderr is appended to logfile.log, both in BASE_DIR/run_dir.
    """

    def __init__(self,
                 base_dir=None,
                 run_dir='test',
                 verbose=False,
                 ):
        """ base_dir -- directory containing run_dir; defaults to the
        current working directory, with a printed warning.
        run_dir  -- sub-directory of base_dir in which a.out is run.
        verbose  -- stored as self.verbose.
        """
        Runner.__init__(self)
        if not base_dir:
            self.BASE_DIR = os.path.abspath(osp.curdir)
            print("WARNING, No BASE_DIR defined. Using Defaults.")
        else:
            # Made absolute so that chdir(BASE_DIR) still resolves after
            # run() has changed into run_dir.
            self.BASE_DIR = osp.abspath(base_dir)
        self.run_dir = run_dir
        self.verbose = verbose
        self.queue = {}
        self.qcnt = 0

    def run(self, run_id, verbose=False):
        """ Launch ``nohup ./a.out`` for run_id from BASE_DIR/run_dir.

        The working directory is set back to BASE_DIR afterwards, even if
        launching fails. verbose is accepted for interface compatibility
        and is not used.
        """
        # './' is required: a bare 'a.out' would be looked up on PATH,
        # not in run_dir.
        cmd = ['nohup', './a.out']
        os.chdir(osp.join(self.BASE_DIR, self.run_dir))
        try:
            self._call(cmd, run_id)
        finally:
            os.chdir(self.BASE_DIR)

    def gen_input(self, run_id=None):
        """ Placeholder: this test runner generates no input (does nothing). """
        pass

    def _call(self, cmd, run_id, *args, **kwargs):
        """ Start cmd with subprocess.Popen without waiting for it.

        stdout goes to run_<run_id>.nh and stderr is appended to
        logfile.log, both relative to the current working directory.
        """
        # The child gets its own copies of these descriptors, so the
        # parent's handles can be closed as soon as Popen returns.
        with open('run_{0}.nh'.format(run_id), 'w') as out:
            with open('logfile.log', 'a') as err:
                pid = Popen(cmd, stdout=out, stderr=err).pid
        print('Run ID: {0}, pid: {1}'.format(run_id, pid))
class Test_pool(Runner):
    """
    Test Runner for multiprocessing.Pool.

    run() starts ``./a.out`` from BASE_DIR/run_dir without waiting for it
    to finish. The program's stdout is written to run_<run_id>.nh and its
    stderr is appended to logfile.log, both in BASE_DIR/run_dir.
    run_queue() dispatches queued runs through a process pool.
    """

    def __init__(self,
                 base_dir=None,
                 run_dir='test',
                 verbose=False,
                 ):
        """ base_dir -- directory containing run_dir; defaults to the
        current working directory, with a printed warning.
        run_dir  -- sub-directory of base_dir in which a.out is run.
        verbose  -- stored as self.verbose.
        """
        Runner.__init__(self)
        if not base_dir:
            self.BASE_DIR = os.path.abspath(osp.curdir)
            print("WARNING, No BASE_DIR defined. Using Defaults.")
        else:
            # Made absolute so that chdir(BASE_DIR) still resolves after
            # run() has changed into run_dir.
            self.BASE_DIR = osp.abspath(base_dir)
        self.run_dir = run_dir
        self.verbose = verbose
        self.queue = {}
        self.qcnt = 0

    def run(self, run_id, verbose=False):
        """ Launch ``./a.out`` for run_id from BASE_DIR/run_dir.

        The working directory is set back to BASE_DIR afterwards, even if
        launching fails. verbose is accepted for interface compatibility
        and is not used.
        """
        # './' is required: a bare 'a.out' would be looked up on PATH,
        # not in run_dir.
        cmd = ['./a.out']
        os.chdir(osp.join(self.BASE_DIR, self.run_dir))
        try:
            self._call(cmd, run_id)
        finally:
            os.chdir(self.BASE_DIR)

    def gen_input(self, run_id=None):
        """ Placeholder: this test runner generates no input (does nothing). """
        pass

    def _call(self, cmd, run_id, *args, **kwargs):
        """ Start cmd with subprocess.Popen without waiting for it.

        stdout goes to run_<run_id>.nh and stderr is appended to
        logfile.log, both relative to the current working directory.
        """
        # The child gets its own copies of these descriptors, so the
        # parent's handles can be closed as soon as Popen returns.
        with open('run_{0}.nh'.format(run_id), 'w') as out:
            with open('logfile.log', 'a') as err:
                pid = Popen(cmd, stdout=out, stderr=err).pid
        print('Run ID: {0}, pid: {1}'.format(run_id, pid))

    def run_queue(self, runs=None, verbose=0, processes=4):
        """ Run queued jobs through one multiprocessing.Pool and wait.

        runs      -- optional iterable of run_ids; defaults to every
                     queued run.
        processes -- number of pool worker processes (default 4).
        Any exception raised by run() in a worker is re-raised here.
        """
        assert self.queue, "Must generate at least one input file"
        run_ids = list(runs) if runs else list(self.queue)
        # The pool is not tracked as Process objects; wait_for_queue has
        # nothing to join because this method already blocks until done.
        self.alljobs = []
        pool = multiprocessing.Pool(processes=processes)
        try:
            # One task per run. NOTE: self.run is pickled to the workers,
            # so the instance must be picklable (bound methods cannot be
            # pickled on Python 2).
            results = [pool.apply_async(self.run, args=(run_id, verbose))
                       for run_id in run_ids]
            pool.close()
            for result in results:
                result.get()
        except BaseException:
            pool.terminate()
            raise
        finally:
            pool.join()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment