$ bq query 'select first_name, last_name, dob from staging.table_1'
Waiting on bqjob_r2c64623d43d6b68d_0000016de49a1248_1 ... (0s) Current status: DONE
+------------+-----------+------------+
| first_name | last_name |    dob     |
+------------+-----------+------------+
| John       | Doe       | 1968-01-22 |
| Peter      | Doe       | 1968-01-22 |
+------------+-----------+------------+
$ gcloud config set project your-project
$ gsutil mb -c regional -l europe-west2 gs://project_staging_files
$ gsutil cp ./test_files/* gs://project_staging_files/
- name: table_1
  size: large
  format: NEWLINE_DELIMITED_JSON
  columns: []
  schema:
    - name: "id"
      type: "INT64"
      mode: "NULLABLE"
    - name: "first_name"
      type: "STRING"
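The `schema` entries in this config are presumably what a helper like `create_schema_from_yaml` (used later in the loaders) turns into `bigquery.SchemaField` objects. A minimal sketch of that mapping, using plain tuples as a stand-in for `bigquery.SchemaField` so it runs without the client library; defaulting a missing `mode` to `NULLABLE` is an assumption:

```python
# Schema entries as yaml.safe_load would produce them from the config above.
table_schema = [
    {"name": "id", "type": "INT64", "mode": "NULLABLE"},
    {"name": "first_name", "type": "STRING"},
]

def create_schema_from_yaml(table_schema):
    """Map parsed YAML schema entries to (name, type, mode) tuples.

    In the real loader each tuple would instead be a
    bigquery.SchemaField(name, field_type, mode=mode).
    """
    return [
        (field["name"], field["type"], field.get("mode", "NULLABLE"))
        for field in table_schema
    ]

schema = create_schema_from_yaml(table_schema)
print(schema)  # [('id', 'INT64', 'NULLABLE'), ('first_name', 'STRING', 'NULLABLE')]
```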
from event import data
print(data['bucket'])
print(data['name'])
print(data['timeCreated'])
from main import streaming
streaming(data, 'context')
data = {"name": "table-1_data_new_line_delimited_json.json",
        "bucket": "project_staging_files",
        "timeCreated": "2019-09-24 15:54:54"
        }
$ bq query 'select first_name, last_name, dob from staging.table_1'
Waiting on bqjob_r5f1305f93091f0a5_0000016de8e9171c_1 ... (0s) Current status: DONE
+------------+-----------+------------+
| first_name | last_name |    dob     |
+------------+-----------+------------+
| John       | Doe       | 1968-01-22 |
| Peter      | Doe       | 1968-01-22 |
| John       | Doe       | 1968-01-22 |
| Peter      | Doe       | 1968-01-22 |
+------------+-----------+------------+
def _load_table_from_json(bucket_name, file_name, tableSchema, tableName):
    blob = CS.get_bucket(bucket_name).blob(file_name)
    #! source data file format must be outer array JSON:
    body = json.loads(blob.download_as_string())
    table_id = BQ.dataset(BQ_DATASET).table(tableName)
    job_config = bigquery.LoadJobConfig()
    job_config.source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
    job_config.write_disposition = 'WRITE_APPEND'
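`NEWLINE_DELIMITED_JSON` expects one JSON object per line, so the outer-array `body` parsed above has to be re-serialized at some point before loading (presumably by the client's load helper, which is not shown here). A minimal stdlib sketch of that conversion; the sample records are illustrative only:

```python
import json

# body as json.loads would return it for an outer-array source file.
body = [
    {"id": 1, "first_name": "John"},
    {"id": 2, "first_name": "Peter"},
]

# One JSON object per line, as BigQuery's NEWLINE_DELIMITED_JSON format expects.
ndjson = "\n".join(json.dumps(record) for record in body)
print(ndjson)
```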
# data = {"name": "table-1_data_new_line_delimited_json.json",
data = {"name": "table-2_data_json_outer_array.json",
# data = {"name": "table-3_data_new_line_delimited_src.json",
# data = {"name": "table-4_data_object_string.json",
# data = {"name": "table-5_data_new_line_delimited_json.json",
# data = {"name": "table-6_data_new_line_delimited_json.json",
        "bucket": "project_staging_files",
        "timeCreated": "2019-09-24 15:54:54"
        }
def _load_table_as_src(bucket_name, file_name, tableSchema, tableName):
    # ! source file must be outer array JSON
    # ! this will work for CSV where a row is a JSON string --> SRC column (Snowflake-like)
    blob = CS.get_bucket(bucket_name).blob(file_name)
    body = json.loads(blob.download_as_string())
    table_id = BQ.dataset(BQ_DATASET).table(tableName)
    schema = create_schema_from_yaml(tableSchema)
    job_config = bigquery.LoadJobConfig()
    job_config.schema = schema
def _load_table_from_object_string(bucket_name, file_name, tableSchema, tableName):
    # ! we will convert body to a new line delimited JSON
    blob = CS.get_bucket(bucket_name).blob(file_name)
    blob = blob.download_as_string().decode()
    # Transform object string data into JSON outer array string:
    blob = json.dumps('[' + blob.replace('}{', '},{') + ']')
    # Load as JSON:
    body = json.loads(blob)
    # Create an array of string elements from JSON:
    jsonReady = [json.dumps(record) for record in json.loads(body)]
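The `'}{' -> '},{'` rewrite above can be exercised end to end with the stdlib alone. Note that the snippet's `json.dumps`/`json.loads` pair wraps the bracketed text in an outer JSON string, so the first `loads` only unwraps it and a second `loads` is needed to get the array; the sketch below performs the same transform without that extra round trip. The sample input is illustrative, and the replace assumes the concatenated objects abut with no whitespace between them:

```python
import json

# A file of concatenated JSON objects with no delimiter between them.
object_string = '{"id": 1, "first_name": "John"}{"id": 2, "first_name": "Peter"}'

# Insert commas between adjacent objects and wrap in brackets
# to obtain a valid JSON array, then parse it once.
records = json.loads('[' + object_string.replace('}{', '},{') + ']')

# One JSON string per record, ready to be joined into newline-delimited JSON.
json_ready = [json.dumps(record) for record in records]
print(json_ready)
```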