$ bq query 'select first_name, last_name, dob from staging.table_1'
Waiting on bqjob_r2c64623d43d6b68d_0000016de49a1248_1 ... (0s) Current status: DONE
+------------+-----------+------------+
| first_name | last_name |    dob     |
+------------+-----------+------------+
| John       | Doe       | 1968-01-22 |
| Peter      | Doe       | 1968-01-22 |
+------------+-----------+------------+
$ gcloud config set project your-project
$ gsutil mb -c regional -l europe-west2 gs://project_staging_files
$ gsutil cp ./test_files/* gs://project_staging_files/
- name: table_1
  size: large
  format: NEWLINE_DELIMITED_JSON
  columns: []
  schema:
    - name: "id"
      type: "INT64"
      mode: "NULLABLE"
    - name: "first_name"
      type: "STRING"
# Local test: import the mocked event payload and inspect it
from event import data
print(data['bucket'])
print(data['name'])
print(data['timeCreated'])

# Invoke the Cloud Function entry point with the mocked event
from main import streaming
streaming(data, 'context')
data = {"name": "table-1_data_new_line_delimited_json.json", \
"bucket":"project_staging_files", \
"timeCreated": "2019-09-24 15:54:54"\
}\
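
For context, a minimal sketch of what the streaming entry point in main.py might look like. The routing convention (deriving the table name from the file name prefix), the schemas.yaml file name and the BQ_DATASET value are assumptions for illustration, not the gist's verbatim code; it hands off to the _load_table_from_json loader shown further down:

import re
import yaml
from google.cloud import bigquery, storage

BQ = bigquery.Client()
CS = storage.Client()
BQ_DATASET = 'staging'  # assumed dataset name

def streaming(data, context):
    """Background Cloud Function triggered when a file lands in the bucket."""
    bucket_name = data['bucket']
    file_name = data['name']
    # Assumed naming convention: files start with the table name, e.g. "table-1_...".
    table_name = re.match(r'(table-\d+)', file_name).group(1).replace('-', '_')
    # Look up the table schema in the YAML config shown above:
    with open('schemas.yaml') as f:
        config = yaml.safe_load(f)
    table_schema = next(t['schema'] for t in config if t['name'] == table_name)
    _load_table_from_json(bucket_name, file_name, table_schema, table_name)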
$ bq query 'select first_name, last_name, dob from staging.table_1'
Waiting on bqjob_r5f1305f93091f0a5_0000016de8e9171c_1 ... (0s) Current status: DONE
+------------+-----------+------------+
| first_name | last_name |    dob     |
+------------+-----------+------------+
| John       | Doe       | 1968-01-22 |
| Peter      | Doe       | 1968-01-22 |
| John       | Doe       | 1968-01-22 |
| Peter      | Doe       | 1968-01-22 |
+------------+-----------+------------+
def _load_table_from_json(bucket_name, file_name, tableSchema, tableName):
    blob = CS.get_bucket(bucket_name).blob(file_name)
    #! source data file format must be outer array JSON:
    body = json.loads(blob.download_as_string())
    table_id = BQ.dataset(BQ_DATASET).table(tableName)

    job_config = bigquery.LoadJobConfig()
    job_config.source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
    job_config.write_disposition = 'WRITE_APPEND'
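
The listing above stops before the load itself. A minimal continuation sketch, assuming the outer-array body is re-serialised as newline-delimited JSON and streamed from an in-memory file object (illustrative, not the gist's verbatim continuation; module-level json and io imports are assumed):

    schema = create_schema_from_yaml(tableSchema)
    job_config.schema = schema

    # Convert the parsed outer-array body into newline-delimited JSON:
    data_str = "\n".join(json.dumps(record) for record in body)
    data_file = io.BytesIO(data_str.encode())

    load_job = BQ.load_table_from_file(data_file, table_id, job_config=job_config)
    load_job.result()  # wait for the load job to finish
    print("Loaded {} rows into {}.".format(load_job.output_rows, tableName))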
# data = {"name": "table-1_data_new_line_delimited_json.json", \
data = {"name": "table-2_data_json_outer_array.json", \
# data = {"name": "table-3_data_new_line_delimited_src.json", \
# data = {"name": "table-4_data_object_string.json", \
# data = {"name": "table-5_data_new_line_delimited_json.json", \
# data = {"name": "table-6_data_new_line_delimited_json.json", \
"bucket":"project_staging_files", \
"timeCreated": "2019-09-24 15:54:54"\
}\
def _load_table_as_src(bucket_name, file_name, tableSchema, tableName):
    # ! source file must be outer array JSON
    # ! this will work for CSV where a row is a JSON string --> SRC column (Snowflake-like)
    blob = CS.get_bucket(bucket_name).blob(file_name)
    body = json.loads(blob.download_as_string())
    table_id = BQ.dataset(BQ_DATASET).table(tableName)

    job_config = bigquery.LoadJobConfig()
    schema = create_schema_from_yaml(tableSchema)
    job_config.schema = schema
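
Again the listing cuts off before the load. One way to finish it, sketched under the assumption that each raw record lands in a single SRC string column (the "src" field name is hypothetical and depends on the table's YAML schema):

    # Continuation sketch (assumed): wrap each record as a raw JSON string in one column.
    rows = [{"src": json.dumps(record)} for record in body]
    load_job = BQ.load_table_from_json(rows, table_id, job_config=job_config)
    load_job.result()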
def _load_table_from_object_string(bucket_name, file_name, tableSchema, tableName):
    # ! we will convert body to a new line delimited JSON
    blob = CS.get_bucket(bucket_name).blob(file_name)
    blob = blob.download_as_string().decode()
    # Transform object string data into JSON outer array string:
    blob = json.dumps('[' + blob.replace('}{', '},{') + ']')
    # Load as JSON:
    body = json.loads(blob)
    # Create an array of string elements from JSON:
    jsonReady = [json.dumps(record) for record in json.loads(body)]
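
The conversion above ends with jsonReady, a list of JSON strings. A minimal continuation sketch, assuming these strings are joined with newlines and loaded the same way as in _load_table_from_json (table_id is defined here only because the listing above does not show it):

    table_id = BQ.dataset(BQ_DATASET).table(tableName)

    job_config = bigquery.LoadJobConfig()
    job_config.schema = create_schema_from_yaml(tableSchema)
    job_config.source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
    job_config.write_disposition = 'WRITE_APPEND'

    # Join the JSON strings into newline-delimited JSON and load from memory:
    data_file = io.BytesIO("\n".join(jsonReady).encode())
    load_job = BQ.load_table_from_file(data_file, table_id, job_config=job_config)
    load_job.result()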