Created
October 29, 2015 11:46
-
-
Save alexshires/86b462813d3c9d742422 to your computer and use it in GitHub Desktop.
luigi doesn't check for local file output
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import luigi | |
class FileExists(luigi.ExternalTask): | |
filename = luigi.Parameter(default=None) | |
def output(self): | |
print "FE: output", self.filename | |
return luigi.LocalTarget(self.filename) | |
def create_new_filename(filename, ext): | |
splits = filename.split(".") | |
a = "".join(splits[:-1]) + ext + "." + splits[-1] | |
return a | |
class TaskOne(luigi.Task): | |
filename1 = luigi.Parameter(default=None) | |
filedir = luigi.Parameter(default=".") | |
"""takes filename, appends '_2' to it and writes it back out""" | |
def requires(self): | |
print "TO: requires", self.filename1 | |
return FileExists(os.path.join(self.filedir, self.filename1)) | |
def run(self): | |
print "TO: running" | |
cmd = "cp %s %s" % (self.input().fn, self.output().fn) | |
print "command", cmd | |
#r = os.system(cmd) | |
f = open(create_new_filename(self.filename1, "_3"), "w") | |
print "TO: third file", f | |
f.write("test") | |
f.close() | |
def output(self): | |
self.filename2 = os.path.join(self.filedir, create_new_filename(self.filename1, "_2")) | |
print "TO: output", self.filename2 | |
return luigi.LocalTarget(self.filename2) | |
if __name__ == '__main__': | |
os.system("touch start.test") | |
f = open("testfile.txt", 'w') | |
f.write("test file, please ignore\n") | |
f.close() | |
luigi.run() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
run with:
python test_processing.py TaskOne --filename testfile.txt --local-scheduler
DEBUG: Checking if TaskOne(filename1=testfile.txt, filedir=.) is complete
TO: output ./testfile_2.txt
TO: requires testfile.txt
DEBUG: Checking if FileExists(filename=./testfile.txt) is complete
FE: output ./testfile.txt
INFO: Informed scheduler that task TaskOne(filename1=testfile.txt, filedir=.) has status PENDING
INFO: Informed scheduler that task FileExists(filename=./testfile.txt) has status DONE
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 59352] Worker Worker(salt=342749591, workers=1, host=ip-10-110-19-25, username=ashires, pid=59352) running TaskOne(filename1=testfile.txt, filedir=.)
TO: requires testfile.txt
FE: output ./testfile.txt
TO: running
TO: requires testfile.txt
FE: output ./testfile.txt
TO: output ./testfile_2.txt
command cp ./testfile.txt ./testfile_2.txt
TO: third file <open file 'testfile_3.txt', mode 'w' at 0x7fc388c97300>
INFO: [pid 59352] Worker Worker(salt=342749591, workers=1, host=ip-10-110-19-25, username=ashires, pid=59352) done TaskOne(filename1=testfile.txt, filedir=.)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task TaskOne(filename1=testfile.txt, filedir=.) has status DONE
DEBUG: Asking scheduler for work...
INFO: Done
INFO: There are no more tasks to run at this time
INFO: Worker Worker(salt=342749591, workers=1, host=ip-10-110-19-25, username=ashires, pid=59352) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====
Scheduled 2 tasks of which:
This progress looks :) because there were no failed tasks or missing external dependencies
===== Luigi Execution Summary =====
Why doesn't it check for the output???