Skip to content

Instantly share code, notes, and snippets.

@smr547
Forked from sixy6e/mock_workflow.py
Last active August 29, 2015 14:18
Show Gist options
  • Save smr547/52212f84421d4f509e62 to your computer and use it in GitHub Desktop.
Save smr547/52212f84421d4f509e62 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
import numpy
import luigi
import luigi.contrib.mpi as mpi
import cPickle as pickle
from os.path import exists
import time
from datetime import datetime
from mpi_log_utils import MPILogFilter
import os
import logging
class QueryDB(luigi.Task):
    """Stage-1 task that simulates a database query.

    Picks a random integer and pickles it, wrapped in a dict under the key
    ``'value'``, into ``query_results.pkl``.
    """

    def requires(self):
        """Return the upstream tasks; this task has none."""
        return []

    def output(self):
        """Return the local target that holds the pickled query result."""
        return luigi.LocalTarget('query_results.pkl')

    def run(self):
        """Sleep to mimic query latency, then pickle a random value."""
        logging.info('Querying DB')
        print('Querying DB')

        time.sleep(10)  # simulate a modest DB query/resultset cycle

        # randint's upper bound is exclusive, so the draw lies in 1..32.
        drawn = numpy.random.randint(1, 33, 1)[0]
        result = {'value': drawn}

        target = self.output()
        with target.open('w') as outf:
            pickle.dump(result, outf)

        logging.info('Done querying DB')
class ComputeComplexFunc(luigi.Task):
    """Simulate an expensive computation that produces one small output file.

    Parameters:
        out_fname: path of the file this task writes (also its luigi output).
        out_value: value written to the file immediately after the timestamp.
    """

    out_fname = luigi.Parameter()
    out_value = luigi.Parameter()

    def requires(self):
        """Return the upstream tasks; this task has none."""
        return []

    def output(self):
        """Return the local file target located at ``out_fname``."""
        return luigi.LocalTarget(self.out_fname)

    def run(self):
        """Sleep for a random 1-32 seconds, then write timestamp and value.

        The file content is the string form of the current time followed
        directly (no separator) by the string form of ``out_value``.
        """
        logging.debug("ComputeComplexFunc.run() called")
        time.sleep(numpy.random.randint(1, 33, (1))[0])

        # Write through the luigi target instead of open() on the raw path,
        # matching QueryDB and DefineTasks. LocalTarget.open('w') writes to a
        # temporary file and moves it into place on close, so an interrupted
        # run does not leave a partial file that luigi would treat as a
        # completed output.
        with self.output().open('w') as outf:
            # str() is what bytes() resolves to on Python 2, so the bytes
            # written are unchanged.
            outf.write(str(datetime.now()))
            outf.write(str(self.out_value))

        msg = 'Value {} written.'.format(self.out_value)
        logging.info(msg)
        print(msg)
class DefineTasks(luigi.Task):
    """Stage-2 task that fans out one ComputeComplexFunc per queried unit.

    The number of upstream tasks is read from the pickle written by QueryDB,
    so that file must already exist when ``requires()`` is called.
    """

    def requires(self):
        """Build the list of ComputeComplexFunc tasks to run.

        Returns:
            A list with one ``ComputeComplexFunc('file_<i>.txt', i)`` for each
            ``i`` in ``range(values['value'])``, where ``values`` is the dict
            unpickled from QueryDB's output file.
        """
        logging.debug("DefineTasks.requires() called")

        # Take the path from QueryDB's own output target instead of repeating
        # the file name here, so the two cannot drift apart.
        results_path = QueryDB().output().path

        # NOTE: pickle must only be loaded from trusted files; this one is
        # produced by QueryDB within the same workflow.
        with open(results_path, 'r') as f:
            values = pickle.load(f)

        return [
            ComputeComplexFunc('file_{}.txt'.format(i), i)
            for i in range(values['value'])
        ]

    def output(self):
        """Return the marker-file target that records completion."""
        return luigi.LocalTarget('DefineTasks.completed')

    def run(self):
        """Write the completion marker once all required tasks are done."""
        logging.info('DefineTasks has completed!')
        with self.output().open('w') as outf:
            outf.write('Completed')
if __name__ == '__main__':
    # Logging setup: each process writes its own log file, named after the
    # host, the process id and the MPI rank.
    log_dir = "."
    host_name = os.uname()[1]
    process_id = os.getpid()
    mpi_rank = MPILogFilter.get_rank()
    logfile = "%s/mpi_process_%s_%d_rank_%d.log" % (
        log_dir, host_name, process_id, mpi_rank)

    record_format = '%(asctime)s: [%(name)s] (%(levelname)s) %(message)s {rank=%(mpi_rank)d}'
    logging.basicConfig(
        filename=logfile,
        format=record_format,
        level=logging.DEBUG)

    # The format string references %(mpi_rank)d, which the filter sets on
    # every record passing through the root handler.
    root_handler = logging.root.handlers[0]
    root_handler.addFilter(MPILogFilter())  # supplies MPI properties

    # Stage 1: run the query so its result file exists for stage 2.
    logging.debug("creating intial task list")
    stage_one_tasks = [QueryDB()]
    logging.debug("executing mpi.run() in stage 1")
    mpi.run(stage_one_tasks)
    logging.debug("finished executing mpi.run() stage 1, now defining stage 2 tasks")

    # Stage 2: DefineTasks reads the stage-1 result to build its task list.
    stage_two_tasks = [DefineTasks()]
    logging.debug("executing mpi.run() in stage 2")
    mpi.run(stage_two_tasks)
    logging.debug("finished executing mpi.run() stage 2, All done!!")
#!/bin/env python
"""
MPI logging
"""
import os
import logging
from mpi4py import MPI
class MPILogFilter(logging.Filter):
    """
    A logging Filter that simply adds MPI status attributes to
    a LogRecord during log event processing
    """

    @staticmethod
    def get_rank():
        """Return the rank of the calling process in ``MPI.COMM_WORLD``."""
        return MPI.COMM_WORLD.Get_rank()

    def filter(self, record):
        """
        Add MPI status attributes to the supplied LogRecord

        Sets ``record.mpi_rank`` to the current process's rank so that a
        format string may reference ``%(mpi_rank)d``. Always returns True,
        so no record is ever suppressed by this filter.
        """
        record.mpi_rank = MPILogFilter.get_rank()
        return True  # we allow the record to be processed
#!/bin/bash
# PBS batch job script that launches mock_workflow.py under MPI.
# The "#PBS" lines below are scheduler directives, not ordinary comments.
# Project to charge the job to.
#PBS -P v10
# Queue to submit to.
#PBS -q express
# Resource request: 10 minutes wall time, 1GB memory, 32 CPUs.
#PBS -l walltime=00:10:00,mem=1GB,ncpus=32
# NOTE(review): presumably a site-specific option that starts the job in the
# submission working directory - confirm against the cluster documentation.
#PBS -l wd
# NOTE(review): presumably "-m e" (send mail when the job ends) - confirm.
#PBS -me
# Address that job mail is sent to.
#PBS -M joshua.sixsmith@ga.gov.au
# Load the Python interpreter, then the project module tree that provides
# the luigi-mpi module.
module load python/2.7.6
module use /projects/u46/opt/modules/modulefiles
module load luigi-mpi
# Start 32 MPI processes, matching the ncpus=32 requested above.
mpirun -n 32 python mock_workflow.py
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment