Last active: December 4, 2019 19:47
-
-
Save gxercavins/ed08b877aa129413f9d35b52d96e01cc to your computer and use it in GitHub Desktop.
Stack Overflow question 59182161
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse, logging | |
import apache_beam as beam | |
from apache_beam.options.pipeline_options import PipelineOptions | |
from apache_beam.io import ReadFromText | |
class ParseHeadersFn(beam.DoFn):
    """Keep only header lines, i.e. those whose first column contains ``--``.

    Each header line is emitted as a one-element list holding the rest of
    the line after the first comma (the first column is dropped).
    """

    def process(self, element):
        first_col, _, remainder = element.partition(',')
        if '--' not in first_col:
            return
        yield [remainder]
class ParseRowsFn(beam.DoFn):
    """ParDo to process data rows according to header metadata.

    Each data row is split into one output record per run of identical,
    consecutive ids in the id header row; the record maps that run's labels
    to the corresponding values of the row.
    """

    def process(self, element, headers):
        """Emit one ``[dict]`` per id group for a single data row.

        Args:
            element: one CSV line of the input file.
            headers: side input (a list) of header records; ``headers[0]`` is
                used as the id row and ``headers[1]`` as the label row, each a
                one-element list holding a comma-joined string of header
                values.

        Yields:
            A one-element list containing a dict with ``'timestamp'`` (the
            row's first column), ``'id'`` and one entry per label of that id.
            Header rows (first column contains ``--``) yield nothing.
        """
        fields = element.split(',')
        if '--' in fields[0]:
            return  # header lines are handled by ParseHeadersFn
        # NOTE(review): the order of an AsList side input follows PCollection
        # order, which Beam does not guarantee; this assumes the id row comes
        # before the label row -- confirm or tag the header rows explicitly.
        ids = headers[0][0].split(',')
        labels = headers[1][0].split(',')
        # changing ids as per https://stackoverflow.com/a/28242076/6121516
        id_changes = [i for i in range(1, len(ids)) if ids[i] != ids[i - 1]]
        # Each id group spans [start, end) in header-column coordinates; the
        # previous boundary (not the loop counter) is the group's start.
        starts = [0] + id_changes
        ends = id_changes + [len(ids)]
        for start, end in zip(starts, ends):
            row = {'timestamp': fields[0], 'id': ids[end - 1]}
            # fields is shifted by one because column 0 is the timestamp
            row.update(zip(labels[start:end], fields[start + 1:end + 1]))
            print(row)
            yield [row]
def run(argv=None):
    """Build and run the pipeline.

    The CSV is read once; header rows are extracted with ``ParseHeadersFn``
    and handed to ``ParseRowsFn`` as a list side input for the data rows.

    Args:
        argv: command-line arguments. ``--input`` selects the CSV file
            (default ``input.csv``); all other arguments are passed to Beam
            as pipeline options. ``None`` means ``sys.argv[1:]``.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--input', default='input.csv',
                        help='CSV file to read (default: input.csv)')
    known_args, pipeline_args = parser.parse_known_args(argv)
    p = beam.Pipeline(options=PipelineOptions(pipeline_args))
    # named `lines` to avoid shadowing the builtin `input`
    lines = p | 'Read CSV file' >> ReadFromText(known_args.input)
    headers = lines | 'Parse headers' >> beam.ParDo(ParseHeadersFn())
    lines | 'Parse data rows' >> beam.ParDo(
        ParseRowsFn(), beam.pvalue.AsList(headers))
    # Block until the pipeline finishes so the script does not exit early.
    p.run().wait_until_finish()
if __name__ == '__main__':
    # Script entry point; argv=None lets argparse read sys.argv[1:].
    run(argv=None)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse, logging | |
import apache_beam as beam | |
from apache_beam.options.pipeline_options import PipelineOptions | |
from apache_beam.io import ReadFromText | |
class ParseHeadersFn(beam.DoFn):
    """Emit header lines only, with their first column removed.

    A line counts as a header when its first comma-separated column contains
    ``--``; it is yielded as a one-element list of the remaining text.
    """

    def process(self, element):
        head, _sep, tail = element.partition(',')
        if '--' in head:
            yield [tail]
class ParseRowsFn(beam.DoFn):
    """Debug ParDo that prints the header side input.

    Nothing is emitted; for the row whose first column contains ``time1``
    the id header and label header entries are printed.
    """

    def process(self, element, headers):
        """Print header metadata when the ``time1`` row is seen.

        Args:
            element: one CSV line of the input file.
            headers: side input (a list) of header records; ``headers[0]`` is
                printed as ids and ``headers[1]`` as labels. Their order is
                not guaranteed by Beam -- NOTE(review): confirm.
        """
        if 'time1' in element.split(',')[0]:
            # `id_value` rather than `id` to avoid shadowing the builtin
            for id_value in headers[0]:
                print('ids: ' + id_value)
            for label in headers[1]:
                print('labels: ' + label)
def run(argv=None):
    """Build and run the pipeline.

    The CSV is read once; header rows are extracted with ``ParseHeadersFn``
    and handed to ``ParseRowsFn`` as a list side input for every line.

    Args:
        argv: command-line arguments. ``--input`` selects the CSV file
            (default ``input.csv``); all other arguments are passed to Beam
            as pipeline options. ``None`` means ``sys.argv[1:]``.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--input', default='input.csv',
                        help='CSV file to read (default: input.csv)')
    known_args, pipeline_args = parser.parse_known_args(argv)
    p = beam.Pipeline(options=PipelineOptions(pipeline_args))
    # named `lines` to avoid shadowing the builtin `input`
    lines = p | 'Read CSV file' >> ReadFromText(known_args.input)
    headers = lines | 'Parse headers' >> beam.ParDo(ParseHeadersFn())
    lines | 'Parse data rows' >> beam.ParDo(
        ParseRowsFn(), beam.pvalue.AsList(headers))
    # Block until the pipeline finishes so the script does not exit early.
    p.run().wait_until_finish()
if __name__ == '__main__':
    # Script entry point; argv=None lets argparse read sys.argv[1:].
    run(argv=None)
Sign up for free to join this conversation on GitHub.
Already have an account?
Sign in to comment