Skip to content

Instantly share code, notes, and snippets.

@pietromarchesi
Created February 5, 2018 13:43
Show Gist options
  • Save pietromarchesi/c5b4b4780b5464548a5c1f5fd480bf90 to your computer and use it in GitHub Desktop.
Save pietromarchesi/c5b4b4780b5464548a5c1f5fd480bf90 to your computer and use it in GitHub Desktop.
Example Slurm workflow - parallel workflows
import luigi
import sciluigi
import os
class MyMetaWorkflow(sciluigi.WorkflowTask):
    """Meta workflow that fans out into ``n_tasks`` ``MyWorkflow`` instances.

    Parameters:
        runmode: One of ``'local'``, ``'hpc'`` or ``'mpi'``; translated into
            the corresponding ``sciluigi.RUNMODE_*`` constant before being
            handed to the sub-workflows.
        n_tasks: Number of ``MyWorkflow`` sub-workflows to create. Each one
            receives its index (``0 .. n_tasks - 1``) as ``task_n``.
    """

    runmode = luigi.Parameter()
    n_tasks = luigi.IntParameter()

    def workflow(self):
        """Return one ``MyWorkflow`` task per index in ``range(n_tasks)``.

        Raises:
            ValueError: If ``runmode`` is not ``'local'``, ``'hpc'`` or
                ``'mpi'``.
        """
        # Map to constant *names* rather than values so that only the
        # constant actually selected is looked up on the sciluigi module.
        constant_names = {
            'local': 'RUNMODE_LOCAL',
            'hpc': 'RUNMODE_HPC',
            'mpi': 'RUNMODE_MPI',
        }
        constant_name = constant_names.get(self.runmode)
        if constant_name is None:
            # ValueError is still an Exception, so existing handlers that
            # catch Exception keep working.
            raise ValueError('Runmode is none of local, hpc, nor mpi. Please fix and try again!')
        runmode = getattr(sciluigi, constant_name)

        # NOTE(review): every sub-workflow is created under the same instance
        # name 'wf'; only task_n tells them apart. Confirm sciluigi is fine
        # with a shared instance name.
        return [
            self.new_task('wf', MyWorkflow, task_n=t, runmode=runmode)
            for t in range(self.n_tasks)
        ]
class MyWorkflow(sciluigi.WorkflowTask):
    """Two-step workflow: a ``MyFooWriter`` feeding a ``MyFooReplacer``.

    Parameters:
        runmode: Run mode value forwarded unchanged to each task's
            ``SlurmInfo``.
        task_n: Index of this workflow; forwarded to both tasks.
    """

    runmode = luigi.Parameter()
    task_n = luigi.IntParameter()

    def _make_slurminfo(self, jobname):
        """Build the ``SlurmInfo`` used by both tasks; only ``jobname`` varies."""
        return sciluigi.SlurmInfo(
            runmode=self.runmode,
            # 'project' should be the account (salloc -A); the parameter
            # name is a somewhat odd choice.
            project='pcp0135',
            partition='knl',
            cores='1',
            time='1:00:00',
            jobname=jobname,
            threads='1')

    def workflow(self):
        """Create both tasks, wire them together and return the final one."""
        writer = self.new_task(
            'foowriter', MyFooWriter,
            task_n=self.task_n,
            slurminfo=self._make_slurminfo('foowriter'))
        replacer = self.new_task(
            'fooreplacer', MyFooReplacer,
            task_n=self.task_n,
            slurminfo=self._make_slurminfo('fooreplacer'))

        # Dependency wiring: the replacer's input is the writer's output.
        replacer.in_foo = writer.out_foo

        # The last task in the chain is what the workflow hands back.
        return replacer
class MyFooWriter(sciluigi.Task):
    """Input-less task that writes the single line ``foo`` to ``foo.txt``."""

    task_n = luigi.IntParameter()

    # No in_* fields: this task has no inputs.

    def out_foo(self):
        """Return the target info for ``foo.txt``.

        The path is the fixed name ``'foo.txt'``; it does not include
        ``task_n``.
        """
        return sciluigi.TargetInfo(self, 'foo.txt')

    def run(self):
        """Write ``'foo\\n'`` to the ``out_foo`` target."""
        target = self.out_foo()
        with target.open('w') as handle:
            handle.write('foo\n')
class MyFooReplacer(sciluigi.SlurmTask):
    """Task that runs ``replace_with_hostname.sh`` on the upstream foo file."""

    task_n = luigi.IntParameter()

    # Single input, the "foo file". It is assigned by the workflow and is
    # called (``self.in_foo()``) to obtain the upstream target.
    in_foo = None

    def out_replaced(self):
        """Return the target info for this task's output file.

        The file is named ``host<task_n>.txt`` and is placed in the same
        directory as the input foo file.
        """
        foo_dir = os.path.dirname(self.in_foo().path)
        file_name = 'host{}.txt'.format(str(self.task_n))
        return sciluigi.TargetInfo(self, os.path.join(foo_dir, file_name))

    def run(self):
        """Execute the replacement script with the input and output paths."""
        command = './replace_with_hostname.sh {in_file} {out_file}'.format(
            in_file=self.in_foo().path,
            out_file=self.out_replaced().path)
        # self.ex() is the task's built-in way of executing a command.
        self.ex(command)
# Script entry point. Example invocation:
#     python parallelslurmflow.py --runmode hpc --n-tasks 5
if __name__ == '__main__':
    sciluigi.run_local(main_task_cls=MyMetaWorkflow)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment