def _load_table_from_json(bucket_name, file_name, tableSchema, tableName):
    blob = CS.get_bucket(bucket_name).blob(file_name)
    # ! source data file format must be outer array JSON:
    body = json.loads(blob.download_as_string())
    table_id = BQ.dataset(BQ_DATASET).table(tableName)

    job_config = bigquery.LoadJobConfig()
    schema = create_schema_from_yaml(tableSchema)
    job_config.schema = schema
    job_config.source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
    job_config.write_disposition = 'WRITE_APPEND'
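The snippet stops before the job is actually submitted. One way to finish it, shown here only as a sketch rather than the original implementation, is to hand the parsed records straight to the client's load_table_from_json helper (available in recent google-cloud-bigquery releases):

# Sketch (assumption): submit the parsed outer-array records as a load job.
load_job = BQ.load_table_from_json(body, table_id, job_config=job_config)
load_job.result()  # block until the load completes
print('Loaded {} rows into {}.'.format(load_job.output_rows, tableName))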
# Test payload; uncomment one "name" line to choose the source data file:
# data = {"name": "table-1_data_new_line_delimited_json.json",
data = {"name": "table-2_data_json_outer_array.json",
# data = {"name": "table-3_data_new_line_delimited_src.json",
# data = {"name": "table-4_data_object_string.json",
# data = {"name": "table-5_data_new_line_delimited_json.json",
# data = {"name": "table-6_data_new_line_delimited_json.json",
        "bucket": "project_staging_files",
        "timeCreated": "2019-09-24 15:54:54"
        }
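For context, a minimal sketch of how such a payload could be routed to one of the loaders; the entry-point name and the lookup_table_config helper are illustrative, not part of the original code:

# Hypothetical entry point (sketch): route the test payload to a loader.
def streaming(data, context=None):
    bucket_name = data['bucket']
    file_name = data['name']
    # lookup_table_config is a hypothetical helper that finds the matching
    # table definition (name, format, schema) in the YAML config shown below.
    table_config = lookup_table_config(file_name)
    _load_table_from_json(bucket_name, file_name,
                          table_config['schema'], table_config['name'])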
def _load_table_as_src(bucket_name, file_name, tableSchema, tableName):
    # ! source file must be outer array JSON
    # ! this will work for CSV where a row is a JSON string --> SRC column (Snowflake-like)
    blob = CS.get_bucket(bucket_name).blob(file_name)
    body = json.loads(blob.download_as_string())
    table_id = BQ.dataset(BQ_DATASET).table(tableName)

    job_config = bigquery.LoadJobConfig()
    schema = create_schema_from_yaml(tableSchema)
    job_config.schema = schema
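A sketch of what the SRC-column idea could look like: each whole record is stored as a raw JSON string in a single column. The column name src and the loading call are assumptions, not the original code:

# Sketch (assumption): wrap every record as {"src": "<raw JSON>"} so the whole
# payload lands in one STRING column, Snowflake-style, then load it as NDJSON.
import io
src_rows = '\n'.join(json.dumps({'src': json.dumps(record)}) for record in body)
job_config.source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
load_job = BQ.load_table_from_file(
    io.BytesIO(src_rows.encode('utf-8')), table_id, job_config=job_config)
load_job.result()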
def _load_table_from_object_string(bucket_name, file_name, tableSchema, tableName):
    # ! source file is a string of concatenated JSON objects: {...}{...}{...}
    # ! we will convert it to new line delimited JSON
    blob = CS.get_bucket(bucket_name).blob(file_name)
    blob = blob.download_as_string().decode()
    # Transform object string data into a JSON outer array string:
    blob = '[' + blob.replace('}{', '},{') + ']'
    # Load as JSON:
    body = json.loads(blob)
    # Create an array of JSON string elements, one record per line:
    jsonReady = [json.dumps(record) for record in body]
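From here the rows only need to be joined with newlines and streamed to BigQuery; a minimal sketch, assuming the same BQ client, table_id and job_config pattern used by the other loaders:

# Sketch (assumption): submit the newline-delimited rows as a load job.
import io
data_str = '\n'.join(jsonReady)
load_job = BQ.load_table_from_file(
    io.BytesIO(data_str.encode('utf-8')), table_id, job_config=job_config)
load_job.result()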
def _load_table_from_dataframe(bucket_name, file_name, tableSchema, tableName):
    """
    Source data file must be outer array JSON
    """
    blob = CS.get_bucket(bucket_name).blob(file_name)
    body = json.loads(blob.download_as_string())
    table_id = BQ.dataset(BQ_DATASET).table(tableName)

    job_config = bigquery.LoadJobConfig()
    schema = create_schema_from_yaml(tableSchema)
    job_config.schema = schema
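The dataframe variant presumably turns the parsed records into a pandas DataFrame and uses the client's dataframe loader; a minimal sketch, assuming pandas and pyarrow are installed:

# Sketch (assumption): build a DataFrame from the records and load it.
import pandas as pd
df = pd.DataFrame(body)
load_job = BQ.load_table_from_dataframe(df, table_id, job_config=job_config)
load_job.result()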
- name: table_5
  size: large
  format: DF
  columns: []
  schema:
    - name: "id"
      type: "STRING"
      mode: "NULLABLE"
    - name: "first_name"
      type: "STRING"
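This YAML drives the loaders; a sketch of how it might be read, assuming the definitions live in a file such as schemas.yaml (the file name and the lookup are illustrative):

# Sketch (assumption): read the table definitions and pick one by name.
import yaml
with open('schemas.yaml') as f:
    config = yaml.safe_load(f)          # a list of table definitions
table_config = next(t for t in config if t['name'] == 'table_5')
print(table_config['format'])           # -> DF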
def _load_table_as_df_normalized(bucket_name, file_name, tableSchema, tableName):
    """
    Source data file must be outer array JSON
    """
    blob = CS.get_bucket(bucket_name).blob(file_name)
    body = json.loads(blob.download_as_string())
    table_id = BQ.dataset(BQ_DATASET).table(tableName)

    job_config = bigquery.LoadJobConfig()
    schema = create_schema_from_yaml(tableSchema)
    job_config.schema = schema
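The "normalized" flavour suggests flattening nested records into one row per child record before loading; a hedged sketch using pandas.json_normalize, where the record_path and meta columns are assumptions based on the addresses field in the schema output further down:

# Sketch (assumption): one row per address, parent fields kept as metadata;
# the schema is left to auto-detection here rather than the YAML schema above.
import pandas as pd
df = pd.json_normalize(
    body,
    record_path='addresses',
    meta=['id', 'first_name', 'last_name', 'dob'],
    record_prefix='address_')
load_job = BQ.load_table_from_dataframe(df, table_id)
load_job.result()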
def create_schema_from_yaml(table_schema):
    schema = []
    for column in table_schema:
        schemaField = bigquery.SchemaField(column['name'], column['type'], column['mode'])
        schema.append(schemaField)
        # Nested RECORD columns are built recursively from their 'fields' list:
        if column['type'] == 'RECORD':
            schemaField._fields = create_schema_from_yaml(column['fields'])
    return schema
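Called on the schema section of a table definition, it returns a list of SchemaField objects like the one printed below (table_schema here is assumed to hold the parsed YAML schema list):

# Sketch: build the BigQuery schema from the parsed YAML and inspect it.
schema = create_schema_from_yaml(table_schema)
print(schema)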
[
SchemaField('id', 'INT64', 'NULLABLE', None, ()),
SchemaField('first_name', 'STRING', 'NULLABLE', None, ()),
SchemaField('last_name', 'STRING', 'NULLABLE', None, ()),
SchemaField('dob', 'DATE', 'NULLABLE', None, ()),
SchemaField('addresses', 'RECORD', 'REPEATED', None, [SchemaField('status', 'STRING', 'NULLABLE', None, ()), SchemaField('address', 'STRING', 'NULLABLE', None, ()), SchemaField('city', 'STRING', 'NULLABLE', None, ()), SchemaField('state', 'STRING', 'NULLABLE', None, ()), SchemaField('zip', 'INT64', 'NULLABLE', None, ()), SchemaField('numberOfYears', 'INT64', 'NULLABLE', None, ())])
]
(base) Mikes-MBP:s3-to-gcs mikeshakhomirov$ node test
index.handler invoked with event { Records:
[ { eventVersion: '2.0',
eventSource: 'aws:s3',
awsRegion: 'eu-west-1',
eventTime: '2017-04-11T09:29:02.255Z',
eventName: 'ObjectCreated:Put',
userIdentity: [Object],
requestParameters: [Object],
responseElements: [Object],