Last active
August 29, 2015 13:57
-
-
Save samuell/9625102 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'''Luigi Workflow

Usage:
    run_luigi_workflow.py (--host=<host>|--local-scheduler) --workers=<workers> run <taskname> -p <partitioning_parameter>
    run_luigi_workflow.py (-h|--help)
    run_luigi_workflow.py --version

Options:
    -h, --help                   Show this screen
    --version                    Show version
    --host <host>                Host name to a central planner
    --local-scheduler            Use localhost instead
    -p <partitioning_parameter>  A partitioning parameter for splitting work into jobs.
    --workers <workers>          The number of simultaneous Luigi workers to run

Available options for <taskname>:
'''

# NOTE: the text above is not only documentation: docopt parses it to build
# the command line interface. Keep the blank lines between sections and at
# least two spaces between an option and its description, otherwise docopt
# will misread the definition.

# These are a listing of available task names, for use in both the
# documentation and in the code further on below.
tasknames = ["some_task_a",
             "some_task_b",
             "some_task_c"]

# Append the list of task names to the main docopt definition text
# (which serves as both the documentation screen and the definition
# of command line arguments), one name per line, indented like the
# other sections of the help screen.
__doc__ += "    " + "\n    ".join(tasknames)
# Some needed imports | |
from docopt import docopt | |
import luigi | |
import luigi.scheduler | |
import luigi.worker | |
import os | |
import sys | |
import logging as log | |
import socket | |
import csv | |
from datetime import datetime as dt | |
# Here we are importing our own tasks, provided they are | |
# arranged in a python module (folder) named "components" | |
from components.SomeTaskA import SomeTaskA | |
from components.SomeTaskB import SomeTaskB | |
from components.SomeTaskC import SomeTaskC | |
def _read_partitioning_parameters(selection):
    """Expand the value given to ``-p`` into a list of partitioning parameters.

    The keyword "all" is special: the parameters are then read from the first
    column of the tab separated file "data/partitioning_parameters.tsv".
    Any other value is used as-is, as the single parameter to run for.
    """
    if selection != "all":
        return [selection]
    with open("data/partitioning_parameters.tsv") as infile:
        # csv.reader yields an empty list for blank lines; skip those rather
        # than failing on the missing first column.
        return [row[0] for row in csv.reader(infile, delimiter="\t") if row]


# The main function that is executed when this file is the entry point of
# execution.
def run():
    """Parse the command line and run the requested task through Luigi.

    Does nothing unless the "run" command was given. Exits the process with
    a non-zero status if the task name is not one of ``tasknames`` or if
    --workers is not an integer.
    """
    # Read the combined documentation / parameter definition
    args = docopt(__doc__, version="Luigi Workflow 0.9")

    # Only the "run" command does any real work
    if not args['run']:
        return

    # Configure some logging. The log directory is created on demand, and a
    # single timestamp is used so that the date and time parts of the file
    # name always belong together.
    if not os.path.isdir("log"):
        os.makedirs("log")
    started = dt.now()
    log.basicConfig(
        level=log.INFO,
        filename="log/luigi_workflow_{host}_{date}_{time}.log".format(
            host=socket.gethostname().split(".")[0],  # Short host name only
            date=started.strftime("%Y%m%d"),
            time=started.strftime("%H%M%S")),
        format="%(asctime)s %(name)-15s %(levelname)-8s %(message)s")

    # Make sure we have a valid task name
    taskname_to_run = args['<taskname>']
    if taskname_to_run not in tasknames:
        print("available tasks for 'run':")
        for taskname in tasknames:
            print(" " * 4 + taskname)
        # Non-zero status, so calling scripts can detect the failure
        sys.exit(1)

    # docopt hands over all values as strings; Luigi needs a worker count
    try:
        workers_cnt = int(args['--workers'])
    except ValueError:
        sys.exit("--workers must be an integer, got: %r" % args['--workers'])

    # Take special care of the "all" keyword, for the partitioning parameter
    # ... reading in a list of parameters from a tab separated file.
    # The option is defined as "-p" in the docopt text, so that is its key.
    partitioning_parameters = _read_partitioning_parameters(args['-p'])

    # ------------------------------------------
    # DEFINE THE MAIN WORKFLOW DEPENDENCY GRAPH
    # ------------------------------------------
    tasks_to_run = []
    # Loop over partitioning parameters (is usually a single one, but can
    # be many, if using the "all" keyword).
    for partitioning_parameter in partitioning_parameters:
        tasks = {}
        tasks["some_task_a"] = SomeTaskA(
            partitioning_parameter=partitioning_parameter)
        tasks["some_task_b"] = SomeTaskB(
            upstream_task=tasks["some_task_a"],
            partitioning_parameter=partitioning_parameter)
        tasks["some_task_c"] = SomeTaskC(
            upstream_task=tasks["some_task_b"],
            partitioning_parameter=partitioning_parameter)
        # From the workflow defined above, add the task to be run up to,
        # as specified on the command line
        tasks_to_run.append(tasks[taskname_to_run])

    # --------------------------------------------------------
    # RUN THE LUIGI WORKFLOW FOR THE CURRENTLY COLLECTED TASKS
    # --------------------------------------------------------
    # Decide to run either against an external Luigid daemon host, or locally
    # ... and actually run the workflow (the build() function does both).
    if args['--host']:
        luigi.build(tasks_to_run, workers=workers_cnt,
                    scheduler_host=args['--host'])
    else:
        luigi.build(tasks_to_run, workers=workers_cnt, local_scheduler=True)
# If this file is the entry point of execution (i.e. it is run as a script
# rather than imported as a module), execute the run() function.
if __name__ == '__main__':
    run()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment