# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
def run(pipeline_args, known_args):
    """
    Invoked by the Beam runner
    """
    import apache_beam as beam
    from apache_beam.io.gcp.internal.clients import bigquery as beam_bigquery
    from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json
    from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions
    from geobeam.io import ShapefileSource
    from geobeam.fn import format_record, make_valid, filter_invalid
    from geobeam.util import get_bigquery_schema_dataflow

    pipeline_options = PipelineOptions([
        '--experiments', 'use_beam_bq_sink',
    ] + pipeline_args)

    # Derive the BigQuery schema from the shapefile itself
    table_schema = parse_table_schema_from_json(
        get_bigquery_schema_dataflow(known_args.gcs_url))

    with beam.Pipeline(options=pipeline_options) as p:
        (p
         | beam.io.Read(ShapefileSource(known_args.gcs_url,
             layer_name=known_args.layer_name))
         | 'MakeValid' >> beam.Map(make_valid)
         | 'FilterInvalid' >> beam.Filter(filter_invalid)
         | 'FormatRecords' >> beam.Map(format_record)
         | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
             beam_bigquery.TableReference(
                 datasetId=known_args.dataset,
                 tableId=known_args.table),
             schema=table_schema,  # Pass the schema generated above
             method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
             write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
             create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED))  # Create the table if it does not exist
if __name__ == '__main__':
    import logging
    import argparse

    logging.getLogger().setLevel(logging.INFO)

    parser = argparse.ArgumentParser()
    parser.add_argument('--gcs_url')
    parser.add_argument('--dataset')
    parser.add_argument('--table')
    parser.add_argument('--layer_name')
    parser.add_argument('--in_epsg', type=int, default=None)  # Accepted but not used by this pipeline
    known_args, pipeline_args = parser.parse_known_args()

    run(pipeline_args, known_args)
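
A minimal sketch of how this script might be launched, assuming it is saved as shapefile_pipeline.py; the project, bucket, dataset, and layer values below are placeholders, not values from this gist. The unrecognized flags fall through to pipeline_args as standard Beam/Dataflow pipeline options, while the flags defined above land in known_args (any extra geobeam worker setup, such as a container image with GDAL, is not shown here):

    python shapefile_pipeline.py \
      --runner DataflowRunner \
      --project your-project \
      --region us-central1 \
      --temp_location gs://your-bucket/tmp \
      --gcs_url gs://your-bucket/parcels.zip \
      --dataset your_dataset \
      --table parcels \
      --layer_name parcels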