Skip to content

Instantly share code, notes, and snippets.

@miku
Last active August 29, 2015 13:56
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save miku/9005312 to your computer and use it in GitHub Desktop.
Save miku/9005312 to your computer and use it in GitHub Desktop.
Using lftp inside a luigi task.
#
# It's not really elegant (I'm happy to hear about alternatives),
# but has done what it should so far.
#
# lftp takes care of the mirroring and this task is completed if a single file containing
# the paths to the mirrored file has been successfully written - so it's a bit indirect,
# but at least follows the one-file-per-task rule.
#
# It would be ok to run this task over and over, since after the first transfer the directories
# should be in sync.
#
class BMSSync(BMSTask):
""" Just mirror the XXX FTP server and create a single file
that contains the mirrored file paths, one per line. """
# always run this task
indicator = luigi.Parameter(default=random_string())
def requires(self):
# Executable checks, whether lftp is installed / in PATH
return Executable(name='lftp')
@timed
def run(self):
target = os.path.dirname(self.output().path)
# shellout = subprocess sugar
shellout(""" lftp -u {username},'{password}' -e
"set net:max-retries 1; set net:timeout 2;
mirror --verbose --only-newer -I "{pattern}"
{path} {target}; exit" {host}""", target=target,
host="ftp.example.com", user='user', password='password',
pattern='*.xml', path='/')
# remove any leftovers
for path in glob.glob(os.path.join(target, 'indicator-*')):
logger.debug("Removing previous indicator: %s" % path)
os.remove(path)
# since lftp will mirror into `target` (target=target), we create a file,
# that contains the mirrored filenames, one per line
with self.output().open('w') as output:
# findfiles = os.walk sugar
for path in findfiles(target):
# iter_tsv has been discussed on the ML before: http://goo.gl/ES4uZM - super useful
output.write_tsv(path)
def output(self):
# self.path() will point to some structured fs location, with the filename
# being a slugified `task_id`
return luigi.LocalTarget(path=self.path(), format=TSV)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment