Skip to content

Instantly share code, notes, and snippets.

Last active July 22, 2019 15:02
  • Star 6 You must be signed in to star a gist
  • Fork 6 You must be signed in to fork a gist
Star You must be signed in to star a gist
What would you like to do?
CSV processor examples for luigi. Can serialize *args to CSV. Can deserialize CSV rows into namedtuples if requested. -- "works on my machine".
import collections

import csvkit
import luigi
from luigi.format import Format
class CSVOutputProcessor(object):
    """
    A simple CSV output processor to be hooked into Format's `pipe_writer`.

    If `cols` are given, the names are used as CSV header, otherwise no
    explicit header is written.
    """

    def __init__(self, output_pipe, delimiter='\t', encoding='utf-8', cols=None):
        """
        Wrap `output_pipe` in a csvkit writer.

        `delimiter` and `encoding` are forwarded to the csvkit writer. When
        `cols` is truthy it is written immediately as the header row and
        every later `write` call must supply exactly `len(cols)` values.
        """
        self.writer = csvkit.CSVKitWriter(output_pipe, delimiter=delimiter,
                                          encoding=encoding)
        self.output_pipe = output_pipe
        self.cols = cols
        if cols:
            # write header
            self.writer.writerow(cols)

    def write(self, *args, **kwargs):
        """
        Serialize the positional arguments as one row.

        Keyword arguments are accepted but ignored. Raises `RuntimeError`
        if column names were configured and the number of values differs.
        """
        if self.cols and len(args) != len(self.cols):
            raise RuntimeError('This format expects %s columns: %s, got %s.' % (
                (len(self.cols), self.cols, len(args))))
        self.writer.writerow(args)

    def close(self):
        """ Close the underlying output pipe. """
        self.output_pipe.close()

    def __enter__(self):
        return self

    def __exit__(self, type, value, traceback):
        if type:
            # pass on the exception; it gets handled in `atomic_file`.
            # The pipe is deliberately left unclosed in this case.
            return False
        self.output_pipe.close()
class CSVInputProcessor(object):
    """
    A simple CSV input processor to be hooked into Format's `pipe_reader`.

    If `cols` are given, this processor will turn each row in the file into
    a `namedtuple` with the attributes given in cols. Otherwise just return
    the row as tuple.
    """

    def __init__(self, input_pipe, encoding='utf-8', delimiter='\t', cols=None):
        """
        Wrap `input_pipe` in a csvkit reader.

        `encoding` and `delimiter` are forwarded to the csvkit reader. When
        `cols` is truthy the first row is consumed as the header and must
        equal `cols`; otherwise `RuntimeError` is raised.
        """
        self.reader = csvkit.CSVKitReader(input_pipe, encoding=encoding,
                                          delimiter=delimiter)
        self.input_pipe = input_pipe
        self.cols = cols
        if cols:
            # consume the header; an empty input yields None instead of
            # leaking StopIteration out of the constructor
            header = next(self.reader, None)
            if header is None or tuple(header) != tuple(cols):
                raise RuntimeError('Format mismatch, expected: %s, but got: %s' %
                                   (cols, header))

    def close(self):
        """ Close the underlying input pipe. """
        self.input_pipe.close()

    def __enter__(self):
        return self

    def __exit__(self, type, value, traceback):
        # Always release the pipe; returning None lets exceptions propagate.
        self.input_pipe.close()

    def __iter__(self):
        """ Yield each remaining row as a `Row` namedtuple or plain tuple. """
        if self.cols:
            Row = collections.namedtuple('Row', self.cols)
            for line in self.reader:
                yield Row(*line)
        else:
            for line in self.reader:
                yield tuple(line)
class CSVFormat(Format):
    """
    Helper for reading from and writing to CSV files.

    Explicit column names can be given via `cols`. The headers are written
    to the file or read and checked against the actual header row. If `cols`
    are `None` no header is written and the reader will just return a tuple.
    """

    def __init__(self, delimiter='\t', encoding='utf-8', cols=None):
        """
        Store the CSV dialect settings and validate `cols`.

        Raises `ValueError` if `cols` is truthy but not a list or tuple, or
        if it contains duplicate names.
        """
        if cols:
            # Check the container type first so that e.g. a plain string is
            # reported as the wrong type rather than as duplicate names.
            if not isinstance(cols, (list, tuple)):
                raise ValueError('Column names (headers) must be given as '
                                 'list or tuple.')
            if len(cols) != len(set(cols)):
                raise ValueError('Column names must be uniq.')
        self.delimiter = delimiter
        self.encoding = encoding
        self.cols = cols

    def pipe_reader(self, input_pipe):
        """ Return a `CSVInputProcessor` over `input_pipe` with this format's settings. """
        return CSVInputProcessor(input_pipe, delimiter=self.delimiter,
                                 encoding=self.encoding, cols=self.cols)

    def pipe_writer(self, output_pipe):
        """ Return a `CSVOutputProcessor` over `output_pipe` with this format's settings. """
        return CSVOutputProcessor(output_pipe, delimiter=self.delimiter,
                                  encoding=self.encoding, cols=self.cols)
# Unlike formats that can be used as-is (e.g. Gzip), CSVFormat must be
# instantiated; it accepts `delimiter`, `encoding` and `cols` keyword
# arguments.

# Example: default format. NOTE: CSVFormat's default delimiter is a tab,
# so this writes tab-separated rows without a header.
CSV = CSVFormat()

# Example: headerless TSV with the delimiter spelled out
TSV = CSVFormat(delimiter='\t')

# Example: explicit column names together with a custom delimiter and
# encoding
Custom = CSVFormat(
    delimiter=':',
    encoding='latin-1',
    cols=('name', 'state', 'zipcode'),
)
class CSVWriterExample(luigi.Task):
    """
    Example task writing data as TSV.

    Will write the following to a file:

        $ cat example.tsv
        name    state   zipcode
        John    CA      92101
        Jane    NY      10304
        Jazz    TX      78701
    """

    def run(self):
        """ Write three example rows through the CSV output processor. """
        with self.output().open('w') as output:
            output.write('John', 'CA', 92101)
            output.write('Jane', 'NY', 10304)
            output.write('Jazz', 'TX', 78701)
            # Writing fewer values than the configured column names raises
            # an error, e.g. output.write('Hello', 'world').

    def output(self):
        """ Return the local target `./example.tsv` using a TSV format with header. """
        return luigi.LocalTarget('./example.tsv',
                                 format=CSVFormat(delimiter='\t',
                                                  cols=('name', 'state', 'zipcode')))
class CSVReaderExample(luigi.Task):
    """ Example task reading data from local file with given CSV format. """

    def requires(self):
        return CSVWriterExample()

    def run(self):
        """ Print every row of the required task's output. """
        with self.input().open() as handle:
            for row in handle:
                # make use of the namedtuple ...
                print('%s from %s %s' % (row.name, row.zipcode, row.state))
                # ... or just the tuple
                print('Hi %s' % row[0])

    def complete(self):
        # Always report as incomplete, so `run` is executed whenever the
        # task is scheduled.
        return False
Copy link

Nice piece of code. Have you thought about contributing it to luigi?
I am adding a few features, like:

  • being able to return a dict and not just a namedtuple in the input processor.
  • type checking after input line parsing
  • support for sending a tuple to output.write()

Copy link

a-leut commented Jun 7, 2016

I like it!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment