Skip to content

Instantly share code, notes, and snippets.

@Dminor7
Last active August 21, 2023 07:07
Show Gist options
  • Save Dminor7/8ff3fa034e991ac2839d66131fc004c5 to your computer and use it in GitHub Desktop.
Save Dminor7/8ff3fa034e991ac2839d66131fc004c5 to your computer and use it in GitHub Desktop.
Functions for moving data between DataFrame and Google Cloud Storage/BigQuery using Arrow and GCS libraries.
"""
Check this Stack Overflow question: https://stackoverflow.com/questions/68303327/unnecessary-list-item-nesting-in-bigquery-schemas-from-pyarrow-upload-dataframe
Check this github issue: https://github.com/googleapis/python-bigquery/issues/19
"""
from google.cloud.bigquery._pandas_helpers import *
from google.cloud.bigquery import _helpers
from google.cloud import storage
from google.cloud import bigquery
def upload_to_gcs(df, bq_schema, bucket, key, service_account_json):
    """Serialize *df* to Parquet (with BigQuery-compliant nested types) and upload it to GCS.

    Args:
        df: pandas DataFrame to serialize.
        bq_schema: BigQuery schema (sequence of SchemaField objects or dicts)
            used to convert the DataFrame to an Arrow table.
        bucket: Name of the destination GCS bucket.
        key: Object name (path) of the blob within the bucket.
        service_account_json: Path to a service-account JSON key file.
    """
    # Version-checked import: raises if an unsupported/missing pyarrow is installed.
    # NOTE: the original code re-ran a plain ``import pyarrow`` afterwards, which
    # shadowed this checked module; ``import pyarrow.parquet`` alone binds the
    # local name ``pyarrow`` while keeping the version check meaningful.
    pyarrow = _helpers.PYARROW_VERSIONS.try_import(raise_if_error=True)
    import pyarrow.parquet

    bq_schema = schema._to_schema_fields(bq_schema)
    arrow_table = dataframe_to_arrow(df, bq_schema)

    # use_compliant_nested_type=True avoids the extra "list.item" nesting that
    # BigQuery rejects (see the issue links in the module docstring).
    writer = pyarrow.BufferOutputStream()
    pyarrow.parquet.write_table(arrow_table, writer, use_compliant_nested_type=True)
    body = bytes(writer.getvalue())

    client = storage.Client.from_service_account_json(service_account_json)
    blob = storage.Bucket(client, bucket).blob(key)
    # Fix: the default content type is text/plain; Parquet is binary data.
    blob.upload_from_string(body, content_type="application/octet-stream")
def gcs_to_bq(table_id, bq_schema, uri, service_account_json, write_disposition):
    """Load Parquet data from a GCS URI into a BigQuery table.

    Ensures the destination table exists with *bq_schema*, then runs a Parquet
    load job (with list inference enabled, so compliant nested types map onto
    BigQuery REPEATED fields correctly) and blocks until it completes.

    Args:
        table_id: Fully-qualified destination table id.
        bq_schema: Schema used when creating the table if it does not exist.
        uri: ``gs://`` URI of the Parquet source data.
        service_account_json: Path to a service-account JSON key file.
        write_disposition: BigQuery write disposition for the load job.
    """
    client = bigquery.Client.from_service_account_json(service_account_json)

    # Idempotently create the destination table with the expected schema.
    client.create_table(bigquery.Table(table_id, schema=bq_schema), exists_ok=True)

    parquet_options = bigquery.format_options.ParquetOptions()
    parquet_options.enable_list_inference = True

    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.PARQUET,
        write_disposition=write_disposition,
        parquet_options=parquet_options,
    )

    # Wait for the load job to finish; raises if the job failed.
    client.load_table_from_uri(uri, table_id, job_config=job_config).result()
def gcs_to_bq_external_table(table_id, bq_schema, uri, service_account_info):
    """Create (idempotently) a BigQuery external table backed by Parquet files in GCS.

    Args:
        table_id: Fully-qualified table id for the external table.
        bq_schema: Explicit schema for the table.
        uri: ``gs://`` URI (or glob) of the Parquet source data.
        service_account_info: Parsed service-account credentials dict
            (note: *info*, not a file path, unlike the other helpers).

    Returns:
        The created (or pre-existing) ``bigquery.Table``.
    """
    client = bigquery.Client.from_service_account_info(service_account_info)
    table = bigquery.Table(table_id, schema=bq_schema)

    # list inference keeps compliant Parquet nested types from gaining an
    # extra "list.item" level in the inferred BigQuery schema.
    parquet_options = bigquery.format_options.ParquetOptions()
    parquet_options.enable_list_inference = True

    external_config = bigquery.ExternalConfig("PARQUET")
    external_config.source_uris = [uri]
    external_config.parquet_options = parquet_options
    table.external_data_configuration = external_config

    # Fix: the original assigned the result to a dead local and returned None;
    # return it so callers can inspect the created table.
    return client.create_table(table, exists_ok=True)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment