Skip to content

Instantly share code, notes, and snippets.

@ilanrosenfeld7
Last active February 4, 2020 13:48
Show Gist options
  • Save ilanrosenfeld7/3d0b605865aba2c0870d717e0729cfa4 to your computer and use it in GitHub Desktop.
Save ilanrosenfeld7/3d0b605865aba2c0870d717e0729cfa4 to your computer and use it in GitHub Desktop.
# Deployment settings read from the function's environment variables.
# Any variable that is not set yields None.
templates_bucket = os.getenv('templates_bucket')
templates_folder = os.getenv('templates_folder')
dag_template_filename = os.getenv('dag_template_filename')
dag_template_2_filename = os.getenv('dag_template_2_filename')
composer_bucket = os.getenv('composer_bucket')

# Fixed locations: local scratch directory for downloads/rendered files, and
# the object prefix under which DAGs are uploaded to the Composer bucket.
tmp_folder = '/tmp/'
dags_folder = 'dags/'
def main(event, context):
    """Generate two Composer DAGs from a YAML job file uploaded to GCS.

    Downloads the uploaded YAML, fills the two DAG templates from the
    templates bucket with values taken from it, and uploads the rendered
    DAGs under ``dags_folder`` in the Composer bucket as
    ``<job_id>-pubsub.py`` and ``<job_id>-file-movement.py``. The pubsub DAG
    is given the file-movement DAG's id as ``$NEXT_DAG``.

    Expected YAML keys: ``job_id``, ``project_id``,
    ``composer.schedule_interval``, ``pubsub.target_topic``,
    ``gcs.origin_bucket`` and ``gcs.target_bucket``.

    Args:
        event: trigger payload; ``event['bucket']`` and ``event['name']``
            identify the uploaded YAML object.
        context: event metadata; not used.

    Raises:
        ValueError: if the YAML document does not define ``job_id``.
    """
    trigger_bucket = event['bucket']
    yaml_filename = event['name']
    storage_client = storage.Client()

    # Fetch the YAML file whose upload triggered this invocation.
    local_yaml = copy_from_gcs(storage_client, trigger_bucket, yaml_filename, tmp_folder)
    # An empty YAML document parses to None; treat it as an empty mapping.
    yaml_dict = parse_yaml(local_yaml) or {}

    job_id = yaml_dict.get('job_id')
    if job_id is None:
        # Without a job_id the DAG ids would silently become "None-pubsub" /
        # "None-file-movement", and every such job would overwrite the same DAGs.
        raise ValueError('YAML file %s does not define job_id' % yaml_filename)

    # A missing or empty section is treated like missing keys (its values
    # become None) instead of crashing on None.get(...).
    composer_data = yaml_dict.get('composer') or {}
    pubsub_data = yaml_dict.get('pubsub') or {}
    gcs_data = yaml_dict.get('gcs') or {}

    replacement_dict = {
        '$PROJECT_ID': yaml_dict.get('project_id'),
        '$JOB_ID': job_id,
        '$SCHEDULE_INTERVAL': composer_data.get('schedule_interval'),
        '$TARGET_TOPIC': pubsub_data.get('target_topic'),
        '$ORIGIN_BUCKET': gcs_data.get('origin_bucket'),
        '$TARGET_BUCKET': gcs_data.get('target_bucket'),
    }

    # Read the DAG templates from the GCS templates bucket.
    pubsub_dag_template = get_template_from_gcs(
        storage_client, dag_template_filename, templates_bucket, templates_folder, tmp_folder)
    file_movement_dag_template = get_template_from_gcs(
        storage_client, dag_template_2_filename, templates_bucket, templates_folder, tmp_folder)

    # DAG ids are derived from job_id so each job gets its own pair of DAGs.
    pubsub_dag_id = '%s-pubsub' % job_id
    file_movement_dag_id = '%s-file-movement' % job_id

    # Render each DAG locally, then upload it to the Composer DAGs folder.
    # Each render gets its own copy of the values so per-DAG keys never leak
    # between the two DAGs.
    pubsub_dag_local_path = build_dag_from_template(
        pubsub_dag_id, file_movement_dag_id, pubsub_dag_template,
        dict(replacement_dict), tmp_folder)
    write_to_gcs(storage_client, composer_bucket,
                 dags_folder + pubsub_dag_id + ".py", pubsub_dag_local_path)

    file_movement_dag_local_path = build_dag_from_template(
        file_movement_dag_id, None, file_movement_dag_template,
        dict(replacement_dict), tmp_folder)
    write_to_gcs(storage_client, composer_bucket,
                 dags_folder + file_movement_dag_id + ".py", file_movement_dag_local_path)
def parse_yaml(yaml_path):
    """Load a local YAML file with ``yaml.safe_load`` and return the document.

    The file is opened in binary mode so PyYAML detects the encoding itself
    (UTF-8, or UTF-16 with a BOM) instead of relying on the process locale.

    Args:
        yaml_path: path of the local YAML file.

    Returns:
        The parsed document (``None`` for an empty file).

    Raises:
        yaml.YAMLError: if the file is not valid YAML; the error is printed
            before being re-raised.
    """
    with open(yaml_path, 'rb') as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            print(exc)
            # Bare raise re-raises the original exception and traceback unchanged.
            raise
def copy_from_gcs(storage_client, bucket_name, bucket_filename, tmp_folder):
    """Download a GCS object into a local folder and return the local path.

    Args:
        storage_client: storage client (``storage.Client`` in ``main``).
        bucket_name: bucket holding the object.
        bucket_filename: object name inside the bucket; only its basename is
            used for the local file name.
        tmp_folder: local destination directory, with or without a trailing
            separator.

    Returns:
        Path of the downloaded local file.
    """
    source_bucket = storage_client.get_bucket(bucket_name)
    blob = source_bucket.blob(bucket_filename)
    # os.path.join works whether or not tmp_folder ends with a separator.
    local_file_path = os.path.join(tmp_folder, os.path.basename(bucket_filename))
    blob.download_to_filename(local_file_path)
    return local_file_path
def get_template_from_gcs(storage_client, template_filename, templates_bucket, templates_folder, tmp_folder):
    """Download a DAG template from the templates bucket; return its local path.

    Args:
        storage_client: storage client passed through to ``copy_from_gcs``.
        template_filename: template object name relative to ``templates_folder``.
        templates_bucket: bucket holding the templates.
        templates_folder: object prefix of the templates. ``None`` or an empty
            string means the bucket root; a trailing ``/`` is ignored.
        tmp_folder: local directory to download into.

    Returns:
        Local path of the downloaded template.
    """
    # Avoid object names like "None/x", "/x" or "folder//x" when the folder
    # setting is unset, empty or has a trailing slash.
    folder = (templates_folder or '').rstrip('/')
    object_name = '%s/%s' % (folder, template_filename) if folder else template_filename
    return copy_from_gcs(storage_client, templates_bucket, object_name, tmp_folder)
def build_dag_from_template(dag_id, next_dag_id, dag_template, replacement_dict, tmp_folder):
    """Render a DAG template into ``<tmp_folder>/<dag_id>.py`` and return that path.

    Args:
        dag_id: value for the ``$DAG_ID`` placeholder; also the output file stem.
        next_dag_id: value for the ``$NEXT_DAG`` placeholder (may be ``None``).
        dag_template: local path of the template file.
        replacement_dict: placeholder -> value mapping shared by all DAGs of a
            job. It is not modified.
        tmp_folder: local directory to write the rendered DAG into.

    Returns:
        Local path of the rendered DAG file.
    """
    # Work on a copy so the per-DAG keys do not leak into the caller's mapping.
    values = dict(replacement_dict)
    values['$DAG_ID'] = dag_id
    values['$NEXT_DAG'] = next_dag_id
    resulting_dag = inject_values_in_template(dag_template, values)
    local_dag_path = os.path.join(tmp_folder, dag_id + ".py")
    write_local_file(local_dag_path, resulting_dag)
    return local_dag_path
def inject_values_in_template(dag_template, values_dict):
    """Read a template file and return its text with placeholders substituted.

    Args:
        dag_template: path of the template file, read as UTF-8.
        values_dict: placeholder -> value mapping. ``None`` values become an
            empty string; other non-string values (ints, bools from YAML) are
            converted with ``str()``.

    Returns:
        The rendered template text.
    """
    with open(dag_template, encoding='utf-8') as python_file:
        rendered = python_file.read()
    # NOTE: replacements are applied sequentially in dict order, so a value
    # that itself contains a placeholder could be substituted again by a
    # later key.
    for placeholder, value in values_dict.items():
        rendered = rendered.replace(placeholder, '' if value is None else str(value))
    return rendered
def write_local_file(local_path, text_to_write):
    """Write *text_to_write* to *local_path* as UTF-8, replacing any existing content.

    The output is Python source for a DAG, so it is encoded as UTF-8 explicitly
    rather than with the locale's default encoding.
    """
    with open(local_path, "w", encoding="utf-8") as f:
        f.write(text_to_write)
def write_to_gcs(storage_client, bucket_name, target_path, local_path):
    """Upload the local file *local_path* as object *target_path* in *bucket_name*.

    Args:
        storage_client: storage client used to look up the bucket.
        bucket_name: destination bucket.
        target_path: destination object name inside the bucket.
        local_path: path of the local file to upload.
    """
    storage_client.get_bucket(bucket_name).blob(target_path).upload_from_filename(local_path)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment