Skip to content

Instantly share code, notes, and snippets.

@fields
Last active March 21, 2019 21:37
Show Gist options
  • Save fields/fe8288eaf344bb121af942582f3f8848 to your computer and use it in GitHub Desktop.
Save fields/fe8288eaf344bb121af942582f3f8848 to your computer and use it in GitHub Desktop.

class RegisterFinish(luigi.Task):
    item_id = luigi.IntParameter()
    run_id = luigi.IntParameter()

    def requires(self):
        return [RunSubJob(self.item_id, self.run_id), NodepJob(self.item_id, self.run_id)]

    def output(self):
        return luigi.LocalTarget('/tmp/luigi-test/register-finish-%s-%s.txt' % (self.item_id, self.run_id))

    def run(self):
        print("register finish")
        with self.output().open('w') as f:
            f.write('OK')


class InitSubJob(luigi.Task):
    item_id = luigi.IntParameter()
    run_id = luigi.IntParameter()

    def output(self):
        return luigi.LocalTarget('/tmp/luigi-test/init-sub-job-%s-%s.txt' % (self.item_id, self.run_id))

    def run(self):
        print("init subjob")
        with self.output().open('w') as f:
            f.write('OK')

class RunSubJob(luigi.Task):
    item_id = luigi.IntParameter()
    run_id = luigi.IntParameter()

    def output(self):
        return luigi.LocalTarget('/tmp/luigi-test/run-sub-job-%s-%s.txt' % (self.item_id, self.run_id))

    def requires(self):
        return [InitSubJob(self.item_id, self.run_id)]

    def run(self):
        print("run subjob")
        with self.output().open('w') as f:
            f.write('OK')

class NodepJob(luigi.Task):
    item_id = luigi.IntParameter()
    run_id = luigi.IntParameter()

    def output(self):
        return luigi.LocalTarget('/tmp/luigi-test/nodep-job-%s-%s.txt' % (self.item_id, self.run_id))

    def run(self):
        print("no dep job")
        with self.output().open('w') as f:
            f.write('OK')
$ PYTHONPATH='.' luigi --module map_publication RegisterFinish --local-scheduler --item-id 384 --run-id=1
DEBUG: Checking if RegisterFinish(item_id=384, run_id=1) is complete
DEBUG: Checking if RunSubJob(item_id=384, run_id=1) is complete
DEBUG: Checking if NodepJob(item_id=384, run_id=1) is complete
INFO: Informed scheduler that task   RegisterFinish_384_1_1b63d73cc2   has status   PENDING
INFO: Informed scheduler that task   NodepJob_384_1_1b63d73cc2   has status   PENDING
DEBUG: Checking if InitSubJob(item_id=384, run_id=1) is complete
INFO: Informed scheduler that task   RunSubJob_384_1_1b63d73cc2   has status   PENDING
INFO: Informed scheduler that task   InitSubJob_384_1_1b63d73cc2   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 4
INFO: [pid 44412] Worker Worker(salt=515910807, workers=1, host=sulaco.localdomain, username=fields, pid=44412) running   NodepJob(item_id=384, run_id=1)
no dep job
INFO: [pid 44412] Worker Worker(salt=515910807, workers=1, host=sulaco.localdomain, username=fields, pid=44412) done      NodepJob(item_id=384, run_id=1)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   NodepJob_384_1_1b63d73cc2   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 3
INFO: [pid 44412] Worker Worker(salt=515910807, workers=1, host=sulaco.localdomain, username=fields, pid=44412) running   InitSubJob(item_id=384, run_id=1)
init subjob
INFO: [pid 44412] Worker Worker(salt=515910807, workers=1, host=sulaco.localdomain, username=fields, pid=44412) done      InitSubJob(item_id=384, run_id=1)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   InitSubJob_384_1_1b63d73cc2   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
INFO: [pid 44412] Worker Worker(salt=515910807, workers=1, host=sulaco.localdomain, username=fields, pid=44412) running   RunSubJob(item_id=384, run_id=1)
run subjob
INFO: [pid 44412] Worker Worker(salt=515910807, workers=1, host=sulaco.localdomain, username=fields, pid=44412) done      RunSubJob(item_id=384, run_id=1)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   RunSubJob_384_1_1b63d73cc2   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 44412] Worker Worker(salt=515910807, workers=1, host=sulaco.localdomain, username=fields, pid=44412) running   RegisterFinish(item_id=384, run_id=1)
register finish
INFO: [pid 44412] Worker Worker(salt=515910807, workers=1, host=sulaco.localdomain, username=fields, pid=44412) done      RegisterFinish(item_id=384, run_id=1)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   RegisterFinish_384_1_1b63d73cc2   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=515910807, workers=1, host=sulaco.localdomain, username=fields, pid=44412) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====

Scheduled 4 tasks of which:
* 4 ran successfully:
    - 1 InitSubJob(item_id=384, run_id=1)
    - 1 NodepJob(item_id=384, run_id=1)
    - 1 RegisterFinish(item_id=384, run_id=1)
    - 1 RunSubJob(item_id=384, run_id=1)

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====```
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment