

@sixy6e
Last active August 4, 2016 06:14
luigi mpi mock
#!/usr/bin/env python
import numpy
import luigi
import luigi.contrib.mpi as mpi
import cPickle as pickle
import time
from datetime import datetime


class QueryDB(luigi.Task):
    """Mock database query: pickles a random task count in [1, 32]."""

    def requires(self):
        return []

    def output(self):
        return luigi.LocalTarget('query_results.pkl')

    def run(self):
        print 'Querying DB'
        value = numpy.random.randint(1, 33, (1))[0]
        value = {'value': value}
        with self.output().open('w') as outf:
            pickle.dump(value, outf)


class ComputeComplexFunc(luigi.Task):
    """Mock unit of work: sleeps 1-32 s, then writes a timestamp and its value."""

    out_fname = luigi.Parameter()
    out_value = luigi.Parameter()

    def requires(self):
        return []

    def output(self):
        return luigi.LocalTarget(self.out_fname)

    def run(self):
        time.sleep(numpy.random.randint(1, 33, (1))[0])
        # Write through the target rather than open(): LocalTarget.open('w')
        # writes to a temporary file and renames it on close, so a crash
        # mid-write cannot leave a file that makes the task look complete.
        with self.output().open('w') as outf:
            outf.write(bytes(datetime.now()))
            outf.write(bytes(self.out_value))
        print 'Value {} written.'.format(self.out_value)


class DefineTasks(luigi.Task):
    """Fan out one ComputeComplexFunc per value produced by QueryDB."""

    def requires(self):
        # Reads QueryDB's output, so this must only be evaluated after
        # QueryDB has run; hence the two separate mpi.run() stages below.
        tasks = []
        with open('query_results.pkl', 'r') as f:
            values = pickle.load(f)
        for i in range(values['value']):
            out_fname = 'file_{}.txt'.format(i)
            tasks.append(ComputeComplexFunc(out_fname, i))
        return tasks

    def output(self):
        return luigi.LocalTarget('DefineTasks.completed')

    def run(self):
        print 'DefineTasks has completed!'
        with self.output().open('w') as outf:
            outf.write('Completed')


if __name__ == '__main__':
    # Stage 1: run the query so that query_results.pkl exists.
    tasks = [QueryDB()]
    mpi.run(tasks)
    # Stage 2: DefineTasks.requires() reads that pickle to build its task list.
    tasks = [DefineTasks()]
    mpi.run(tasks)

#!/bin/bash
#PBS -P v10
#PBS -q express
#PBS -l walltime=00:10:00,mem=1GB,ncpus=32
#PBS -l wd
#PBS -me
#PBS -M joshua.sixsmith@ga.gov.au
module load python/2.7.6
module use /projects/u46/opt/modules/modulefiles
module load luigi-mpi
mpirun -n 32 python mock_workflow.py

smr547 commented Apr 10, 2015

I've played with this and it looks really cool. I forked the gist to do some experiments. See: https://gist.github.com/smr547/52212f84421d4f509e62

I added a sleep(10) to the QueryDB.run() method to simulate the delay of a real DB query, and also added detailed logging to per-process log files.
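The fork's logging code isn't reproduced here, but per-process log files in the same format as the log extract further down can be produced with the standard logging module. This is a minimal sketch, assuming the rank is obtained elsewhere (for example from mpi4py's MPI.COMM_WORLD.Get_rank()); the names RankFilter and setup_rank_logging and the rank_NN.log file pattern are hypothetical:

```python
import logging
import os


class RankFilter(logging.Filter):
    """Attach the MPI rank to every record so the format string can use it."""

    def __init__(self, rank):
        logging.Filter.__init__(self)
        self.rank = rank

    def filter(self, record):
        record.rank = self.rank
        return True  # never drop records, only annotate them


def setup_rank_logging(rank, log_dir='.'):
    """Hypothetical helper: route all records to a per-rank log file.

    The rank is passed in (assumed to come from e.g. mpi4py). Because the
    filter sits on the handler, it also annotates records that propagate up
    from child loggers such as 'luigi-interface'.
    """
    path = os.path.join(log_dir, 'rank_{:02d}.log'.format(rank))
    handler = logging.FileHandler(path, mode='w')
    handler.addFilter(RankFilter(rank))
    # Mirrors lines like:
    # 2015-04-10 16:39:34,622: [root] (DEBUG) ... {rank=29}
    handler.setFormatter(logging.Formatter(
        '%(asctime)s: [%(name)s] (%(levelname)s) %(message)s {rank=%(rank)d}'))
    root = logging.getLogger()
    root.setLevel(logging.DEBUG)
    root.addHandler(handler)
    return path
```

Each process would call setup_rank_logging() once at start-up, before mpi.run(), so every rank writes to its own file.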

The resulting logs show that every process except the one executing the query completes the first mpi.run() call and enters the second mpi.run() call before the query has finished, i.e. before its results are available in the pickle file.

At first glance this might seem bad, since DefineTasks.requires() reads that pickle file. However, all processes (slaves) synchronise with the master and pause until the query is complete. This is fortunate, but not what I was expecting.
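In effect the master acts as a barrier between the two stages. The sketch below is only a thread-based analogy, not how luigi-mpi implements it: one thread plays the worker running QueryDB, the main thread plays the master, and the remaining threads play idle slaves that reach stage 2 early but only read the pickle once the master releases them. The function run_two_stage_mock, its parameters and the pickled value 7 are hypothetical.

```python
import os
import pickle
import shutil
import tempfile
import threading
import time


def run_two_stage_mock(n_workers=4, query_delay=0.2):
    """Hypothetical thread analogy of the two mpi.run() stages (not luigi-mpi)."""
    workdir = tempfile.mkdtemp()
    pkl_path = os.path.join(workdir, 'query_results.pkl')
    synced = threading.Event()  # set by the "master" once stage 1 is finished
    lock = threading.Lock()
    saw_file_on_entry = {}      # did the pickle exist when a slave entered stage 2?
    values = {}                 # what each slave read from the pickle in stage 2

    def query_worker():
        time.sleep(query_delay)  # simulated slow DB query, like the added sleep(10)
        with open(pkl_path, 'wb') as f:
            pickle.dump({'value': 7}, f)

    def idle_slave(rank):
        # Stage 1 has no work for this slave, so it moves straight on to stage 2...
        with lock:
            saw_file_on_entry[rank] = os.path.exists(pkl_path)
        # ...but blocks here, like "Slave N waiting to sync with Master".
        synced.wait()
        # Only now is the stage-2 requires() logic evaluated, so the pickle exists.
        with open(pkl_path, 'rb') as f:
            value = pickle.load(f)['value']
        with lock:
            values[rank] = value

    query = threading.Thread(target=query_worker)
    slaves = [threading.Thread(target=idle_slave, args=(r,))
              for r in range(1, n_workers)]
    query.start()
    for t in slaves:
        t.start()

    query.join()  # the master cannot finish stage 1 until the query completes
    synced.set()  # release the waiting slaves into stage 2
    for t in slaves:
        t.join()

    shutil.rmtree(workdir)
    return saw_file_on_entry, values
```

On a typical run the first dictionary is mostly False, because the slaves reach stage 2 while the query is still sleeping; the second is always {1: 7, 2: 7, 3: 7} for n_workers=4, because each slave only opens the pickle after the release.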

The following log extract is from slave 29, around the time the query was being executed by slave 16. Note the 10-second delay while slave 29 waits to sync with the master:

...
2015-04-10 16:39:34,622: [luigi-interface] (INFO) There are no more tasks to run at this time {rank=29}
2015-04-10 16:39:34,622: [luigi-interface] (INFO) QueryDB() is currently run by worker Worker(salt=715810340, workers=1, host=r80, username=smr547, pid=12316, rank=16) {rank=29}
2015-04-10 16:39:34,622: [root] (DEBUG) finished executing mpi.run() stage 1, now defining stage 2 tasks {rank=29}
2015-04-10 16:39:34,623: [root] (DEBUG) executing mpi.run() in stage 2 {rank=29}
2015-04-10 16:39:34,623: [luigi-interface] (DEBUG) Slave 29 waiting to sync with Master {rank=29}
2015-04-10 16:39:44,644: [luigi-interface] (DEBUG) Slave 29 locally updated task status. {rank=29}
2015-04-10 16:39:44,645: [luigi-interface] (DEBUG) Checking if DefineTasks() is complete {rank=29}
2015-04-10 16:39:44,646: [root] (DEBUG) DefineTasks.requires() called {rank=29}
...
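The size of that pause can be checked from the timestamps on the "waiting to sync" and "locally updated task status" lines. A small sketch, assuming the default logging timestamp layout seen above; the helper names log_timestamp and gap_seconds are hypothetical:

```python
from datetime import datetime

# Default logging asctime layout, e.g. "2015-04-10 16:39:34,623"
LOG_TIME_FORMAT = '%Y-%m-%d %H:%M:%S,%f'


def log_timestamp(line):
    """Parse the timestamp that precedes the first ': [' in a log line."""
    stamp = line.split(': [', 1)[0]
    return datetime.strptime(stamp, LOG_TIME_FORMAT)


def gap_seconds(earlier_line, later_line):
    """Elapsed seconds between two log lines."""
    return (log_timestamp(later_line) - log_timestamp(earlier_line)).total_seconds()


waiting = ('2015-04-10 16:39:34,623: [luigi-interface] (DEBUG) '
           'Slave 29 waiting to sync with Master {rank=29}')
updated = ('2015-04-10 16:39:44,644: [luigi-interface] (DEBUG) '
           'Slave 29 locally updated task status. {rank=29}')
print(gap_seconds(waiting, updated))  # 10.021
```

About 10.02 s, consistent with the sleep(10) added to QueryDB.run() plus a little scheduling overhead.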
