Last active
August 6, 2023 02:50
-
-
Save rajathithan/2e38c96b10b2f43687965c9bfb3434ef to your computer and use it in GitHub Desktop.
Dataflow Job to create custom dataflow template from upload of xml data from GCS to BQ
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Dataflow Job to create custom dataflow template for upload of xml data from GCS to BQ | |
# Author: Rajathithan Rajasekar | |
# Version : 1.0 | |
# Date: 08/05/2023 | |
import sys | |
import json | |
import logging | |
import argparse | |
import xmltodict | |
import apache_beam as beam | |
from google.cloud import storage | |
from tableschema import table_schema | |
from apache_beam.options.pipeline_options import SetupOptions | |
from apache_beam.options.pipeline_options import PipelineOptions | |
# setup python logging | |
logging.basicConfig(format='[%(levelname)-8s] [%(asctime)s] [%(module)-35s][%(lineno)04d] : %(message)s', level=logging.INFO) | |
logger = logging | |
def convertXMLtoJSON(gcsfile): | |
print("Inside the convertXMLtoJSON function with "+ gcsfile ) | |
storage_client = storage.Client() | |
bucket = storage_client.get_bucket('xmltestpoc') | |
# get bucket data as blob | |
blob = bucket.get_blob(gcsfile) | |
with blob.open("r") as f: | |
data_dict = xmltodict.parse(f.read()) | |
json_data = json.dumps(data_dict) | |
json_without_slash = json.loads(json_data) | |
logger.info(json_without_slash) | |
return json_without_slash | |
def run(): | |
parser = argparse.ArgumentParser(description="Uploading XML data to BigQuery from GCS") | |
parser.add_argument('--project', | |
dest='project', | |
required=False, | |
default='mercurial-smile-386805', | |
help='Project name') | |
parser.add_argument('--input', | |
type=str, | |
default='xml/test1xml.xml', | |
help='Input GCS Folder Path') | |
parser.add_argument('--output', | |
type=str, | |
required=False, | |
default='xmldata.table0001', | |
help='BQ-Dataset.BQ-table') | |
parser.add_argument('--region', | |
dest='region', | |
required=False, | |
default='us-central1', | |
help='Region') | |
parser.add_argument('--staging_location', | |
dest='staging_location', | |
required=False, | |
default='gs://xmltestpoc/staging', | |
help='staging gcs location') | |
parser.add_argument('--temp_location', | |
dest='temp_location', | |
required=False, | |
default='gs://xmltestpoc/temp', | |
help='temp gcs location') | |
parser.add_argument('--jobname', | |
dest='job_name', | |
required=False, | |
default='gcstobqjob', | |
help='jobName') | |
parser.add_argument('--runner', | |
dest='runner', | |
required=False, | |
default='DataflowRunner', | |
help='Runner Name') | |
parser.add_argument('--subnetwork', | |
dest='subnetwork', | |
required=False, | |
default='https://www.googleapis.com/compute/v1/projects/mercurial-smile-386805/regions/us-central1/subnetworks/dataflow-network', | |
help='Subnetwork Name') | |
parser.add_argument('--experiments', | |
default="use_runner_v2", | |
required=False, | |
help='For Batch python pipeline') | |
parser.add_argument('--sdk_container_image', | |
default="us-central1-docker.pkg.dev/mercurial-smile-386805/docker-worker/dataflowworker:v1", | |
required=False, | |
help='Dataflow worker container image') | |
parser.add_argument('--sdk_location', | |
default="container", | |
required=False, | |
help='Dataflow worker container image') | |
parser.add_argument('--template_location', | |
default="gs://xmltestpoc/template/gcstobq.json", | |
required=False, | |
help='Dataflow worker container image') | |
known_args, _ = parser.parse_known_args(sys.argv) | |
pipeline_options = { | |
'input' : known_args.input, | |
'output': known_args.output, | |
'project': known_args.project, | |
'staging_location': known_args.staging_location, | |
'runner': known_args.runner, | |
'region': known_args.region, | |
'temp_location': known_args.temp_location, | |
'job_name': known_args.job_name, | |
'subnetwork': known_args.subnetwork, | |
'experiments':known_args.experiments, | |
'sdk_container_image': known_args.sdk_container_image, | |
'sdk_location': known_args.sdk_location, | |
'template_location': known_args.template_location | |
} | |
options = PipelineOptions.from_dictionary(pipeline_options) | |
options.view_as(SetupOptions).save_main_session = True | |
with beam.Pipeline(options=options) as p: | |
# Convert XML to JSON and Push it to BQ | |
logger.info("Converting XML data to JSON and Pushing it to BQ") | |
uploadtoBQ = (p | |
| 'getfilename' >> beam.Create([known_args.input]) | |
| 'parse' >> beam.Map(convertXMLtoJSON) | |
| 'tobq' >> beam.io.WriteToBigQuery(known_args.output, | |
schema=table_schema, | |
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND, | |
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED) | |
) | |
if __name__ == '__main__': | |
run() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment