Last active
June 22, 2018 11:55
-
-
Save miku/5934516 to your computer and use it in GitHub Desktop.
FTP + luigi
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def which(program):
    """ Locate an executable, similar to the Unix `which` command.

    If `program` contains a directory component it is checked as given.
    Otherwise every directory listed in the `PATH` environment variable is
    searched in order and the first match is returned.

    Returns the path of the executable, or `None` if no executable can be
    found (including the case where `PATH` is not set at all).
    """
    def is_exe(fpath):
        # a regular file that the current user is allowed to execute
        return os.path.isfile(fpath) and os.access(fpath, os.X_OK)

    dirname = os.path.split(program)[0]
    if dirname:
        # explicit path given: do not consult PATH
        return program if is_exe(program) else None

    # PATH may be missing from the environment (e.g. stripped-down cron or
    # daemon setups); there is nothing to search then, so honour the
    # documented contract and return None instead of raising KeyError.
    search_path = os.environ.get("PATH")
    if search_path is None:
        return None

    for path in search_path.split(os.pathsep):
        # entries may be wrapped in double quotes
        path = path.strip('"')
        exe_file = os.path.join(path, program)
        if is_exe(exe_file):
            return exe_file
    return None
class Executable(luigi.Task):
    """ Guard task: verifies that an external program can be found.

    The lookup is delegated to `which`. The task has no real output
    (`output` returns `None`), so tasks depending on it should check
    their input before using it.
    """
    name = luigi.Parameter()

    def run(self):
        """ Fail loudly if the program `name` cannot be located.
        """
        located = which(self.name)
        if located:
            return
        raise Exception('external program %s required' % self.name)

    def complete(self):
        """ The task counts as complete once the program is found.
        """
        located = which(self.name)
        return located is not None

    def output(self):
        # This would be another question in its own right: how to handle
        # 'showstopper' tasks like this one, which do not produce any
        # output? -- the default output is a []
        # cf. https://github.com/spotify/luigi/blob/master/luigi/task.py#L281
        return None
class FTPMirror(luigi.Task):
    """ A generic FTP directory sync.

    Runs the external `lftp` program with a `mirror --verbose --only-newer`
    command that copies the remote `/` of `host` into the local `target`
    directory, including files selected by `-I pattern`.
    """
    target = luigi.Parameter(description='the target directory')
    host = luigi.Parameter()
    username = luigi.Parameter()
    password = luigi.Parameter()
    pattern = luigi.Parameter(description="e.g. '*.zip'")

    def requires(self):
        """ The sync shells out to `lftp`, so it must be installed.
        """
        return Executable(name='lftp')

    def run(self):
        """ Create `target` if necessary and run the `lftp` mirror.

        Raises `OSError` if `target` cannot be created and `RuntimeError`
        if `lftp` exits with a non-zero code.
        """
        try:
            os.makedirs(self.target)
        except OSError:
            # An already existing directory is fine (also when it was
            # created concurrently); anything else is a real error.
            if not os.path.isdir(self.target):
                raise

        script = ("set net:max-retries 1; set net:timeout 2; "
                  "mirror --verbose --only-newer -I '%s' / %s; exit" % (
                      self.pattern, self.target))
        # Pass an argument list and do not go through a shell: parameter
        # values (password, pattern, ...) containing shell metacharacters
        # must neither break the command nor be executed.
        # NOTE(review): the password is still visible in the process list.
        command = ['lftp', '-u', '%s,%s' % (self.username, self.password),
                   '-e', script, self.host]
        code = subprocess.call(command)
        if code != 0:
            # Do not echo the command here, it contains the password.
            raise RuntimeError('lftp exited with code %s while mirroring %s'
                               % (code, self.host))

    def output(self):
        """ What to implement here?
        Options:
        * a single LocalTarget containing all filenames of the target directory
        * a list of LocalTargets with the filenames
        * a list of filenames
        * None; since idempotency is handled by `lftp` anyway?
        """
        # Open design question (see docstring); no output is declared yet.
        return None
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment