Example Google Cloud Workflows workflow that loads data from S3 into BigQuery: it runs a Storage Transfer Service job to sync S3 to a GCS bucket, then loads the GCS file(s) into a BigQuery table.
# Arguments:
# - dest_project_id (String)
#     Project ID of the destination table.
# - dest_dataset_id (String)
#     Dataset ID of the destination table.
# - dest_table_id (String)
#     Table name of the destination table.
# - source_uri (String or list of Strings)
#     Full Cloud Storage path(s) of the source file(s)
#     (e.g. "gs://path/to/file.txt.gz" or ["gs://path/to/file.txt.gz", ...]).
# - transfer_job_name (String)
#     Full name of the transfer job that syncs the S3 bucket to the GCS bucket
#     (e.g. transferJobs/123456789).
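#
# A minimal invocation sketch, assuming the workflow has been deployed; the
# workflow name "s3-to-bq-load" and all argument values below are
# illustrative, not part of this gist:
#
#   gcloud workflows run s3-to-bq-load \
#     --data='{
#       "dest_project_id": "my-project",
#       "dest_dataset_id": "my_dataset",
#       "dest_table_id": "my_table",
#       "source_uri": ["gs://my-bucket/path/to/file.txt.gz"],
#       "transfer_job_name": "transferJobs/123456789"
#     }'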
main:
  params: [args]
  steps:
    - init:
        assign:
          - dest_project_id: ${args.dest_project_id}
          - dest_dataset_id: ${args.dest_dataset_id}
          - dest_table_id: ${args.dest_table_id}
          - source_uri: ${args.source_uri}
          - transfer_job_name: ${args.transfer_job_name}
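    # Kick off the S3 -> GCS transfer. The storagetransfer connector runs the
    # already-configured transfer job and, like other Workflows connectors,
    # waits for the resulting long-running operation to finish before the
    # workflow continues. On failure, log a structured error and re-raise it
    # to halt the execution.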
    - run_s3_to_gcs_transfer:
        try:
          call: googleapis.storagetransfer.v1.transferJobs.run
          args:
            jobName: ${transfer_job_name}
            body:
              projectId: ${dest_project_id}
          result: transfer_result
        except:
          as: error
          steps:
            - log_transfer_job_error:
                call: sys.log
                args:
                  severity: "ERROR"
                  json:
                    workflow_execution_id: ${sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}
                    source_uri: ${source_uri}
                    transfer_job_name: ${transfer_job_name}
                    target_destination:
                      projectId: ${dest_project_id}
                      datasetId: ${dest_dataset_id}
                      tableId: ${dest_table_id}
                    sync_status: "FAILED"
                    error_message: ${error}
            # Re-raise the caught error map (${error}, not the literal string
            # "error") so the workflow execution fails.
            - raise_error_to_halt_execution:
                raise: ${error}
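    # Load the transferred file(s) from GCS into BigQuery. The jobs.insert
    # connector waits for the load job to complete. WRITE_TRUNCATE replaces
    # any existing table contents, CREATE_IF_NEEDED creates the table if it
    # does not exist yet, and skipLeadingRows: 1 drops a single header row.
    # Note that sourceUris expects a list, so pass source_uri as a list of
    # gs:// paths.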
    - load_data_into_bq_table:
        try:
          call: googleapis.bigquery.v2.jobs.insert
          args:
            projectId: ${dest_project_id}
            body:
              configuration:
                load:
                  createDisposition: CREATE_IF_NEEDED
                  writeDisposition: WRITE_TRUNCATE
                  sourceUris: ${source_uri}
                  destinationTable:
                    projectId: ${dest_project_id}
                    datasetId: ${dest_dataset_id}
                    tableId: ${dest_table_id}
                  skipLeadingRows: 1
          result: load_result
        except:
          as: error
          steps:
            - log_bq_load_error:
                call: sys.log
                args:
                  severity: "ERROR"
                  json:
                    workflow_execution_id: ${sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}
                    source_uri: ${source_uri}
                    target_destination:
                      projectId: ${dest_project_id}
                      datasetId: ${dest_dataset_id}
                      tableId: ${dest_table_id}
                    sync_status: "FAILED"
                    error_message: ${error}
            # Re-raise so the execution fails instead of reporting success.
            - raise_error_to_halt_execution:
                raise: ${error}
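    # Both steps succeeded: emit a structured INFO log with the load job
    # result, e.g. for log-based monitoring of the sync status.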
    - log_success:
        call: sys.log
        args:
          severity: "INFO"
          json:
            workflow_execution_id: ${sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}
            source_uri: ${source_uri}
            target_destination:
              projectId: ${dest_project_id}
              datasetId: ${dest_dataset_id}
              tableId: ${dest_table_id}
            sync_status: "SUCCEEDED"
            result: ${load_result}