Skip to content

Instantly share code, notes, and snippets.

@samuell
Last active August 29, 2015 13:57
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save samuell/9625102 to your computer and use it in GitHub Desktop.
Save samuell/9625102 to your computer and use it in GitHub Desktop.
"""Luigi Workflow

Usage:
  run_luigi_workflow.py (--host=<host>|--local-scheduler) --workers=<workers> run <taskname> -p <partitioning_parameter>
  run_luigi_workflow.py (-h|--help)
  run_luigi_workflow.py --version

Options:
  -h, --help                   Show this screen
  --version                    Show version
  --host <host>                Host name to a central planner
  --local-scheduler            Use localhost instead
  -p <partitioning_parameter>  A partitioning parameter for splitting work into jobs.
  --workers <workers>          The number of simultaneous Luigi workers to run

Available options for <taskname>:
"""
# NOTE: the text above is not only documentation; docopt parses it to build
# the command line interface. Usage patterns and option lines must stay
# indented, and each option must be separated from its description by at
# least two spaces, or docopt will not recognise them.

# The task names that may be given as <taskname>. The list is used both for
# the help screen (it is appended to the docopt text just below) and by the
# code further down.
tasknames = [
    "some_task_a",
    "some_task_b",
    "some_task_c",
]

# Append the task names to the docopt text, one indented name per line, so
# that they show up under "Available options for <taskname>:" in --help.
__doc__ += "  " + "\n  ".join(tasknames)
# Some needed imports
from docopt import docopt
import luigi
import luigi.scheduler
import luigi.worker
import os
import sys
import logging as log
import socket
import csv
from datetime import datetime as dt
# Here we are importing our own tasks, provided they are
# arranged in a python module (folder) named "components"
from components.SomeTaskA import SomeTaskA
from components.SomeTaskB import SomeTaskB
from components.SomeTaskC import SomeTaskC
def _configure_logging():
    """Log at INFO level to a file under log/ named after host and start time."""
    # basicConfig() opens the log file right away and fails if the directory
    # does not exist, so make sure it is there first.
    if not os.path.isdir("log"):
        os.makedirs("log")
    # Take one timestamp so the date and time parts of the file name cannot
    # end up on different sides of midnight.
    started = dt.now()
    log.basicConfig(
        level=log.INFO,
        filename="log/luigi_workflow_{host}_{date}_{time}.log".format(
            host=socket.gethostname().split(".")[0],  # Hack to get the short hostname
            date=started.strftime("%Y%m%d"),
            time=started.strftime("%H%M%S")),
        format="%(asctime)s %(name)-15s %(levelname)-8s %(message)s")


def _read_partitioning_parameters(requested):
    """Return the list of partitioning parameters to build workflows for.

    The keyword "all" expands to the first column of every non-empty row of
    the tab separated file data/partitioning_parameters.tsv; any other value
    is returned as a single-element list.
    """
    if requested != "all":
        return [requested]
    with open("data/partitioning_parameters.tsv") as infile:
        # csv.reader yields an empty list for blank lines; skip those rather
        # than failing on row[0].
        return [row[0] for row in csv.reader(infile, delimiter="\t") if row]


def _build_task_chain(partitioning_parameter):
    """Create the A -> B -> C task chain for one parameter, keyed by task name."""
    task_a = SomeTaskA(
        partitioning_parameter=partitioning_parameter)
    task_b = SomeTaskB(
        upstream_task=task_a,
        partitioning_parameter=partitioning_parameter)
    task_c = SomeTaskC(
        upstream_task=task_b,
        partitioning_parameter=partitioning_parameter)
    return {
        "some_task_a": task_a,
        "some_task_b": task_b,
        "some_task_c": task_c,
    }


def run():
    """Parse the command line and run the requested task with Luigi.

    This is the function executed when this file is the entry point. It
    reads the arguments defined by the module docstring (via docopt), sets
    up file logging, builds one task chain per partitioning parameter and
    hands the selected task of each chain to luigi.build(), using either
    the scheduler given with --host or a local scheduler.

    Raises:
        SystemExit: with a non-zero status if <taskname> is not one of
            ``tasknames`` or --workers is not an integer (docopt itself
            also exits on --help, --version and malformed command lines).
    """
    # Read the combined documentation / parameter definition
    args = docopt(__doc__, version="Luigi Workflow 0.9")

    # Only do real work if the run command was specified
    if not args['run']:
        return

    _configure_logging()

    # ------------------------------------------
    # DEFINE THE MAIN WORKFLOW DEPENDENCY GRAPH
    # ------------------------------------------
    taskname_to_run = args['<taskname>']

    # Make sure we have a valid task name
    if taskname_to_run not in tasknames:
        print("available tasks for 'run':")
        for taskname in tasknames:
            print(" " * 4 + taskname)
        # This is an error path, so report failure to the calling shell.
        sys.exit(1)

    # The usage text declares only the short option -p, so docopt stores its
    # value under the key "-p" (there is no "--partitioning-parameter" key).
    partitioning_parameters = _read_partitioning_parameters(args["-p"])

    # Usually a single partitioning parameter, but can be many when the
    # "all" keyword is used. From each chain, pick the task to run up to.
    tasks_to_run = [
        _build_task_chain(partitioning_parameter)[taskname_to_run]
        for partitioning_parameter in partitioning_parameters]

    # --------------------------------------------------------------------------------
    # RUN THE LUIGI WORKFLOW FOR THE CURRENTLY COLLECTED TASKS
    # --------------------------------------------------------------------------------
    # docopt hands option values over as strings; convert the worker count
    # to a number before passing it on to Luigi.
    try:
        workers_cnt = int(args['--workers'])
    except ValueError:
        sys.exit("--workers must be an integer, got %r" % args['--workers'])

    # Decide to run either against an external Luigid daemon host, or locally
    # ... and actually run the workflow (the build() function does both).
    if args['--host']:
        luigi.build(tasks_to_run, workers=workers_cnt, scheduler_host=args['--host'])
    else:
        luigi.build(tasks_to_run, workers=workers_cnt, local_scheduler=True)
# Start the workflow only when this file is executed directly as a script;
# importing it as a module must not trigger run().
if __name__ == "__main__":
    run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment