Skip to content

Instantly share code, notes, and snippets.

@samuell
Last active June 14, 2022 19:32
Show Gist options
  • Save samuell/93cc7eb6803fa2790042 to your computer and use it in GitHub Desktop.
Save samuell/93cc7eb6803fa2790042 to your computer and use it in GitHub Desktop.
How to output the execution time of tasks in the luigi workflow system, as discussed [here](https://groups.google.com/d/msg/luigi-user/uivbf-luX9w/z0GCKKsIefoJ)
import luigi
import time
class TimeTaskMixin(object):
'''
A mixin that when added to a luigi task, will print out
the tasks execution time to standard out, when the task is
finished
'''
@luigi.Task.event_handler(luigi.Event.PROCESSING_TIME)
def print_execution_time(self, processing_time):
print('### PROCESSING TIME ###: ' + str(processing_time))
class TaskA(luigi.ExternalTask, TimeTaskMixin):
'''
A simple task representing a simple file with a few rows
of dummy content
'''
def output(self):
return luigi.LocalTarget('input.txt')
class TaskB(luigi.Task, TimeTaskMixin):
'''
A simple task that just outputs the content of its dependency
into a new file with the the same name plus the extension .taskb
'''
def requires(self):
return TaskA()
def output(self):
return luigi.LocalTarget(self.input().path + '.taskb')
def run(self):
time.sleep(1)
with self.input().open() as infile, self.output().open('w') as outfile:
for row in infile:
outfile.write(row)
class TaskC(luigi.Task, TimeTaskMixin):
'''
A simple task that just outputs the content of its dependency
into a new file with the the same name plus the extension .taskc
'''
def requires(self):
return TaskB()
def output(self):
return luigi.LocalTarget(self.input().path + '.taskc')
def run(self):
time.sleep(2)
with self.input().open() as infile, self.output().open('w') as outfile:
for row in infile:
outfile.write(row)
# If this file is executed as a script, run the last class in the dependency
# graph, TaskC.
if __name__ == '__main__':
luigi.run(main_task_cls=TaskC)
@samuell
Copy link
Author

samuell commented Aug 26, 2014

This will produce output similar to the following:

[17:17:59] ~/code/test $ python luigi_time_tasks_example.py --local-scheduler
DEBUG: Checking if TaskC() is complete
INFO: Scheduled TaskC() (PENDING)
DEBUG: Checking if TaskB() is complete
INFO: Scheduled TaskB() (PENDING)
DEBUG: Checking if TaskA() is complete
INFO: Scheduled TaskA() (DONE)
INFO: Done scheduling tasks
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
INFO: [pid 11034] Worker Worker(salt=**********, host=samuel-e743, username=samuel, pid=11034) running   TaskB()
### PROCESSING TIME ###: 1.00197005272
INFO: [pid 11034] Worker Worker(salt=**********, host=samuel-e743, username=samuel, pid=11034) done      TaskB()
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 11034] Worker Worker(salt=**********, host=samuel-e743, username=samuel, pid=11034) running   TaskC()
### PROCESSING TIME ###: 2.00305199623
INFO: [pid 11034] Worker Worker(salt=**********, host=samuel-e743, username=samuel, pid=11034) done      TaskC()
DEBUG: Asking scheduler for work...
INFO: Done
INFO: There are no more tasks to run at this time
INFO: Worker Worker(salt=**********, host=samuel-e743, username=samuel, pid=11034) was stopped. Shutting down Keep-Alive thread
[17:18:03] ~/code/test $ 

@freider
Copy link

freider commented Aug 26, 2014

If I'm not mistaken, the @luigi.Task.event_handler(luigi.Event.PROCESSING_TIME) will actually add the processing time callback to all luigi tasks, not just the ones that inherit the mixin. That is if the processing time event doesn't have some magic that other events don't have... The event_handler registrator adds it globally to all tasks on or below in class hierarchy from the one you call it on, so in this case that would be all luigi Tasks since it's called on Task.

@jkryanchou
Copy link

jkryanchou commented Jun 22, 2016

awesome code snippets! 👍

@jamesmcm
Copy link

Still useful years later! 😄

@tf42src
Copy link

tf42src commented Apr 25, 2020

This is great! Any ideas how to get this into the UI?

@Melchizedek13
Copy link

It's excellent snippets! Thanks.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment