Skip to content

Instantly share code, notes, and snippets.

@ilanrosenfeld7
Created February 4, 2020 18:07
Show Gist options
  • Save ilanrosenfeld7/9c9508b3eb03fc3f515b10ed1d6dd282 to your computer and use it in GitHub Desktop.
Save ilanrosenfeld7/9c9508b3eb03fc3f515b10ed1d6dd282 to your computer and use it in GitHub Desktop.
def _require_section(config, section_name):
    """Return the ``section_name`` entry of ``config``, raising if it is absent.

    Raises:
        ValueError: if ``config`` has no (or a null) ``section_name`` entry.
    """
    section = config.get(section_name)
    if section is None:
        raise ValueError(
            "YAML config is missing required section '%s'" % section_name)
    return section


def main(event, context):
    """Generate two DAG files from a YAML job description and upload them.

    The YAML file named by the triggering event is copied locally and parsed.
    Its values fill a placeholder dictionary, which is used to render two DAG
    templates fetched from the templates bucket. The rendered DAGs are then
    written to the Composer bucket under ``dags_folder``.

    Args:
        event: mapping describing the trigger; the ``'bucket'`` and ``'name'``
            keys are read to locate the YAML file.
        context: trigger metadata; not used by this function.

    Raises:
        ValueError: if the YAML lacks ``job_id`` or one of the ``gcs``,
            ``composer`` or ``pubsub`` sections.
    """
    trigger_bucket = event['bucket']
    yaml_filename = event['name']
    storage_client = storage.Client()

    # get yaml file from GCS trigger bucket which initiated this function
    local_yaml = copy_from_gcs(storage_client, trigger_bucket, yaml_filename, tmp_folder)
    yaml_dict = parse_yaml(local_yaml)

    # Validate up front: a missing job_id would otherwise produce DAG ids such
    # as "None-pubsub", and a missing section would fail with an opaque
    # AttributeError on None.
    job_id = yaml_dict.get('job_id')
    if job_id is None or job_id == '':
        raise ValueError("YAML config is missing required key 'job_id'")
    gcs_data = _require_section(yaml_dict, 'gcs')
    composer_data = _require_section(yaml_dict, 'composer')
    pubsub_data = _require_section(yaml_dict, 'pubsub')

    # placeholder -> value mapping passed to build_dag_from_template
    replacement_dict = {
        '$PROJECT_ID': yaml_dict.get('project_id'),
        '$JOB_ID': job_id,
        '$SCHEDULE_INTERVAL': composer_data.get('schedule_interval'),
        '$TARGET_TOPIC': pubsub_data.get('target_topic'),
        '$ORIGIN_BUCKET': gcs_data.get('origin_bucket'),
        '$TARGET_BUCKET': gcs_data.get('target_bucket'),
    }

    # read dag templates from GCS templates bucket
    pubsub_dag_template = get_template_from_gcs(
        storage_client, dag_template_filename, templates_bucket, templates_folder, tmp_folder)
    file_movement_dag_template = get_template_from_gcs(
        storage_client, dag_template_2_filename, templates_bucket, templates_folder, tmp_folder)

    # generate unique dags ids based on unique job_id
    pubsub_dag_id = '%s-pubsub' % job_id
    file_movement_dag_id = '%s-file-movement' % job_id

    # generate actual DAGs and copy them to GCS; the pubsub DAG is also given
    # the file-movement DAG id, while the file-movement DAG gets None there
    pubsub_dag_local_path = build_dag_from_template(
        pubsub_dag_id, file_movement_dag_id,
        pubsub_dag_template, replacement_dict, tmp_folder)
    write_to_gcs(storage_client, composer_bucket,
                 dags_folder + pubsub_dag_id + ".py", pubsub_dag_local_path)

    file_movement_dag_local_path = build_dag_from_template(
        file_movement_dag_id, None,
        file_movement_dag_template, replacement_dict, tmp_folder)
    write_to_gcs(storage_client, composer_bucket,
                 dags_folder + file_movement_dag_id + ".py", file_movement_dag_local_path)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment