Demo of how to upload data to BigQuery with Python 3. See https://notes.webutvikling.org/starting-bigquery/
# 1. Install dependencies ("pip install -r requirements.txt")
# 2. Set the service account json in code,
#    or with the env var "SA_CREDENTIALS"
# 3. Run this file to import test data:
#    "python3 bigquery_import.py"
#
# (See the usage sketch after this file for one way to set the env var.)
import os
import json

from google.cloud import bigquery
from google.oauth2.service_account import Credentials
import pandas

TABLE_NAME = 'nameexample'
DATASET_NAME = 'example'
def get_service_account():
    service_account_json = os.environ.get('SA_CREDENTIALS')
    # For local testing, you can uncomment the lines below and paste in the
    # service account json file downloaded from Google Cloud:
    #
    # service_account_json = json.dumps({
    #     "type": "service_account",
    #     "project_id": "myexampleproject",
    #     "private_key_id": "975c1e87f065c76c9915887acc1afedeb59600b0",
    #     "private_key": "-----BEGIN PRIVATE KEY-----\ndummyexample/12345==\n-----END PRIVATE KEY-----\n",
    #     "client_email": "firebase-adminsdk-r1to@myexampleproject.iam.gserviceaccount.com",
    #     "client_id": "123457110251480388426",
    #     "auth_uri": "https://accounts.google.com/o/oauth2/auth",
    #     "token_uri": "https://oauth2.googleapis.com/token",
    #     "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
    #     "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/firebase-adminsdk-r7tvo%40myexampleproject.iam.gserviceaccount.com"
    # })
    if not service_account_json:
        raise Exception('Missing env var SA_CREDENTIALS')
    try:
        service_account = json.loads(service_account_json)
    except Exception:
        print('Could not load SA_CREDENTIALS as json!')
        raise
    # Env vars often store the private key with escaped newlines;
    # convert them back to real newlines before validating.
    pk = service_account.get('private_key')
    service_account['private_key'] = pk.replace('\\n', '\n')
    try:
        # This validates the credentials data
        Credentials.from_service_account_info(service_account)
        return service_account
    except Exception:
        print('JSON credentials are wrong or malformatted!')
        raise
def get_project_id():
    sa = get_service_account()
    return sa['project_id']


def get_client():
    service_account = get_service_account()
    google_credentials = Credentials.from_service_account_info(service_account)
    client = bigquery.Client(credentials=google_credentials)
    return client
def _create_dataset(*, project_id: str, dataset_name: str):
    client = get_client()
    dataset_id = f'{project_id}.{dataset_name}'
    dataset = bigquery.Dataset(dataset_id)
    dataset.location = "europe-north1"
    dataset = client.create_dataset(dataset, timeout=30)
    print(f'Created dataset {dataset.project}.{dataset.dataset_id}')
    return dataset


def _create_table(
    *,
    project_id: str,
    dataset_name: str,
    table_name: str,
    schema,
):
    client = get_client()
    table = bigquery.Table(
        f"{project_id}.{dataset_name}.{table_name}",
        schema=schema,
    )
    table = client.create_table(table)
    print(f'Created table {table.project}.{table.dataset_id}.{table.table_id}')
    return table
def _delete_dataset(*, project_id: str, dataset_name: str):
    client = get_client()
    dataset_id = f'{project_id}.{dataset_name}'
    dataset = bigquery.Dataset(dataset_id)
    dataset.location = "europe-north1"
    client.delete_dataset(dataset, timeout=30)
    print(f'Deleted dataset {dataset_id}')
    return dataset


def _delete_table(
    *,
    project_id: str,
    dataset_name: str,
    table_name: str,
):
    client = get_client()
    table_id = f"{project_id}.{dataset_name}.{table_name}"
    table = client.delete_table(table_id)
    print(f'Deleted table {table_id}')
    return table
def do_initial_setup():
    project_id = get_project_id()
    _create_dataset(
        project_id=project_id,
        dataset_name=DATASET_NAME,
    )
    _create_table(
        project_id=project_id,
        dataset_name=DATASET_NAME,
        table_name=TABLE_NAME,
        schema=bq_schema,
    )
def send_to_bq(report_records):
    client = get_client()
    project_id = get_project_id()
    dataframe = pandas.DataFrame(
        report_records,
        # In the loaded table, the column order reflects the order of the
        # columns in the DataFrame.
        columns=bq_columns,
    )
    job_config = bigquery.LoadJobConfig(
        # Specify a (partial) schema. All columns are always written to the
        # table. The schema is used to assist in data type definitions.
        schema=bq_schema,
        # Optionally, set the write disposition. BigQuery appends loaded rows
        # to an existing table by default, but with the WRITE_TRUNCATE write
        # disposition it replaces the table with the loaded data.
        write_disposition="WRITE_TRUNCATE",
    )
    job = client.load_table_from_dataframe(
        dataframe,
        project=project_id,
        destination=f'{DATASET_NAME}.{TABLE_NAME}',
        job_config=job_config,
    )
    job.result()  # Wait for the job to complete.
    print(f"Uploaded today's records to {DATASET_NAME}.{TABLE_NAME}")
bq_types = {
    'first_name': bigquery.enums.SqlTypeNames.STRING,
    'last_name': bigquery.enums.SqlTypeNames.STRING,
}
bq_columns = list(bq_types.keys())
bq_schema = [
    bigquery.SchemaField(name, sql_type)
    for name, sql_type in bq_types.items()
]
if __name__ == "__main__":
print(f'Creating {DATASET_NAME}.{TABLE_NAME}')
try:
get_service_account()
except Exception:
print(f'Could not get service account. Is service account json set?')
raise
try:
do_initial_setup()
except Exception:
print('Could not add table. Assuming it has already been created. Continuing.')
data = [
{
'first_name': 'Tomas',
'last_name': 'Albertsen'
}
]
send_to_bq(data)
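
For step 2 above, one way to get the service account json into the SA_CREDENTIALS env var is a small wrapper that reads the downloaded key file and then runs the import script. This is a minimal sketch, not part of the gist; the file names "service-account.json" and "run_import.py" are assumptions.

# run_import.py -- hypothetical wrapper, not part of the gist
import os
import subprocess

# Read the key file downloaded from Google Cloud (assumed filename)
with open('service-account.json') as key_file:
    os.environ['SA_CREDENTIALS'] = key_file.read()

# The child process inherits os.environ, so bigquery_import.py can
# read SA_CREDENTIALS as usual.
subprocess.run(['python3', 'bigquery_import.py'], check=True)
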
# Install these dependencies with
# "pip install -r requirements.txt"
pandas==1.3.2
requests==2.26.0
google-auth==2.0.1
google-cloud-bigquery==2.28.1
pyarrow==5.0.0
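
To check that the upload worked, the same client can query the new table back. A minimal sketch, assuming the script above is saved as bigquery_import.py and has already run successfully:

# verify_upload.py -- hypothetical check, not part of the gist
from bigquery_import import get_client, DATASET_NAME, TABLE_NAME

client = get_client()
query = f'SELECT first_name, last_name FROM `{DATASET_NAME}.{TABLE_NAME}`'
for row in client.query(query).result():
    print(row.first_name, row.last_name)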