Skip to content

Instantly share code, notes, and snippets.

@gxercavins
Last active December 4, 2019 19:47
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save gxercavins/ed08b877aa129413f9d35b52d96e01cc to your computer and use it in GitHub Desktop.
SO question 59182161
import argparse, logging
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io import ReadFromText
class ParseHeadersFn(beam.DoFn):
    """ParDo that keeps only header rows.

    A header row is identified by '--' appearing in its first
    comma-separated field; that marker field is dropped and the rest of
    the row is re-joined and emitted as a single-element list.
    """
    def process(self, element):
        fields = element.split(',')
        if '--' in fields[0]:
            yield [','.join(fields[1:])]
class ParseRowsFn(beam.DoFn):
    """ParDo to process data rows according to header metadata.

    Expects ``headers`` as a side input: a list of two single-element
    lists where headers[0][0] is the comma-joined id header row and
    headers[1][0] is the comma-joined label header row, as produced by
    ParseHeadersFn — TODO confirm this ordering against the input file.
    Emits one dict per id group found in the header, wrapped in a list.
    """
    def process(self, element, headers):
        # changing ids as per https://stackoverflow.com/a/28242076/6121516
        fields = element.split(',')
        # Header rows carry '--' in the first field; only data rows pass.
        if '--' not in fields[0]:
            ids = headers[0][0].split(',')
            labels = headers[1][0].split(',')
            # Boundary indices where the id value changes, plus a final
            # sentinel, so each entry marks the end of one id's column group.
            id_changes = [i for i in range(1, len(ids)) if ids[i] != ids[i - 1]]
            id_changes.append(len(ids))
            for idx, change in enumerate(id_changes):
                row = {'timestamp': fields[0], 'id': ids[change - 1]}
                # NOTE(review): `low` looks like it was meant to be the
                # previous boundary (id_changes[idx - 1]), not idx - 1;
                # kept as-is to preserve original behavior — confirm intent.
                low = max(idx - 1, 0)
                row.update(dict(zip(labels[low:change], fields[low + 1:change + 1])))
                # Fix: Python 3 print function (original used Py2 `print row`,
                # a SyntaxError on any Python 3 interpreter).
                print(row)
                yield [row]
def run(argv=None):
    """Build and run the pipeline.

    Reads input.csv, extracts the header rows, and parses the data rows
    using the headers as a side input.

    Args:
        argv: optional list of command-line arguments; unrecognized ones
            are forwarded to Beam as pipeline options.
    """
    parser = argparse.ArgumentParser()
    known_args, pipeline_args = parser.parse_known_args(argv)
    p = beam.Pipeline(options=PipelineOptions(pipeline_args))
    # Renamed from `input` to avoid shadowing the builtin of that name.
    lines = p | 'Read CSV file' >> ReadFromText("input.csv")
    headers = lines | 'Parse headers' >> beam.ParDo(ParseHeadersFn())
    rows = lines | 'Parse data rows' >> beam.ParDo(ParseRowsFn(), beam.pvalue.AsList(headers))
    # NOTE(review): result is not waited on; add .wait_until_finish() if the
    # caller should block until the pipeline completes — confirm desired.
    p.run()

if __name__ == '__main__':
    run()
import argparse, logging
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io import ReadFromText
class ParseHeadersFn(beam.DoFn):
    """ParDo emitting only header rows (first field contains '--'),
    with that leading marker field stripped off."""
    def process(self, element):
        first, _, rest = element.partition(',')
        if '--' in first:
            yield [rest]
class ParseRowsFn(beam.DoFn):
    """Debug ParDo that prints the header side input.

    For the row whose first field contains 'time1', prints each entry of
    headers[0] (ids) and headers[1] (labels). Expects ``headers`` as a
    side input shaped like ParseHeadersFn's output — TODO confirm the
    id/label ordering against the input file. Emits nothing.
    """
    def process(self, element, headers):
        if 'time1' in element.split(',')[0]:
            # Fix: Python 3 print function (original used Py2 print
            # statements); `header_id` avoids shadowing the `id` builtin.
            # Output text is unchanged.
            for header_id in headers[0]:
                print('ids: ' + header_id)
            for label in headers[1]:
                print('labels: ' + label)
def run(argv=None):
    """Build and run the debug pipeline.

    Reads input.csv, extracts the header rows, and feeds them as a side
    input to the debug ParseRowsFn, which prints them.

    Args:
        argv: optional list of command-line arguments; unrecognized ones
            are forwarded to Beam as pipeline options.
    """
    parser = argparse.ArgumentParser()
    known_args, pipeline_args = parser.parse_known_args(argv)
    p = beam.Pipeline(options=PipelineOptions(pipeline_args))
    # Renamed from `input` to avoid shadowing the builtin of that name.
    lines = p | 'Read CSV file' >> ReadFromText("input.csv")
    headers = lines | 'Parse headers' >> beam.ParDo(ParseHeadersFn())
    rows = lines | 'Parse data rows' >> beam.ParDo(ParseRowsFn(), beam.pvalue.AsList(headers))
    p.run()

if __name__ == '__main__':
    run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment