Using GCP Dataflow to read a text file as JSON and split it into one file per facility.
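
The pipeline reads the input file line by line, parses each line as JSON, keeps the records whose FACILITY matches one of the configured facilities, and writes one JSON array file per facility. The record shape below is an assumption inferred from the filter; only the FACILITY field is actually required by the code. With the default flags the output would land at gs://wh_dev/output_<FACILITY>.json.

    # Hypothetical input line (field names other than FACILITY are illustrative):
    [{"FACILITY": "HK", "SKU": "A001"}, {"FACILITY": "LK", "SKU": "A002"}]

    # Resulting files, one JSON array per facility:
    # gs://wh_dev/output_HK.json, gs://wh_dev/output_LK.json, gs://wh_dev/output_SZXN.json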
#!/usr/bin/env python3
from __future__ import absolute_import

import argparse
import json
# from google.cloud import storage
from datetime import datetime
import configparser

import apache_beam as beam
from apache_beam.coders import coders
from apache_beam.io import ReadFromText, filebasedsink, WriteToText
from apache_beam.io.iobase import Write
from apache_beam.io.filesystem import CompressionTypes
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.transforms import PTransform

facility_set = ('HK', 'LK', 'SZXN')  # , 'SZ_FB', 'SZ_FT'


class _JsonSink(filebasedsink.FileBasedSink):
    """A Dataflow sink for writing JSON files."""

    def __init__(self,
                 file_path_prefix,
                 file_name_suffix='',
                 num_shards=0,
                 shard_name_template=None,
                 coder=coders.ToStringCoder(),
                 compression_type=CompressionTypes.AUTO):
        super(_JsonSink, self).__init__(
            file_path_prefix,
            file_name_suffix=file_name_suffix,
            num_shards=num_shards,
            shard_name_template=shard_name_template,
            coder=coder,
            mime_type='text/plain',
            compression_type=compression_type)
        # Per-file buffer holding the most recently received record, so the
        # final record can be written without a trailing comma in close().
        self.last_rows = dict()

    def open(self, temp_path):
        """Open the file and initialize it by opening a JSON list."""
        file_handle = super(_JsonSink, self).open(temp_path)
        file_handle.write(b'[')
        return file_handle

    def write_record(self, file_handle, value):
        """Writes the previously buffered record as JSON, terminated with a
        comma, and buffers the current record."""
        if self.last_rows.get(file_handle, None) is not None:
            file_handle.write(self.coder.encode(
                json.dumps(self.last_rows[file_handle])))
            file_handle.write(b',')
        self.last_rows[file_handle] = value

    def close(self, file_handle):
        """Finalize the JSON list and close the file handle returned from
        ``open()``. Called after all records are written.
        """
        if file_handle is not None:
            # Write the last buffered record without a trailing comma
            if file_handle in self.last_rows:
                file_handle.write(self.coder.encode(
                    json.dumps(self.last_rows[file_handle])))
            # Close the list and then the file
            file_handle.write(b']')
            file_handle.close()
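

# Illustration (comments only, not executed): for records r1, r2, r3 the sink
# above stays one record behind, so the file is built up as
#   open():           [
#   write_record(r1): (r1 buffered)
#   write_record(r2): r1,   (r2 buffered)
#   write_record(r3): r2,   (r3 buffered)
#   close():          r3]
# which yields a valid JSON array with no trailing comma.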


class WriteToJson(PTransform):
    """PTransform for writing to JSON files."""

    def __init__(self,
                 file_path_prefix,
                 file_name_suffix='',
                 num_shards=0,
                 shard_name_template=None,
                 coder=coders.ToStringCoder(),
                 compression_type=CompressionTypes.AUTO):
        self._sink = _JsonSink(file_path_prefix, file_name_suffix, num_shards,
                               shard_name_template, coder, compression_type)

    def expand(self, pcoll):
        return pcoll | Write(self._sink)


class str2JsonDoFn(beam.DoFn):
    """Parse an input text line into JSON record(s)."""

    def process(self, element):
        """Parses one line of text as JSON and emits the resulting record(s).

        Args:
          element: the line of text being processed.

        Returns:
          An iterable of parsed records.
        """
        parsed = json.loads(element)
        # process() must return an iterable of output elements. If the line
        # is a JSON array, emit each object individually; if it is a single
        # object, wrap it so the dict itself (not its keys) is emitted.
        if isinstance(parsed, list):
            return parsed
        return [parsed]


def run(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument("--input",
                        dest="input",
                        default='gs://wh_dev/WMS_SKU.json',
                        # required=True,
                        help='Input file to process.')
    parser.add_argument("--output",
                        dest="output",
                        # CHANGE 1/5: The Google Cloud Storage path is required
                        # for outputting the results.
                        default='gs://wh_dev/output',
                        # required=True,
                        help='Output file to write results to.')
    app_args, pipeline_args = parser.parse_known_args(argv)

    pipeline_args.extend([
        '--job_name=log-partition',
        # '--save_main_session',
        # CHANGE 2/5: (OPTIONAL) Change this to DataflowRunner to
        # run your pipeline on the Google Cloud Dataflow Service.
        '--setup_file=./setup.py',
        # '--requirements_file=./requirements.txt',
        '--runner=DataflowRunner',
        # CHANGE 3/5: Your project ID is required in order to run your
        # pipeline on the Google Cloud Dataflow Service.
        '--project=dsu-dev',
        # CHANGE 4/5: Your Google Cloud Storage path is required for staging
        # local files.
        '--staging_location=gs://wh_backup_dev/tmp',
        # CHANGE 5/5: Your Google Cloud Storage path is required for temporary
        # files.
        '--temp_location=gs://wh_backup_dev/tmp',
    ])
    pipe_options = PipelineOptions(pipeline_args)
    pipe_options.view_as(SetupOptions).save_main_session = True
    print('app_args:', app_args)
    print('pipeline_args:', pipeline_args)
    with beam.Pipeline(options=pipe_options) as p:
        data = p | "LOAD" >> beam.io.ReadFromText(app_args.input)
        data = data | 'to_dict' >> beam.ParDo(str2JsonDoFn())

        for fac in facility_set:
            # Bind the current value of `fac` immediately via a curried
            # lambda; a plain closure over the loop variable would be
            # late-bound and every filter would see the final facility.
            split_data = data | ("FILTER_%s" % fac) >> beam.Filter(
                (lambda y: lambda x: x['FACILITY'] == y)(fac))

            print('write file: %s_%s' % (app_args.output, fac))
            split_data | ("WRITE_%s" % fac) >> WriteToJson(
                '%s_%s' % (app_args.output, fac),
                file_name_suffix='.json',
                shard_name_template='')

        # The `with` block runs the pipeline and waits for completion on
        # exit, so no explicit p.run().wait_until_finish() is needed here.


if __name__ == '__main__':
    run()
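
For readers unfamiliar with the `(lambda y: lambda x: ...)(fac)` pattern used in the filter loop: Python closures capture loop variables by reference, so a plain `lambda x: x['FACILITY'] == fac` built inside the loop would be late-bound and every filter would end up testing against the last facility. A minimal standalone sketch of the difference (illustrative only, not part of the pipeline):

    facilities = ('HK', 'LK', 'SZXN')

    # Late binding: every predicate closes over the same `fac` variable, so
    # after the loop they all compare against 'SZXN'.
    late = [lambda x: x['FACILITY'] == fac for fac in facilities]

    # Binding `fac` immediately (the trick used above): each predicate keeps
    # its own facility value.
    bound = [(lambda y: lambda x: x['FACILITY'] == y)(fac) for fac in facilities]

    print(late[0]({'FACILITY': 'HK'}))   # False -- compares against 'SZXN'
    print(bound[0]({'FACILITY': 'HK'}))  # True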