Skip to content

Instantly share code, notes, and snippets.

@salvadorgascon
Last active May 25, 2024 18:45
Show Gist options
  • Save salvadorgascon/10a3091f6a3f466e355bbcaeed5f773a to your computer and use it in GitHub Desktop.
Load rows into a Google Cloud BigQuery table object using incremental batches
BIGQUERY_PROJECT_NAME = "project-XXXXXX"
from datetime import date
from google.cloud import bigquery
from google.cloud.exceptions import NotFound
from google.api_core.exceptions import Conflict
from shared.loaders.bigquery_constants import BIGQUERY_PROJECT_NAME
from shared.loaders.bigquery_dataset_exists import BigQueryDatasetExists
from shared.loaders.bigquery_create_dataset import BigQueryCreateDataset
from shared.loaders.bigquery_create_table import BigQueryCreateTable
from shared.loaders.bigquery_table_exists import BigQueryTableExists
def BigqueryIncrementalLoader(bigquery_client, bigquery_dataset_name, bigquery_table_name, table_schema, rows):
    """Append *rows* to a BigQuery table, tagging each row with an incremental batch number.

    Creates the dataset and the table first if they do not exist. Every row is
    stamped with two bookkeeping columns before insertion:
    ``_batched_at`` (today's date as ``YYYY-MM-DD``) and ``_batch_sequence``
    (``1 + MAX(_batch_sequence)`` already stored in the table, or ``1`` for an
    empty table).

    Args:
        bigquery_client: An authenticated ``google.cloud.bigquery.Client``.
        bigquery_dataset_name: Target dataset name.
        bigquery_table_name: Target table name.
        table_schema: Schema used only when the table has to be created.
        rows: List of JSON-serializable dicts to insert. Mutated in place
            (the two batch columns are added to each dict).

    Returns:
        True on success or when *rows* is empty; False as soon as a
        streaming-insert request reports errors (remaining chunks are not sent).
    """
    print("Loading data into BigQuery")
    if not BigQueryDatasetExists(bigquery_client, bigquery_dataset_name):
        BigQueryCreateDataset(bigquery_client, bigquery_dataset_name)
    if not BigQueryTableExists(bigquery_client, bigquery_dataset_name, bigquery_table_name):
        BigQueryCreateTable(bigquery_client, bigquery_dataset_name, bigquery_table_name, table_schema)

    # Nothing to insert — the table/dataset were still ensured above.
    if not rows:
        return True

    table_id = f'{BIGQUERY_PROJECT_NAME}.{bigquery_dataset_name}.{bigquery_table_name}'
    print(f'Counting batches in {table_id}')
    # Backticks are required around the table reference: project ids routinely
    # contain hyphens (e.g. "project-XXXXXX"), which are invalid in an
    # unquoted identifier in BigQuery standard SQL.
    query_count_batch = f'SELECT IFNULL(MAX(_batch_sequence)+1, 1) as batch_sequence FROM `{table_id}`'
    query_count_batch_job = bigquery_client.query(query_count_batch)
    dataframe_batch_num = query_count_batch_job.result().to_dataframe()
    batch_num = int(dataframe_batch_num.iloc[0]['batch_sequence'])
    batch_date = date.today().strftime("%Y-%m-%d")
    print("Current batch: ", batch_num, batch_date)

    # Stamp every row with the batch metadata before any insert happens, so
    # all chunks of this call share one sequence number and date.
    for row in rows:
        row["_batched_at"] = batch_date
        row["_batch_sequence"] = batch_num

    # insert_rows_json has a per-request payload limit, so always insert in
    # chunks; a single chunk covers the small-payload case, removing the
    # duplicated batched/non-batched branches of the original.
    MAX_ROWS_REQUEST = 500
    for i in range(0, len(rows), MAX_ROWS_REQUEST):
        subset = rows[i:i + MAX_ROWS_REQUEST]
        print(f'Appending records {i}:{i + MAX_ROWS_REQUEST} a {table_id}')
        errors_inserting_rows = bigquery_client.insert_rows_json(table_id, subset)
        if errors_inserting_rows:
            print("Found errors while appending records: {}".format(errors_inserting_rows))
            return False
        print("Records added.")
    return True
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment