Skip to content

Instantly share code, notes, and snippets.

@visualskyrim
Created December 28, 2017 06:09
Show Gist options
  • Save visualskyrim/a40d39d392221f9fb4ba6a291bb18db8 to your computer and use it in GitHub Desktop.
Save visualskyrim/a40d39d392221f9fb4ba6a291bb18db8 to your computer and use it in GitHub Desktop.
A luigi task target deciding if a task is done or not by checking the _SUCCESS file in the given directory
class HdfsJobTarget(HdfsTarget):
def exists(self):
return self.fs.exists(os.path.join(self.path, '_SUCCESS'))
def copy(self, destination):
target_dir = os.path.dirname(destination)
if not self.fs.exists(target_dir):
self.fs.mkdir(target_dir)
self.fs.copy(self.path, target_dir)
def touch(self, touch_empty_part_file=True, touch_success_file=True):
empty_part_file = HdfsTarget(os.path.join(self.path, 'part-00000'))
if touch_empty_part_file and not empty_part_file.exists():
empty_part_file.open(mode='w').close()
success_file = HdfsTarget(os.path.join(self.path, '_SUCCESS'))
if touch_success_file and not success_file.exists():
success_file.open(mode='w').close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment