How to output the execution time of tasks in the luigi workflow system, as discussed [here](https://groups.google.com/d/msg/luigi-user/uivbf-luX9w/z0GCKKsIefoJ)
import luigi
import time


class TimeTaskMixin(object):
    '''
    A mixin that, when added to a luigi task, will print out
    the task's execution time to standard out when the task is
    finished
    '''
    @luigi.Task.event_handler(luigi.Event.PROCESSING_TIME)
    def print_execution_time(self, processing_time):
        print('### PROCESSING TIME ###: ' + str(processing_time))


class TaskA(luigi.ExternalTask, TimeTaskMixin):
    '''
    A simple task representing a simple file with a few rows
    of dummy content
    '''
    def output(self):
        return luigi.LocalTarget('input.txt')


class TaskB(luigi.Task, TimeTaskMixin):
    '''
    A simple task that just outputs the content of its dependency
    into a new file with the same name plus the extension .taskb
    '''
    def requires(self):
        return TaskA()

    def output(self):
        return luigi.LocalTarget(self.input().path + '.taskb')

    def run(self):
        time.sleep(1)
        with self.input().open() as infile, self.output().open('w') as outfile:
            for row in infile:
                outfile.write(row)


class TaskC(luigi.Task, TimeTaskMixin):
    '''
    A simple task that just outputs the content of its dependency
    into a new file with the same name plus the extension .taskc
    '''
    def requires(self):
        return TaskB()

    def output(self):
        return luigi.LocalTarget(self.input().path + '.taskc')

    def run(self):
        time.sleep(2)
        with self.input().open() as infile, self.output().open('w') as outfile:
            for row in infile:
                outfile.write(row)


# If this file is executed as a script, run the last class in the dependency
# graph, TaskC.
if __name__ == '__main__':
    luigi.run(main_task_cls=TaskC)

samuell (gist owner and author) commented Aug 26, 2014

This will produce output similar to the following:

[17:17:59] ~/code/test $ python luigi_time_tasks_example.py --local-scheduler
DEBUG: Checking if TaskC() is complete
INFO: Scheduled TaskC() (PENDING)
DEBUG: Checking if TaskB() is complete
INFO: Scheduled TaskB() (PENDING)
DEBUG: Checking if TaskA() is complete
INFO: Scheduled TaskA() (DONE)
INFO: Done scheduling tasks
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
INFO: [pid 11034] Worker Worker(salt=**********, host=samuel-e743, username=samuel, pid=11034) running   TaskB()
### PROCESSING TIME ###: 1.00197005272
INFO: [pid 11034] Worker Worker(salt=**********, host=samuel-e743, username=samuel, pid=11034) done      TaskB()
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 11034] Worker Worker(salt=**********, host=samuel-e743, username=samuel, pid=11034) running   TaskC()
### PROCESSING TIME ###: 2.00305199623
INFO: [pid 11034] Worker Worker(salt=**********, host=samuel-e743, username=samuel, pid=11034) done      TaskC()
DEBUG: Asking scheduler for work...
INFO: Done
INFO: There are no more tasks to run at this time
INFO: Worker Worker(salt=**********, host=samuel-e743, username=samuel, pid=11034) was stopped. Shutting down Keep-Alive thread
[17:18:03] ~/code/test $ 

freider commented Aug 26, 2014

If I'm not mistaken, the @luigi.Task.event_handler(luigi.Event.PROCESSING_TIME) decorator will actually add the processing time callback to all luigi tasks, not just the ones that inherit the mixin. That is, unless the processing time event has some magic that other events don't have... The event_handler registrar adds the callback globally to all tasks at or below the class you call it on in the class hierarchy, so in this case that would be all luigi Tasks, since it's called on Task.
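
The scoping behaviour described in this comment can be illustrated with a small stand-alone model. This is only a sketch and not luigi's actual code: the `Task` class here is a toy stand-in, and `TimedTask`, `Timed`, `Untimed`, `record_time` and `seen` are hypothetical names made up for the illustration. It assumes callbacks are stored per registering class and dispatched with an `isinstance` check, which matches the "at or below in the class hierarchy" description.

```python
class Task(object):
    """Toy stand-in for a task base class (hypothetical, not luigi.Task)."""

    # Shared registry: {registering_class: {event_name: set_of_callbacks}}
    _event_callbacks = {}

    @classmethod
    def event_handler(cls, name):
        """Return a decorator that registers a callback for `name` on `cls`."""
        def wrapper(callback):
            cls._event_callbacks.setdefault(cls, {}).setdefault(name, set()).add(callback)
            return callback
        return wrapper

    def trigger_event(self, name, *args):
        """Call each callback registered on a class that this task is an instance of."""
        for event_class, callbacks in self._event_callbacks.items():
            if isinstance(self, event_class):
                for callback in callbacks.get(name, ()):
                    callback(self, *args)


class TimedTask(Task):
    """Hypothetical base class for tasks whose time should be reported."""


class Timed(TimedTask):
    pass


class Untimed(Task):
    pass


seen = []


# Registered on TimedTask rather than Task, so only TimedTask
# and its subclasses trigger the callback.
@TimedTask.event_handler('processing_time')
def record_time(task, processing_time):
    seen.append((type(task).__name__, processing_time))


Timed().trigger_event('processing_time', 1.0)
Untimed().trigger_event('processing_time', 2.0)
print(seen)  # → [('Timed', 1.0)]
```

If luigi behaves the way the comment describes, the analogous change in the gist would presumably be to call `event_handler` on a shared `luigi.Task` subclass instead of on `luigi.Task` itself. That is an assumption and has not been checked against a particular luigi version.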

jkryanchou commented Jun 22, 2016

awesome code snippets! 👍

jamesmcm commented Oct 16, 2017

Still useful years later! 😄

tobiasfielitz commented Apr 25, 2020

This is great! Any ideas how to get this into the UI?
