@alexshires
Created October 29, 2015 11:46
luigi doesn't check for local file output
from __future__ import print_function

import os

import luigi


class FileExists(luigi.ExternalTask):
    """External dependency: a file that must already exist on disk."""
    filename = luigi.Parameter(default=None)

    def output(self):
        print("FE: output", self.filename)
        return luigi.LocalTarget(self.filename)


def create_new_filename(filename, ext):
    """Insert ext before the file extension: ('a.txt', '_2') -> 'a_2.txt'."""
    splits = filename.split(".")
    # Join with "." so names that contain several dots keep them.
    return ".".join(splits[:-1]) + ext + "." + splits[-1]


class TaskOne(luigi.Task):
    """takes filename, appends '_2' to it and writes it back out"""
    filename1 = luigi.Parameter(default=None)
    filedir = luigi.Parameter(default=".")

    def requires(self):
        print("TO: requires", self.filename1)
        return FileExists(os.path.join(self.filedir, self.filename1))

    def run(self):
        print("TO: running")
        cmd = "cp %s %s" % (self.input().path, self.output().path)
        print("command", cmd)
        # r = os.system(cmd)
        # With the copy commented out, only a '_3' file is written, so the
        # '_2' target declared in output() is never created.
        f = open(create_new_filename(self.filename1, "_3"), "w")
        print("TO: third file", f)
        f.write("test")
        f.close()

    def output(self):
        self.filename2 = os.path.join(self.filedir, create_new_filename(self.filename1, "_2"))
        print("TO: output", self.filename2)
        return luigi.LocalTarget(self.filename2)


if __name__ == '__main__':
    os.system("touch start.test")
    f = open("testfile.txt", 'w')
    f.write("test file, please ignore\n")
    f.close()
    luigi.run()
@alexshires
Author

run with:

python test_processing.py TaskOne --filename1 testfile.txt --local-scheduler
DEBUG: Checking if TaskOne(filename1=testfile.txt, filedir=.) is complete
TO: output ./testfile_2.txt
TO: requires testfile.txt
DEBUG: Checking if FileExists(filename=./testfile.txt) is complete
FE: output ./testfile.txt
INFO: Informed scheduler that task TaskOne(filename1=testfile.txt, filedir=.) has status PENDING
INFO: Informed scheduler that task FileExists(filename=./testfile.txt) has status DONE
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 59352] Worker Worker(salt=342749591, workers=1, host=ip-10-110-19-25, username=ashires, pid=59352) running TaskOne(filename1=testfile.txt, filedir=.)
TO: requires testfile.txt
FE: output ./testfile.txt
TO: running
TO: requires testfile.txt
FE: output ./testfile.txt
TO: output ./testfile_2.txt
command cp ./testfile.txt ./testfile_2.txt
TO: third file <open file 'testfile_3.txt', mode 'w' at 0x7fc388c97300>
INFO: [pid 59352] Worker Worker(salt=342749591, workers=1, host=ip-10-110-19-25, username=ashires, pid=59352) done TaskOne(filename1=testfile.txt, filedir=.)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task TaskOne(filename1=testfile.txt, filedir=.) has status DONE
DEBUG: Asking scheduler for work...
INFO: Done
INFO: There are no more tasks to run at this time
INFO: Worker Worker(salt=342749591, workers=1, host=ip-10-110-19-25, username=ashires, pid=59352) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====

Scheduled 2 tasks of which:

  • 1 present dependencies were encountered:
    • 1 FileExists(filename=./testfile.txt)
  • 1 ran successfully:
    • 1 TaskOne(filename1=testfile.txt, filedir=.)

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====

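TaskOne is reported as DONE even though run() only wrote testfile_3.txt and the declared output ./testfile_2.txt was never created. Why doesn't luigi check for the output after run()?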
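
In the log above, the only "Checking if ... is complete" lines appear before the worker starts, and TaskOne is marked DONE as soon as run() returns without raising. One way to catch the mismatch is to check the declared output explicitly once the work is finished. The following is a minimal sketch of that check in plain Python, without luigi; `missing_outputs` and `run_and_verify` are hypothetical helper names, not part of luigi, and the temporary directory stands in for `filedir`.

```python
import os
import tempfile


def missing_outputs(paths):
    """Return the declared output paths that do not exist on disk."""
    return [p for p in paths if not os.path.exists(p)]


def run_and_verify(run, declared_outputs):
    """Call run(), then raise if any declared output is still missing.

    Hypothetical helper for illustration, not a luigi API.
    """
    run()
    missing = missing_outputs(declared_outputs)
    if missing:
        raise RuntimeError("run() finished but outputs are missing: %s"
                           % ", ".join(missing))


if __name__ == "__main__":
    workdir = tempfile.mkdtemp()
    # What output() promises in the gist: the '_2' file.
    declared = os.path.join(workdir, "testfile_2.txt")

    def run_like_task_one():
        # Mirrors TaskOne.run(): writes the '_3' file, not the '_2' one.
        with open(os.path.join(workdir, "testfile_3.txt"), "w") as f:
            f.write("test")

    try:
        run_and_verify(run_like_task_one, [declared])
    except RuntimeError as err:
        print("task would fail: %s" % err)
```

Inside a real task, the same idea would be a check at the end of run() that raises when `self.output().exists()` is false, so the worker records a failure instead of DONE. Newer luigi releases also document a `check_complete_on_run` option in the `[worker]` configuration section for this purpose; whether it is available depends on the installed version.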
