Skip to content

Instantly share code, notes, and snippets.

@pietromarchesi
Created February 1, 2018 16:12
Show Gist options
  • Save pietromarchesi/d390a6a034289e182dc014aabbed9e87 to your computer and use it in GitHub Desktop.
Save pietromarchesi/d390a6a034289e182dc014aabbed9e87 to your computer and use it in GitHub Desktop.
SciLuigi workflow for SLURM
import os
import luigi
import sciluigi
class MyWorkflow(sciluigi.WorkflowTask):
    """Workflow that chains a MyFooWriter task into a MyFooReplacer task.

    Both tasks receive a SlurmInfo built from the same settings and differ
    only in their job name.
    """

    # Name of the run mode to use: 'local', 'hpc' or 'mpi'.
    runmode = luigi.Parameter()

    def workflow(self):
        """Build the two-task chain and return its last task.

        Raises:
            Exception: if ``self.runmode`` is not 'local', 'hpc' or 'mpi'.
        """
        # Map the textual parameter onto the matching sciluigi constant.
        known_modes = (
            ('local', sciluigi.RUNMODE_LOCAL),
            ('hpc', sciluigi.RUNMODE_HPC),
            ('mpi', sciluigi.RUNMODE_MPI),
        )
        for label, constant in known_modes:
            if self.runmode == label:
                runmode = constant
                break
        else:
            raise Exception(
                'Runmode is none of local, hpc, nor mpi. Please fix and try again!')

        def slurminfo_for(jobname):
            # Identical settings for every task; only the job name varies.
            return sciluigi.SlurmInfo(
                runmode=runmode,
                project='slurm_project',
                partition='knl',
                cores='1',
                time='1:00:00',
                jobname=jobname,
                threads='1',
            )

        foowriter = self.new_task(
            'foowriter', MyFooWriter, slurminfo=slurminfo_for('foowriter'))
        fooreplacer = self.new_task(
            'fooreplacer', MyFooReplacer, slurminfo=slurminfo_for('fooreplacer'))

        # Connect the writer's output to the replacer's input.
        fooreplacer.in_foo = foowriter.out_foo

        # The last task in the chain is what the workflow returns.
        return fooreplacer
class MyFooWriter(sciluigi.SlurmTask):
    """Task with no inputs that writes the line ``foo`` to ``foo.txt``."""

    def out_foo(self):
        """Return the target info for the ``foo.txt`` output."""
        return sciluigi.TargetInfo(self, 'foo.txt')

    def run(self):
        """Open the output target for writing and write ``foo`` plus a newline."""
        target = self.out_foo()
        with target.open('w') as handle:
            handle.write('foo\n')
class MyFooReplacer(sciluigi.SlurmTask):
    """Task that runs ``sed`` over the input "foo file" to produce ``bar.txt``.

    ``in_foo`` must be assigned a callable returning the upstream target
    info (the workflow assigns the writer's ``out_foo``) before
    ``out_replaced`` or ``run`` is used.
    """

    # The one input, a "foo file"; None until the workflow connects it.
    in_foo = None

    def out_replaced(self):
        """Return the target info for ``bar.txt`` in the input file's directory."""
        # os.path.join supplies the separator that plain concatenation
        # omitted ('data' + 'bar.txt' gave 'databar.txt'). When the input
        # path has no directory part the result is still just 'bar.txt'.
        foo_dir = os.path.dirname(self.in_foo().path)
        return sciluigi.TargetInfo(self, os.path.join(foo_dir, 'bar.txt'))

    def run(self):
        """Execute the ``sed`` command that writes the replaced output file."""
        # Uses the in-built self.ex() method to execute the command.
        # NOTE: variables inside '' do NOT get substituted by the shell,
        # so the sed expression has to be wrapped in "".
        self.ex('sed "s/foo/$(hostname)/" {inf} > {out}'.format(
            inf=self.in_foo().path,
            out=self.out_replaced().path))
if __name__ == '__main__':
    # Entry point when executed as a script: hand the workflow class to
    # sciluigi's run_local().
    sciluigi.run_local(main_task_cls=MyWorkflow)

# Usage note, kept as a bare string literal (it has no runtime effect).
'''
run as:
python sciluigi_slurm_example.py --runmode hpc
or
python sciluigi_slurm_example.py --runmode local
'''
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment