Load rows into a Google Cloud BigQuery table using incremental batches
# shared/loaders/bigquery_constants.py
BIGQUERY_PROJECT_NAME = "project-XXXXXX"
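The existence-check and creation helpers imported by the loader below are not included in the gist. Here is a minimal sketch of what they might look like, assuming they are thin wrappers over the standard google.cloud.bigquery client API; the helper names and signatures come from the imports, but the bodies are an assumption (the gist's own imports of NotFound and Conflict hint at this try/except pattern):

# Hypothetical implementations of the helper modules imported by the loader.
# Only the names and call signatures are taken from the gist; the bodies are
# an assumed thin wrapper over the google.cloud.bigquery client API.
from google.cloud import bigquery
from google.cloud.exceptions import NotFound

from shared.loaders.bigquery_constants import BIGQUERY_PROJECT_NAME


def BigQueryDatasetExists(bigquery_client, bigquery_dataset_name):
    # get_dataset raises NotFound when the dataset does not exist.
    try:
        bigquery_client.get_dataset(f'{BIGQUERY_PROJECT_NAME}.{bigquery_dataset_name}')
        return True
    except NotFound:
        return False


def BigQueryCreateDataset(bigquery_client, bigquery_dataset_name):
    dataset = bigquery.Dataset(f'{BIGQUERY_PROJECT_NAME}.{bigquery_dataset_name}')
    bigquery_client.create_dataset(dataset)


def BigQueryTableExists(bigquery_client, bigquery_dataset_name, bigquery_table_name):
    # get_table raises NotFound when the table does not exist.
    try:
        bigquery_client.get_table(f'{BIGQUERY_PROJECT_NAME}.{bigquery_dataset_name}.{bigquery_table_name}')
        return True
    except NotFound:
        return False


def BigQueryCreateTable(bigquery_client, bigquery_dataset_name, bigquery_table_name, table_schema):
    table = bigquery.Table(f'{BIGQUERY_PROJECT_NAME}.{bigquery_dataset_name}.{bigquery_table_name}',
                           schema=table_schema)
    bigquery_client.create_table(table)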
from datetime import date

from google.cloud import bigquery

from shared.loaders.bigquery_constants import BIGQUERY_PROJECT_NAME
from shared.loaders.bigquery_dataset_exists import BigQueryDatasetExists
from shared.loaders.bigquery_create_dataset import BigQueryCreateDataset
from shared.loaders.bigquery_create_table import BigQueryCreateTable
from shared.loaders.bigquery_table_exists import BigQueryTableExists


def BigqueryIncrementalLoader(bigquery_client, bigquery_dataset_name, bigquery_table_name, table_schema, rows):
    """Append rows to a BigQuery table in incremental batches.

    Creates the dataset and table if they do not exist, stamps every row
    with a batch date and a monotonically increasing batch sequence, and
    streams the rows in chunks of at most MAX_ROWS_REQUEST rows.
    Returns True on success, False if any insert reported errors.
    """
    print("Loading data into BigQuery")

    if not BigQueryDatasetExists(bigquery_client, bigquery_dataset_name):
        BigQueryCreateDataset(bigquery_client, bigquery_dataset_name)
    if not BigQueryTableExists(bigquery_client, bigquery_dataset_name, bigquery_table_name):
        BigQueryCreateTable(bigquery_client, bigquery_dataset_name, bigquery_table_name, table_schema)

    if len(rows) == 0:
        return True

    table_id = f'{BIGQUERY_PROJECT_NAME}.{bigquery_dataset_name}.{bigquery_table_name}'

    # Compute the next batch sequence: one past the highest sequence already
    # loaded, or 1 for an empty table.
    print(f'Counting batches in {table_id}')
    query_count_batch = f'SELECT IFNULL(MAX(_batch_sequence) + 1, 1) AS batch_sequence FROM {table_id}'
    query_count_batch_job = bigquery_client.query(query_count_batch)
    dataframe_batch_num = query_count_batch_job.result().to_dataframe()
    batch_num = dataframe_batch_num.iloc[0]['batch_sequence']
    batch_date = date.today().strftime("%Y-%m-%d")
    print("Current batch:", batch_num, batch_date)

    # Stamp every row with the batch metadata.
    for row in rows:
        row["_batched_at"] = batch_date
        row["_batch_sequence"] = int(batch_num)

    # Stream the rows in chunks to stay under the insert-request size limit.
    MAX_ROWS_REQUEST = 500
    for i in range(0, len(rows), MAX_ROWS_REQUEST):
        subset = rows[i:i + MAX_ROWS_REQUEST]
        print(f'Appending records {i}:{i + len(subset)} to {table_id}')
        errors_inserting_rows = bigquery_client.insert_rows_json(table_id, subset)
        if errors_inserting_rows:
            print("Found errors while appending records: {}".format(errors_inserting_rows))
            return False
        print("Records added.")
    return True