Last active
August 29, 2015 13:56
-
-
Save miku/9005312 to your computer and use it in GitHub Desktop.
Using lftp inside a luigi task.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#
# It's not really elegant (I'm happy to hear about alternatives),
# but it has done what it should so far.
#
# lftp takes care of the mirroring, and this task is complete once a single file containing
# the paths to the mirrored files has been successfully written - so it's a bit indirect,
# but at least it follows the one-file-per-task rule.
#
# It would be ok to run this task over and over, since after the first transfer the directories
# should be in sync.
#
class BMSSync(BMSTask):
    """Mirror the XXX FTP server and list the mirrored file paths.

    The mirroring itself is delegated to the external ``lftp`` program.
    The task's single output is a TSV file that contains the paths found
    in the mirror directory, one per line.
    """

    # always run this task
    # NOTE(review): the default is evaluated once, when the class body is
    # executed, so the value is shared by every instance created in the same
    # process - confirm that this is the intended "always run" behaviour.
    indicator = luigi.Parameter(default=random_string())

    def requires(self):
        """Require the ``lftp`` executable."""
        # Executable checks, whether lftp is installed / in PATH
        return Executable(name='lftp')

    @timed
    def run(self):
        """Mirror the remote files, then write the list of local paths."""
        # lftp mirrors into the directory that holds this task's output file
        target = os.path.dirname(self.output().path)

        # shellout = subprocess sugar
        # The keyword names below must match the {placeholders} in the
        # template; the credentials are therefore passed as `username`.
        # NOTE(review): "{pattern}" is wrapped in double quotes inside the
        # double-quoted -e argument, so for the shell the pattern itself ends
        # up unquoted - confirm this is acceptable for the patterns in use.
        shellout(""" lftp -u {username},'{password}' -e
                 "set net:max-retries 1; set net:timeout 2;
                 mirror --verbose --only-newer -I "{pattern}"
                 {path} {target}; exit" {host}""", target=target,
                 host="ftp.example.com", username='user', password='password',
                 pattern='*.xml', path='/')

        # remove any leftovers
        for path in glob.glob(os.path.join(target, 'indicator-*')):
            logger.debug("Removing previous indicator: %s", path)
            os.remove(path)

        # since lftp will mirror into `target` (target=target), we create a file,
        # that contains the mirrored filenames, one per line
        with self.output().open('w') as output:
            # findfiles = os.walk sugar
            for path in findfiles(target):
                # iter_tsv has been discussed on the ML before: http://goo.gl/ES4uZM - super useful
                output.write_tsv(path)

    def output(self):
        """Return the local TSV target that holds the mirrored file paths."""
        # self.path() will point to some structured fs location, with the filename
        # being a slugified `task_id`
        return luigi.LocalTarget(path=self.path(), format=TSV)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment