Skip to content

Instantly share code, notes, and snippets.

@mshakhomirov
Created October 20, 2019 17:57
Show Gist options
  • Save mshakhomirov/ee0870ff4822e551cb64614f318e23ac to your computer and use it in GitHub Desktop.
Save mshakhomirov/ee0870ff4822e551cb64614f318e23ac to your computer and use it in GitHub Desktop.
def _iter_concatenated_json(text):
    """Yield each top-level JSON value found in *text*, in order.

    Values may be directly adjacent (``{...}{...}``) or separated by JSON
    whitespace and/or commas. Decoding value by value (instead of patching
    the text with ``replace('}{', '},{')``) keeps string values that happen
    to contain ``}{`` intact.

    Raises:
        json.JSONDecodeError: if *text* contains something that is not a
            valid JSON value.
    """
    decoder = json.JSONDecoder()
    pos, end = 0, len(text)
    while pos < end:
        # Skip separators between values: JSON whitespace and commas.
        if text[pos] in ' \t\r\n,':
            pos += 1
            continue
        value, pos = decoder.raw_decode(text, pos)
        yield value


def _load_table_from_object_string(bucket_name, file_name, tableSchema, tableName):
    """Load a bucket object made of concatenated JSON records into a table.

    The object is downloaded as text, split into its individual JSON
    records, re-serialised as newline-delimited JSON and appended to
    ``tableName`` in the ``BQ_DATASET`` dataset.

    Args:
        bucket_name: Name of the bucket passed to ``CS.get_bucket``.
        file_name: Name of the object (blob) inside that bucket.
        tableSchema: Schema definition handed to ``create_schema_from_yaml``.
        tableName: Name of the destination table.

    Raises:
        json.JSONDecodeError: if the object content is not valid JSON.
    """
    # NOTE(review): CS and BQ are presumably the module-level storage and
    # BigQuery clients, and job_config a module-level LoadJobConfig that is
    # mutated here -- confirm against the rest of the file.
    raw_text = CS.get_bucket(bucket_name).blob(file_name).download_as_string().decode()

    # One JSON document per line is what NEWLINE_DELIMITED_JSON expects.
    json_lines = [json.dumps(record) for record in _iter_concatenated_json(raw_text)]
    data_str = u"\n".join(json_lines)
    print('data_file :', data_str)

    # Wrap the payload in a file-like object for the load job.
    # NOTE(review): load_table_from_file is documented as taking a file
    # opened in binary mode; StringIO is kept as in the original -- verify.
    data_file = StringIO(data_str)

    table_id = BQ.dataset(BQ_DATASET).table(tableName)
    job_config.schema = create_schema_from_yaml(tableSchema)
    # No trailing commas here: they would turn the assigned values into
    # 1-tuples instead of the enum/string values the job config expects.
    job_config.source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
    job_config.write_disposition = 'WRITE_APPEND'

    load_job = BQ.load_table_from_file(
        data_file,
        table_id,
        job_config=job_config,
    )
    load_job.result()  # Waits for table load to complete.
    print("Job finished.")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment