This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def _load_table_from_json(bucket_name, file_name, tableSchema, tableName): | |
blob = CS.get_bucket(bucket_name).blob(file_name) | |
# ! The source data file must be in outer-array JSON format:
body = json.loads(blob.download_as_string()) | |
table_id = BQ.dataset(BQ_DATASET).table(tableName) | |
job_config.source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON, | |
job_config.write_disposition = 'WRITE_APPEND' |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# data = {"name": "table-1_data_new_line_delimited_json.json", \ | |
data = {"name": "table-2_data_json_outer_array.json", \ | |
# data = {"name": "table-3_data_new_line_delimited_src.json", \ | |
# data = {"name": "table-4_data_object_string.json", \ | |
# data = {"name": "table-5_data_new_line_delimited_json.json", \ | |
# data = {"name": "table-6_data_new_line_delimited_json.json", \ | |
"bucket":"project_staging_files", \ | |
"timeCreated": "2019-09-24 15:54:54"\ | |
}\ |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def _load_table_as_src(bucket_name, file_name, tableSchema, tableName): | |
# ! source file must be outer array JSON | |
# ! this will also work for CSV where each row is a JSON string --> SRC column (Snowflake-like)
blob = CS.get_bucket(bucket_name).blob(file_name) | |
body = json.loads(blob.download_as_string()) | |
table_id = BQ.dataset(BQ_DATASET).table(tableName) | |
schema = create_schema_from_yaml(tableSchema) | |
job_config.schema = schema |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def _load_table_from_object_string(bucket_name, file_name, tableSchema, tableName): | |
# ! we will convert the body to newline-delimited JSON
blob = CS.get_bucket(bucket_name).blob(file_name) | |
blob = blob.download_as_string().decode() | |
# Transform object string data into JSON outer array string: | |
blob = json.dumps('[' + blob.replace('}{', '},{') + ']') | |
# Load as JSON: | |
body = json.loads(blob) | |
# Create an array of string elements from JSON: | |
jsonReady = [json.dumps(record) for record in json.loads(body)] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def _load_table_from_dataframe(bucket_name, file_name, tableSchema, tableName): | |
""" | |
Source data file must be outer JSON | |
""" | |
blob = CS.get_bucket(bucket_name).blob(file_name) | |
body = json.loads(blob.download_as_string()) | |
table_id = BQ.dataset(BQ_DATASET).table(tableName) | |
schema = create_schema_from_yaml(tableSchema) | |
job_config.schema = schema |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
- name: table_5 | |
size: large | |
format: DF | |
columns: [] | |
schema: | |
- name: "id" | |
type: "STRING" | |
mode: "NULLABLE" | |
- name: "first_name" | |
type: "STRING" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def _load_table_as_df_normalized(bucket_name, file_name, tableSchema, tableName): | |
""" | |
Source data file must be outer JSON | |
""" | |
blob = CS.get_bucket(bucket_name).blob(file_name) | |
body = json.loads(blob.download_as_string()) | |
table_id = BQ.dataset(BQ_DATASET).table(tableName) | |
schema = create_schema_from_yaml(tableSchema) | |
job_config.schema = schema |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def create_schema_from_yaml(table_schema): | |
schema = [] | |
for column in table_schema: | |
schemaField = bigquery.SchemaField(column['name'], column['type'], column['mode']) | |
schema.append(schemaField) | |
if column['type'] == 'RECORD': | |
schemaField._fields = create_schema_from_yaml(column['fields']) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[ | |
SchemaField('id', 'INT64', 'NULLABLE', None, ()), | |
SchemaField('first_name', 'STRING', 'NULLABLE', None, ()), | |
SchemaField('last_name', 'STRING', 'NULLABLE', None, ()), | |
SchemaField('dob', 'DATE', 'NULLABLE', None, ()), | |
SchemaField('addresses', 'RECORD', 'REPEATED', None, [SchemaField('status', 'STRING', 'NULLABLE', None, ()), SchemaField('address', 'STRING', 'NULLABLE', None, ()), SchemaField('city', 'STRING', 'NULLABLE', None, ()), SchemaField('state', 'STRING', 'NULLABLE', None, ()), SchemaField('zip', 'INT64', 'NULLABLE', None, ()), SchemaField('numberOfYears', 'INT64', 'NULLABLE', None, ())]) | |
] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(base) Mikes-MBP:s3-to-gcs mikeshakhomirov$ node test | |
index.handler invoked with event { Records: | |
[ { eventVersion: '2.0', | |
eventSource: 'aws:s3', | |
awsRegion: 'eu-west-1', | |
eventTime: '2017-04-11T09:29:02.255Z', | |
eventName: 'ObjectCreated:Put', | |
userIdentity: [Object], | |
requestParameters: [Object], | |
responseElements: [Object], |