Created
December 9, 2016 13:42
-
-
Save cnicodeme/8ae43e1264564b627f697f2510648674 to your computer and use it in GitHub Desktop.
My first attempt at processing an Xlsx file on Google DataFlow service.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse
import logging

import apache_beam as beam
from apache_beam.io import gcsio
from apache_beam.utils.options import PipelineOptions
from openpyxl import load_workbook
# See https://cloud.google.com/dataflow/model/custom-io-python#ptransform-wrappers
class FileReader(object):
    """A file reader implementation.

    Holds the location of one Xlsx file and hands out ``XlsxFileReader``
    objects for it.  In this file an instance is passed to ``beam.io.Read``.
    """

    def __init__(self, path, *args, **kwargs):
        """Store the location of the file to read.

        Args:
            path: Location of the Xlsx file; forwarded unchanged to every
                reader created by ``reader()``.
            *args: Accepted for call compatibility and ignored.
            **kwargs: Accepted for call compatibility and ignored.
        """
        self.path = path

    def reader(self):
        """Return a new ``XlsxFileReader`` bound to ``self.path``."""
        return XlsxFileReader(self.path)
class XlsxFileReader(object):
    """The Xlsx file reader.

    The instance is a context manager: entering it opens ``path`` through
    ``gcsio.GcsIO`` and leaving it closes that handle.  Iterating over an
    entered instance walks the rows of the first worksheet and yields one
    ``(value, row_index)`` tuple for each row that has a cell containing an
    ``@`` somewhere after its first character.
    """

    def __init__(self, path):
        """Remember where the workbook lives; nothing is opened yet.

        Args:
            path: Location of the Xlsx file, handed to ``GcsIO().open``.
        """
        self.path = path
        # Populated by __enter__.  None means "not opened", which lets
        # __iter__ and __exit__ detect misuse instead of failing with an
        # AttributeError on a missing attribute.
        self.file = None

    def _clean_value(self, value):
        """Return a cell value as a native string, or None for empty cells.

        On Python 2 the value is converted with ``unicode`` and encoded to a
        UTF-8 byte string.  On Python 3, where ``unicode`` does not exist,
        the value is converted with ``str``.

        Args:
            value: Raw cell value, possibly None.

        Returns:
            None when ``value`` is None, otherwise its string form.
        """
        if value is None:
            return None

        try:
            text_type = unicode  # Python 2 only
        except NameError:
            # Python 3: str is the text type and supports find('@') directly.
            return str(value)

        value = text_type(value)
        try:
            value = value.encode('utf-8')
        except UnicodeEncodeError:
            # Best effort: keep the unicode object when it cannot be encoded.
            pass
        return value

    def __iter__(self):
        """Yield ``(value, row_index)`` for the first ``@`` cell of each row.

        Raises:
            RuntimeError: If the reader has not been entered, i.e. no file
                handle is open.
        """
        if self.file is None:
            raise RuntimeError(
                'XlsxFileReader must be entered (used in a "with" block) '
                'before it can be iterated.')

        wb = load_workbook(filename=self.file, read_only=True)
        # ``sheetnames`` replaces the deprecated ``get_sheet_names()``.
        ws = wb[wb.sheetnames[0]]

        for line, row in enumerate(ws.rows):
            for cell in row:
                cell_value = self._clean_value(cell.value)
                # find() > 0: an '@' is present and is not the first char.
                if cell_value is not None and cell_value.find('@') > 0:
                    yield cell_value, line
                    # Only the first matching cell of a row is reported.
                    break

    def __enter__(self):
        """Open ``self.path`` through ``gcsio.GcsIO`` and return ``self``."""
        # NOTE(review): an Xlsx file is binary; confirm that mode 'r' hands
        # back raw bytes with the gcsio version in use.
        self.file = gcsio.GcsIO().open(self.path, 'r')
        return self

    def __exit__(self, *args, **kwargs):
        """Close the file handle if one is open; exceptions propagate."""
        if self.file is not None:
            self.file.close()
            self.file = None
class ComputeWordLengthFn(beam.DoFn):
    """DoFn that turns each incoming element into a greeting line."""

    def process(self, context):
        """Yield ``'Hello <x>!'`` where ``<x>`` is item 0 of the element.

        Args:
            context: Processing context; its ``element`` attribute is indexed
                at position 0 (presumably the ``(value, row_index)`` tuple
                produced by the reader -- verify against the source).
        """
        first_item = context.element[0]
        greeting = 'Hello {0}!'.format(first_item)
        yield greeting
def run(argv=None):
    """Parse the command line, then build and run the pipeline.

    The pipeline has three labelled steps: ``read`` (the Xlsx source),
    ``verify`` (the greeting DoFn) and ``write`` (a text file sink).

    Args:
        argv: Argument list to parse.  ``--input`` and ``--output`` are
            consumed here; everything else is forwarded to
            ``PipelineOptions``.  ``None`` is passed through to argparse.
    """
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument(
        '--input',
        dest='input',
        default='gs://norbert-verify-staging/growthlist.xlsx',
        help='Input file to process.',
    )
    arg_parser.add_argument(
        '--output',
        dest='output',
        required=True,
        help='Output file to write results to.',
    )
    known_args, pipeline_args = arg_parser.parse_known_args(argv)

    pipeline = beam.Pipeline(options=PipelineOptions(pipeline_args))

    # Chain the steps one at a time instead of using backslash continuations.
    matches = pipeline | 'read' >> beam.io.Read(FileReader(known_args.input))
    greetings = matches | 'verify' >> beam.ParDo(ComputeWordLengthFn())
    greetings | 'write' >> beam.io.Write(
        beam.io.TextFileSink(known_args.output))

    pipeline.run()
if __name__ == '__main__':
    # Script entry point: raise the root logger to INFO, then run the
    # pipeline with the process command line.
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)
    run()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment