class RegisterFinish(luigi.Task):
    """Task that depends on RunSubJob and NodepJob for the same ids.

    When run, it prints a progress line and writes 'OK' to a local marker
    file whose name is derived from ``item_id`` and ``run_id``.
    """

    # Integer parameters identifying the item and the run being processed.
    item_id = luigi.IntParameter()
    run_id = luigi.IntParameter()

    def requires(self):
        """Return the RunSubJob and NodepJob tasks built from this task's ids."""
        sub_job = RunSubJob(self.item_id, self.run_id)
        nodep_job = NodepJob(self.item_id, self.run_id)
        return [sub_job, nodep_job]

    def output(self):
        """Return the local marker-file target for this item/run pair."""
        ids = (self.item_id, self.run_id)
        return luigi.LocalTarget('/tmp/luigi-test/register-finish-%s-%s.txt' % ids)

    def run(self):
        """Print a progress message and write 'OK' to the output target."""
        print("register finish")
        marker = self.output()
        with marker.open('w') as handle:
            handle.write('OK')
class InitSubJob(luigi.Task):
    """Task with no declared requirements that writes an 'OK' marker file.

    The marker file name is derived from ``item_id`` and ``run_id``.
    """

    # Integer parameters identifying the item and the run being processed.
    item_id = luigi.IntParameter()
    run_id = luigi.IntParameter()

    def output(self):
        """Return the local marker-file target for this item/run pair."""
        ids = (self.item_id, self.run_id)
        return luigi.LocalTarget('/tmp/luigi-test/init-sub-job-%s-%s.txt' % ids)

    def run(self):
        """Print a progress message and write 'OK' to the output target."""
        print("init subjob")
        marker = self.output()
        with marker.open('w') as handle:
            handle.write('OK')
class RunSubJob(luigi.Task):
    """Task that depends on InitSubJob for the same ids.

    When run, it prints a progress line and writes 'OK' to a local marker
    file whose name is derived from ``item_id`` and ``run_id``.
    """

    # Integer parameters identifying the item and the run being processed.
    item_id = luigi.IntParameter()
    run_id = luigi.IntParameter()

    def requires(self):
        """Return a one-element list holding the matching InitSubJob task."""
        init_job = InitSubJob(self.item_id, self.run_id)
        return [init_job]

    def output(self):
        """Return the local marker-file target for this item/run pair."""
        ids = (self.item_id, self.run_id)
        return luigi.LocalTarget('/tmp/luigi-test/run-sub-job-%s-%s.txt' % ids)

    def run(self):
        """Print a progress message and write 'OK' to the output target."""
        print("run subjob")
        marker = self.output()
        with marker.open('w') as handle:
            handle.write('OK')
class NodepJob(luigi.Task):
    """Task with no declared requirements that writes an 'OK' marker file.

    The marker file name is derived from ``item_id`` and ``run_id``.
    """

    # Integer parameters identifying the item and the run being processed.
    item_id = luigi.IntParameter()
    run_id = luigi.IntParameter()

    def output(self):
        """Return the local marker-file target for this item/run pair."""
        ids = (self.item_id, self.run_id)
        return luigi.LocalTarget('/tmp/luigi-test/nodep-job-%s-%s.txt' % ids)

    def run(self):
        """Print a progress message and write 'OK' to the output target."""
        print("no dep job")
        marker = self.output()
        with marker.open('w') as handle:
            handle.write('OK')
$ PYTHONPATH='.' luigi --module map_publication RegisterFinish --local-scheduler --item-id 384 --run-id=1
DEBUG: Checking if RegisterFinish(item_id=384, run_id=1) is complete
DEBUG: Checking if RunSubJob(item_id=384, run_id=1) is complete
DEBUG: Checking if NodepJob(item_id=384, run_id=1) is complete
INFO: Informed scheduler that task RegisterFinish_384_1_1b63d73cc2 has status PENDING
INFO: Informed scheduler that task NodepJob_384_1_1b63d73cc2 has status PENDING
DEBUG: Checking if InitSubJob(item_id=384, run_id=1) is complete
INFO: Informed scheduler that task RunSubJob_384_1_1b63d73cc2 has status PENDING
INFO: Informed scheduler that task InitSubJob_384_1_1b63d73cc2 has status PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 4
INFO: [pid 44412] Worker Worker(salt=515910807, workers=1, host=sulaco.localdomain, username=fields, pid=44412) running NodepJob(item_id=384, run_id=1)
no dep job
INFO: [pid 44412] Worker Worker(salt=515910807, workers=1, host=sulaco.localdomain, username=fields, pid=44412) done NodepJob(item_id=384, run_id=1)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task NodepJob_384_1_1b63d73cc2 has status DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 3
INFO: [pid 44412] Worker Worker(salt=515910807, workers=1, host=sulaco.localdomain, username=fields, pid=44412) running InitSubJob(item_id=384, run_id=1)
init subjob
INFO: [pid 44412] Worker Worker(salt=515910807, workers=1, host=sulaco.localdomain, username=fields, pid=44412) done InitSubJob(item_id=384, run_id=1)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task InitSubJob_384_1_1b63d73cc2 has status DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
INFO: [pid 44412] Worker Worker(salt=515910807, workers=1, host=sulaco.localdomain, username=fields, pid=44412) running RunSubJob(item_id=384, run_id=1)
run subjob
INFO: [pid 44412] Worker Worker(salt=515910807, workers=1, host=sulaco.localdomain, username=fields, pid=44412) done RunSubJob(item_id=384, run_id=1)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task RunSubJob_384_1_1b63d73cc2 has status DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 44412] Worker Worker(salt=515910807, workers=1, host=sulaco.localdomain, username=fields, pid=44412) running RegisterFinish(item_id=384, run_id=1)
register finish
INFO: [pid 44412] Worker Worker(salt=515910807, workers=1, host=sulaco.localdomain, username=fields, pid=44412) done RegisterFinish(item_id=384, run_id=1)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task RegisterFinish_384_1_1b63d73cc2 has status DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=515910807, workers=1, host=sulaco.localdomain, username=fields, pid=44412) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====
Scheduled 4 tasks of which:
* 4 ran successfully:
- 1 InitSubJob(item_id=384, run_id=1)
- 1 NodepJob(item_id=384, run_id=1)
- 1 RegisterFinish(item_id=384, run_id=1)
- 1 RunSubJob(item_id=384, run_id=1)
This progress looks :) because there were no failed tasks or missing dependencies
===== Luigi Execution Summary =====```