@pietromarchesi
Last active April 17, 2023 09:24
Running SciLuigi on a Slurm cluster (draft)

In this example we show how to adapt the example workflow so that it can run on a cluster managed by the Slurm Workload Manager.

We will write a workflow composed of two tasks: one which creates a file called foo.txt and writes foo in it, and one which reads foo.txt and replaces every occurrence of foo with the name of the cluster node on which the workflow is running.

To do this, we have to slightly change the workflow definition. In particular, we set up a runmode parameter which will allow us to specify from the command line whether we want the workflow to be run locally or on the cluster.

When we define the tasks, we have to pass an additional SlurmInfo object, which contains the specification of the resources we want to allocate to our tasks and other Slurm parameters.

import os
import luigi
import sciluigi

class MyWorkflow(sciluigi.WorkflowTask):

    # We define a runmode parameter to specify how to run the workflow
    runmode = luigi.Parameter()

    def workflow(self):

        if self.runmode == 'local':
            runmode = sciluigi.RUNMODE_LOCAL
        elif self.runmode == 'hpc':
            runmode = sciluigi.RUNMODE_HPC
        elif self.runmode == 'mpi':
            runmode = sciluigi.RUNMODE_MPI
        else:
            raise Exception('Runmode is none of local, hpc, nor mpi. Please fix and try again!')

        # we construct our tasks as before, but we pass an additional
        # SlurmInfo object
        foowriter = self.new_task('foowriter', MyFooWriter,
                                  slurminfo=sciluigi.SlurmInfo(
                                  runmode=runmode,
                                  project='myname', # maps to the Slurm account (salloc -A)
                                  partition='mypartition',
                                  cores='1',
                                  time='1:00:00',
                                  jobname='foowriter',
                                  threads='1'))

        fooreplacer = self.new_task('fooreplacer', MyFooReplacer,
                                    slurminfo=sciluigi.SlurmInfo(
                                    runmode=runmode,
                                    project='myname',
                                    partition='mypartition',
                                    cores='1',
                                    time='1:00:00',
                                    jobname='fooreplacer',
                                    threads='1'))

        # Here we do the *magic*: Connecting outputs to inputs:
        fooreplacer.in_foo = foowriter.out_foo

        # Return the last task(s) in the workflow chain.
        return fooreplacer

The parameters we pass to SlurmInfo will be used to construct a call to the Slurm command salloc, which requests the resources on the cluster. In the SciLuigi source you can see how the SlurmInfo parameters translate into salloc parameters, and you can find a list of the salloc parameters in the Slurm documentation or by simply typing salloc --help on the cluster.

It may be useful to point out that what SciLuigi calls project corresponds to the -A or --account option in Slurm, i.e. the account to which the consumed resources will be charged.
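As an illustration, here is a rough sketch of how the SlurmInfo fields might map onto a salloc command line. The flag names come from salloc --help; the exact template SciLuigi uses internally may differ, so treat this as an assumption for intuition only.

```python
# Hypothetical mapping from SlurmInfo fields to salloc flags (illustration only):
slurminfo = {
    'project': 'myname',         # -A / --account
    'partition': 'mypartition',  # -p / --partition
    'cores': '1',                # -n / --ntasks
    'time': '1:00:00',           # -t / --time
    'jobname': 'foowriter',      # -J / --job-name
}

salloc_cmd = ('salloc -A {project} -p {partition} -n {cores} '
              '-t {time} -J {jobname}'.format(**slurminfo))
print(salloc_cmd)
# salloc -A myname -p mypartition -n 1 -t 1:00:00 -J foowriter
```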

Now we can move on to setting up our tasks. MyFooWriter will be unchanged, except that it subclasses sciluigi.SlurmTask instead of sciluigi.Task:

class MyFooWriter(sciluigi.SlurmTask):
    def out_foo(self):
        return sciluigi.TargetInfo(self, 'foo.txt')
    def run(self):
        with self.out_foo().open('w') as foofile:
            foofile.write('foo\n')

Then, we create a bash script called replace_with_hostname.sh in the same directory as our workflow, which replaces the foo string with the hostname of the machine on which the job is running. It should contain the following lines:

#! /bin/bash
sed "s/foo/$(hostname)/g" "$1" > "$2"

This script uses the sed command to replace every occurrence of foo in the file given as the first command-line argument with the host name, writing the result to the file given as the second argument.
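You can try the script locally before wiring it into the workflow. The sketch below recreates the script so that it is self-contained; in practice you would simply run the replace_with_hostname.sh you already saved:

```shell
# Recreate the script (identical to the one above) and make it executable
cat > replace_with_hostname.sh <<'EOF'
#! /bin/bash
sed "s/foo/$(hostname)/g" "$1" > "$2"
EOF
chmod +x replace_with_hostname.sh

# Run it on a small test file; every foo becomes the machine's hostname
echo "foo and foo again" > input.txt
./replace_with_hostname.sh input.txt output.txt
cat output.txt
```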

We can now write the MyFooReplacer task, which will call the bash script we just created.

class MyFooReplacer(sciluigi.SlurmTask):
    # Here we have one input, a "foo file":
    in_foo = None
    # ... and an output, a "bar file":
    def out_replaced(self):
        # As the path to the returned target(info), we
        # use the directory of the foo file, and save the modified
        # version to a file called bar.txt
        out_file = os.path.join(os.path.dirname(self.in_foo().path), 'bar.txt')
        return sciluigi.TargetInfo(self, out_file)

    def run(self):
        # Here we use the built-in self.ex() method to execute
        # our script, passing two command line arguments to it
        self.ex('./replace_with_hostname.sh {in_file} {out_file}'.format(
                in_file = self.in_foo().path,
                out_file = self.out_replaced().path))
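Conceptually, in local runmode self.ex() amounts to running the command string through the shell, while in HPC runmode the command is wrapped in the salloc call built from SlurmInfo. The sketch below is an assumption for intuition, not SciLuigi's actual implementation:

```python
import subprocess

def ex_local(command):
    # Run the command through the shell and raise if it fails,
    # roughly what a local execution of self.ex() amounts to.
    # (Illustrative sketch only; SciLuigi's internals may differ.)
    return subprocess.run(command, shell=True, check=True,
                          capture_output=True, text=True)

result = ex_local('echo "would run: ./replace_with_hostname.sh foo.txt bar.txt"')
print(result.stdout.strip())
```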

At the end of the script, we add:

if __name__ == '__main__':
    sciluigi.run_local(main_task_cls=MyWorkflow)

And now we can run locally with

python sciluigi_slurm_example.py --runmode local

Or on the cluster with

python sciluigi_slurm_example.py --runmode hpc