@frankShih
Last active May 25, 2022 13:18
Using GCP Dataflow (Apache Beam Python SDK) to read a text file as JSON and split it into per-facility files.
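Pipeline overview: ReadFromText loads the input file line by line, str2JsonDoFn parses each line into dicts, and for every entry in facility_set a Filter on the 'FACILITY' field feeds a WriteToJson sink, so each facility ends up with its own JSON array file.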
#!/usr/bin/env python3
from __future__ import absolute_import
import argparse
import json
# from google.cloud import storage
from datetime import datetime
import configparser
import apache_beam as beam
from apache_beam.coders import coders
from apache_beam.io import ReadFromText, filebasedsink, WriteToText
from apache_beam.io.iobase import Write
from apache_beam.io.filesystem import CompressionTypes
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.transforms import PTransform
# Facilities to split the output by; further sites are kept commented out.
facility_set = ('HK', 'LK', 'SZXN')  # , 'SZ_FB', 'SZ_FT'
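# The input format is an assumption (not stated in the gist): a JSON file whose
# top-level value is an array of records, each carrying a 'FACILITY' field, e.g.:
#   [{"FACILITY": "HK", "SKU": "A001"}, {"FACILITY": "LK", "SKU": "A002"}]
# Because beam.io.ReadFromText yields one element per line, str2JsonDoFn below
# only works if each line is itself a JSON array (e.g. the whole array sits on
# a single line).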
class _JsonSink(filebasedsink.FileBasedSink):
    """A Dataflow sink for writing JSON files."""

    def __init__(self,
                 file_path_prefix,
                 file_name_suffix='',
                 num_shards=0,
                 shard_name_template=None,
                 coder=coders.ToStringCoder(),
                 compression_type=CompressionTypes.AUTO):
        super(_JsonSink, self).__init__(
            file_path_prefix,
            file_name_suffix=file_name_suffix,
            num_shards=num_shards,
            shard_name_template=shard_name_template,
            coder=coder,
            mime_type='text/plain',
            compression_type=compression_type)
        # Buffer of the most recently seen record per file handle, so the final
        # record can be written without a trailing comma in close().
        self.last_rows = dict()

    def open(self, temp_path):
        """Open the file and initialize it by opening a JSON list."""
        file_handle = super(_JsonSink, self).open(temp_path)
        file_handle.write(b'[')
        return file_handle

    def write_record(self, file_handle, value):
        """Writes the previously buffered record as JSON followed by a comma,
        then buffers the current record."""
        if self.last_rows.get(file_handle, None) is not None:
            file_handle.write(self.coder.encode(
                json.dumps(self.last_rows[file_handle])))
            file_handle.write(b',')
        self.last_rows[file_handle] = value

    def close(self, file_handle):
        """Finalize the JSON list and close the file handle returned from
        ``open()``. Called after all records are written.
        """
        if file_handle is not None:
            # Write the last buffered row without a trailing comma
            if file_handle in self.last_rows:
                file_handle.write(self.coder.encode(
                    json.dumps(self.last_rows[file_handle])))
            # Close the list and then the file
            file_handle.write(b']')
            file_handle.close()

class WriteToJson(PTransform):
    """PTransform for writing to JSON files."""

    def __init__(self,
                 file_path_prefix,
                 file_name_suffix='',
                 num_shards=0,
                 shard_name_template=None,
                 coder=coders.ToStringCoder(),
                 compression_type=CompressionTypes.AUTO):
        self._sink = _JsonSink(file_path_prefix, file_name_suffix, num_shards,
                               shard_name_template, coder, compression_type)

    def expand(self, pcoll):
        return pcoll | Write(self._sink)

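# Minimal usage sketch (hypothetical bucket path and label), assuming `records`
# is a PCollection of JSON-serializable dicts:
#   records | 'WRITE_ALL' >> WriteToJson('gs://my-bucket/records',
#                                        file_name_suffix='.json',
#                                        shard_name_template='')
# Passing shard_name_template='' (as run() does below) drops the shard-number
# infix, so the output is written as 'gs://my-bucket/records.json'.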
class str2JsonDoFn(beam.DoFn):
    """Parse input text into dicts."""

    def process(self, element):
        """Parses one line of input text as JSON.

        Args:
          element: the line of text being processed.

        Returns:
          The parsed records, emitted as individual elements.
        """
        # print(type(element), element)
        return json.loads(element)

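# Note: DoFn.process must return an iterable of output elements. Returning the
# parsed value therefore relies on the input assumption stated above: each line
# is a JSON array, so Beam emits one dict per array entry. If a line were a
# single JSON object instead, Beam would iterate over the dict's keys; in that
# case `yield json.loads(element)` would be the fix.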
def run(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument("--input",
                        dest="input",
                        default='gs://wh_dev/WMS_SKU.json',
                        # required=True,
                        help='Input file to process.')
    parser.add_argument("--output",
                        dest="output",
                        # CHANGE 1/5: The Google Cloud Storage path is required
                        # for outputting the results.
                        default='gs://wh_dev/output',
                        # required=True,
                        help='Output file to write results to.')
    app_args, pipeline_args = parser.parse_known_args(argv)

    pipeline_args.extend([
        '--job_name=log-partition',
        # '--save_main_session',
        '--setup_file=./setup.py',
        # '--requirements_file=./requirements.txt',
        # CHANGE 2/5: (OPTIONAL) Change this to DataflowRunner to
        # run your pipeline on the Google Cloud Dataflow Service.
        '--runner=DataflowRunner',
        # CHANGE 3/5: Your project ID is required in order to run your pipeline on
        # the Google Cloud Dataflow Service.
        '--project=dsu-dev',
        # CHANGE 4/5: Your Google Cloud Storage path is required for staging local
        # files.
        '--staging_location=gs://wh_backup_dev/tmp',
        # CHANGE 5/5: Your Google Cloud Storage path is required for temporary
        # files.
        '--temp_location=gs://wh_backup_dev/tmp',
    ])
    pipe_options = PipelineOptions(pipeline_args)
    pipe_options.view_as(SetupOptions).save_main_session = True
    print('app_args: ', app_args)
    print('pipeline_args: ', pipeline_args)

    with beam.Pipeline(options=pipe_options) as p:
        data = p | "LOAD" >> beam.io.ReadFromText(app_args.input)
        data = data | 'to_dict' >> beam.ParDo(str2JsonDoFn())
        for fac in facility_set:
            # Bind `fac` through a curried lambda so each Filter captures the
            # current value rather than the loop variable's final value
            # (avoids late binding of the closure).
            filtered = data | ("FILTER_%s" % fac) >> beam.Filter(
                (lambda y: lambda x: x['FACILITY'] == y)(fac))
            print('write file: %s_%s ' % (app_args.output, fac))
            filtered | ("WRITE_%s" % fac) >> WriteToJson(
                '%s_%s' % (app_args.output, fac),
                file_name_suffix='.json',
                shard_name_template='')
    # The `with` block runs the pipeline and waits for completion on exit, so
    # no explicit p.run().wait_until_finish() is needed (calling it here would
    # submit the pipeline twice).


if __name__ == '__main__':
    run()
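# Example invocation (a sketch; the script filename is an assumption, and a
# setup.py next to the script is required because of '--setup_file=./setup.py'):
#   python split_by_facility.py \
#       --input gs://wh_dev/WMS_SKU.json \
#       --output gs://wh_dev/output
# This writes gs://wh_dev/output_HK.json, output_LK.json and output_SZXN.json,
# each containing one JSON array with that facility's records.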