Created
February 5, 2018 13:43
-
-
Save pietromarchesi/c5b4b4780b5464548a5c1f5fd480bf90 to your computer and use it in GitHub Desktop.
Example Slurm workflow - parallel workflows
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import luigi | |
import sciluigi | |
import os | |
class MyMetaWorkflow(sciluigi.WorkflowTask): | |
runmode = luigi.Parameter() | |
n_tasks = luigi.IntParameter() | |
def workflow(self): | |
if self.runmode == 'local': | |
runmode = sciluigi.RUNMODE_LOCAL | |
elif self.runmode == 'hpc': | |
runmode = sciluigi.RUNMODE_HPC | |
elif self.runmode == 'mpi': | |
runmode = sciluigi.RUNMODE_MPI | |
else: | |
raise Exception('Runmode is none of local, hpc, nor mpi. Please fix and try again!') | |
tasks = [] | |
for t in range(self.n_tasks): | |
wf = self.new_task('wf', MyWorkflow, task_n=t, | |
runmode=runmode) | |
tasks.append(wf) | |
return tasks | |
class MyWorkflow(sciluigi.WorkflowTask): | |
runmode = luigi.Parameter() | |
task_n = luigi.IntParameter() | |
def workflow(self): | |
foowriter = self.new_task('foowriter', MyFooWriter, | |
task_n=self.task_n, | |
slurminfo=sciluigi.SlurmInfo( | |
runmode=self.runmode, | |
project='pcp0135', # this should be the account (salloc -A, strange name choice) | |
partition='knl', | |
cores='1', | |
time='1:00:00', | |
jobname='foowriter', | |
threads='1')) | |
fooreplacer = self.new_task('fooreplacer', MyFooReplacer, | |
task_n=self.task_n, | |
slurminfo=sciluigi.SlurmInfo( | |
runmode=self.runmode, | |
project='pcp0135', | |
partition='knl', | |
cores='1', | |
time='1:00:00', | |
jobname='fooreplacer', | |
threads='1')) | |
# Here we do the *magic*: Connecting outputs to inputs: | |
fooreplacer.in_foo = foowriter.out_foo | |
# Return the last task(s) in the workflow chain. | |
return fooreplacer | |
class MyFooWriter(sciluigi.Task): | |
task_n = luigi.IntParameter() | |
# We have no inputs here | |
# Define outputs: | |
def out_foo(self): | |
return sciluigi.TargetInfo(self, 'foo.txt') | |
def run(self): | |
with self.out_foo().open('w') as foofile: | |
foofile.write('foo\n') | |
class MyFooReplacer(sciluigi.SlurmTask): | |
task_n = luigi.IntParameter() | |
# Here we have one input, a "foo file": | |
in_foo = None | |
# ... and an output, a "bar file": | |
def out_replaced(self): | |
# As the path to the returned target(info), we | |
# use the directory of the foo file, and save the modified | |
# version to a file called bar.txt | |
out_file = os.path.join(os.path.dirname(self.in_foo().path), | |
'host{}.txt'.format(str(self.task_n))) | |
return sciluigi.TargetInfo(self, out_file) | |
def run(self): | |
# Here, we use the in-built self.ex() method, to execute commands: | |
self.ex('./replace_with_hostname.sh {in_file} {out_file}'.format( | |
in_file = self.in_foo().path, | |
out_file = self.out_replaced().path)) | |
if __name__ == '__main__': | |
sciluigi.run_local(main_task_cls=MyMetaWorkflow) | |
# run as python parallelslurmflow.py --runmode hpc --n-tasks 5 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment