@samuell
Last active June 14, 2022 19:32
How to output the execution time of tasks in the luigi workflow system, as discussed [here](https://groups.google.com/d/msg/luigi-user/uivbf-luX9w/z0GCKKsIefoJ)
```python
import luigi
import time


class TimeTaskMixin(object):
    '''
    A mixin that, when added to a luigi task, will print out
    the task's execution time to standard out when the task is
    finished
    '''
    # luigi calls the handler as callback(task, processing_time), so `self`
    # receives the finished task. Note that calling event_handler on
    # luigi.Task registers the callback for every luigi task, not only the
    # ones that inherit this mixin (see the discussion below).
    @luigi.Task.event_handler(luigi.Event.PROCESSING_TIME)
    def print_execution_time(self, processing_time):
        print('### PROCESSING TIME ###: ' + str(processing_time))


class TaskA(luigi.ExternalTask, TimeTaskMixin):
    '''
    A simple task representing a simple file with a few rows
    of dummy content
    '''
    def output(self):
        # input.txt must already exist: luigi only checks external tasks
        # for completeness and never runs them.
        return luigi.LocalTarget('input.txt')


class TaskB(luigi.Task, TimeTaskMixin):
    '''
    A simple task that just outputs the content of its dependency
    into a new file with the same name plus the extension .taskb
    '''
    def requires(self):
        return TaskA()

    def output(self):
        return luigi.LocalTarget(self.input().path + '.taskb')

    def run(self):
        time.sleep(1)  # simulate work so the timing is visible
        with self.input().open() as infile, self.output().open('w') as outfile:
            for row in infile:
                outfile.write(row)


class TaskC(luigi.Task, TimeTaskMixin):
    '''
    A simple task that just outputs the content of its dependency
    into a new file with the same name plus the extension .taskc
    '''
    def requires(self):
        return TaskB()

    def output(self):
        return luigi.LocalTarget(self.input().path + '.taskc')

    def run(self):
        time.sleep(2)  # simulate work so the timing is visible
        with self.input().open() as infile, self.output().open('w') as outfile:
            for row in infile:
                outfile.write(row)


# If this file is executed as a script, run the last class in the dependency
# graph, TaskC. Pass --local-scheduler unless a central luigid scheduler is
# running.
if __name__ == '__main__':
    luigi.run(main_task_cls=TaskC)
```
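The handler's `self` parameter works because of how the event is fired: after a task's `run()` finishes successfully, luigi's worker measures the elapsed wall-clock time and triggers `Event.PROCESSING_TIME`, passing the task object and the elapsed seconds to each callback. The following is a simplified, stdlib-only model of that flow, not luigi's actual worker code; `FakeTask`, `HANDLERS` and `run_and_time` are hypothetical names chosen for illustration.

```python
import time


class FakeTask:
    """Hypothetical stand-in for a luigi task; only run() matters here."""

    def __init__(self, name, seconds):
        self.name = name
        self.seconds = seconds

    def run(self):
        time.sleep(self.seconds)  # pretend to do some work


# Hypothetical flat list of PROCESSING_TIME callbacks. luigi keys its
# registry by class; this model deliberately ignores that detail.
HANDLERS = []


def run_and_time(task):
    """Time task.run() and pass (task, elapsed_seconds) to every handler."""
    start = time.perf_counter()
    task.run()
    elapsed = time.perf_counter() - start
    for callback in HANDLERS:
        callback(task, elapsed)  # the task itself is the first argument


timings = []


def print_execution_time(self, processing_time):
    # Same signature as in the gist: `self` receives the finished task.
    timings.append((self.name, processing_time))
    print('### PROCESSING TIME ###: ' + str(processing_time))


HANDLERS.append(print_execution_time)
run_and_time(FakeTask('TaskB', 0.1))
run_and_time(FakeTask('TaskC', 0.2))
```

Because the callback receives the task, a real handler could also print `task.task_id` next to the time, which makes the output lines easy to tell apart when several tasks run.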
@freider

freider commented Aug 26, 2014

If I'm not mistaken, the `@luigi.Task.event_handler(luigi.Event.PROCESSING_TIME)` decorator will actually add the processing-time callback to all luigi tasks, not just the ones that inherit the mixin, unless the processing-time event has some magic that other events don't have. The `event_handler` registrar adds the callback for every task at or below the class you call it on in the hierarchy, so in this case that means all luigi Tasks, since it's called on `Task`.
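This follows from how the registry is keyed: a callback is stored under the class whose `event_handler` was called, and it fires for any task that is an instance of that class. Decorating inside the mixin with `@luigi.Task.event_handler` therefore keys the callback to `luigi.Task` itself. Below is a simplified, stdlib-only model of that behaviour, not luigi's own code; `Task`, `TimedTask`, `Untimed` and `log_everything` are hypothetical stand-ins. In luigi itself, the corresponding fix should be to call `event_handler` on a dedicated subclass of `luigi.Task` and have the timed tasks inherit from it.

```python
class Task:
    """Simplified model of luigi.Task's event registry (not luigi's code)."""

    _event_callbacks = {}  # one dict shared by every subclass: {class: {event: set}}

    @classmethod
    def event_handler(cls, event):
        def wrapped(callback):
            # Key the callback by the class the decorator was called on.
            cls._event_callbacks.setdefault(cls, {}).setdefault(event, set()).add(callback)
            return callback
        return wrapped

    def trigger_event(self, event, *args):
        # Fire every callback whose registering class this task is an
        # instance of. (luigi also catches and logs callback errors;
        # that is omitted here.)
        for event_class, callbacks in self._event_callbacks.items():
            if isinstance(self, event_class):
                for callback in callbacks.get(event, ()):
                    callback(*args)


PROCESSING_TIME = 'event.core.processing_time'
fired = []
everything = []


class TimedTask(Task):
    """Hypothetical base class: only its subclasses should be timed."""


@TimedTask.event_handler(PROCESSING_TIME)
def print_execution_time(task, processing_time):
    fired.append(type(task).__name__)
    print('### PROCESSING TIME ###: ' + str(processing_time))


@Task.event_handler(PROCESSING_TIME)
def log_everything(task, processing_time):
    # Registered on the base class, like the gist's mixin: fires for all tasks.
    everything.append(type(task).__name__)


class TaskB(TimedTask):
    pass


class Untimed(Task):
    pass


b = TaskB()
b.trigger_event(PROCESSING_TIME, b, 1.0)
u = Untimed()
u.trigger_event(PROCESSING_TIME, u, 1.0)
```

Here `print_execution_time` fires only for `TaskB`, while `log_everything` fires for both tasks, which is the global behaviour described above.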

@jkryanchou

jkryanchou commented Jun 22, 2016

awesome code snippets! 👍

@jamesmcm

Still useful years later! 😄

@tf42src

tf42src commented Apr 25, 2020

This is great! Any ideas how to get this into the UI?

@Melchizedek13

Excellent snippets! Thanks.
