Skip to content

Instantly share code, notes, and snippets.

@freider
Created August 23, 2013 08:54
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save freider/6317023 to your computer and use it in GitHub Desktop.
Save freider/6317023 to your computer and use it in GitHub Desktop.
Example on how to create custom `Format`s that are portable between different FileSystemTargets
import functools
from luigi import LocalTarget
from luigi.format import Format
class PipeWrapper(object):
"""Wrap pipe in a "real" Python object in case it's a `file`
"""
# TODO: build this into luigi and use it by default for LocalFile.open('r')
def __init__(self, pipe):
self._subpipe = pipe
def __getattr__(self, name):
# forward calls to 'write', 'close' and other methods not defined below
return getattr(self._subpipe, name)
def __enter__(self, *args, **kwargs):
return self # this might be bad...
def __exit__(self, *args, **kwargs):
return self._subpipe.__exit__(*args, **kwargs)
def __iter__(self):
return iter(self._subpipe)
def write_tsv(output_stream, tup):
output_stream.write('\t'.join(tup) + '\n')
def iter_tsv(input_stream):
for line in input_stream:
yield tuple(line.rstrip('\n').split('\t'))
class TSVFormat(Format):
def pipe_writer(self, output_pipe):
output_pipe.write_tsv = functools.partial(write_tsv, output_pipe)
return output_pipe
def pipe_reader(self, input_pipe):
input_pipe = PipeWrapper(input_pipe)
input_pipe.iter_tsv = functools.partial(iter_tsv, input_pipe)
return input_pipe
t = LocalTarget("/tmp/test.tsv", format=TSVFormat())
with t.open('w') as f:
f.write_tsv(("foo", "bar"))
f.write_tsv(("wtf", "bbq"))
with t.open('r') as f:
for tup in f.iter_tsv():
print tup
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment