Last active
February 4, 2020 13:48
-
-
Save ilanrosenfeld7/3d0b605865aba2c0870d717e0729cfa4 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# --- Deployment configuration, read from environment variables ---
# Bucket and folder holding the DAG template files.
templates_bucket = os.getenv('templates_bucket')
templates_folder = os.getenv('templates_folder')
# File names of the two DAG templates inside the templates folder.
dag_template_filename = os.getenv('dag_template_filename')
dag_template_2_filename = os.getenv('dag_template_2_filename')
# Bucket that receives the generated DAG files.
composer_bucket = os.getenv('composer_bucket')

# --- Fixed locations ---
# Local scratch folder used for downloaded and generated files.
tmp_folder = '/tmp/'
# Object-name prefix under which generated DAGs are uploaded.
dags_folder = 'dags/'
def main(event, context):
    """Generate two DAG files from a YAML job description and upload them.

    The function downloads the YAML file named by the triggering event,
    fills the two DAG templates with the values it contains, and writes the
    resulting ``<job_id>-pubsub.py`` and ``<job_id>-file-movement.py`` files
    under ``dags_folder`` in ``composer_bucket``.

    Args:
        event: Mapping describing the triggering object; the keys ``bucket``
            and ``name`` are read.
        context: Unused; accepted to match the caller's signature.

    Raises:
        KeyError: If ``event`` lacks ``bucket`` or ``name``.
        ValueError: If the YAML file does not define ``job_id``.
    """
    trigger_bucket = event['bucket']
    yaml_filename = event['name']
    storage_client = storage.Client()

    # get yaml file from GCS trigger bucket which initiated this function
    local_yaml = copy_from_gcs(storage_client, trigger_bucket, yaml_filename, tmp_folder)
    # An empty YAML document parses to None; treat it like an empty mapping so
    # the problem is reported by the job_id check below rather than as an
    # AttributeError on None.
    yaml_dict = parse_yaml(local_yaml) or {}

    job_id = yaml_dict.get('job_id')
    if job_id is None or job_id == '':
        # Without a job_id the DAG ids would silently become 'None-pubsub' etc.
        raise ValueError("YAML file %r does not define a 'job_id'" % yaml_filename)

    # Sections may be absent from the YAML; fall back to empty mappings so the
    # corresponding placeholders are simply replaced with nothing.
    composer_data = yaml_dict.get('composer') or {}
    pubsub_data = yaml_dict.get('pubsub') or {}
    gcs_data = yaml_dict.get('gcs') or {}

    replacement_dict = {
        '$PROJECT_ID': yaml_dict.get('project_id'),
        '$JOB_ID': job_id,
        '$SCHEDULE_INTERVAL': composer_data.get('schedule_interval'),
        '$TARGET_TOPIC': pubsub_data.get('target_topic'),
        '$ORIGIN_BUCKET': gcs_data.get('origin_bucket'),
        '$TARGET_BUCKET': gcs_data.get('target_bucket'),
    }

    # read dag templates from GCS templates bucket
    pubsub_dag_template = get_template_from_gcs(
        storage_client, dag_template_filename, templates_bucket, templates_folder, tmp_folder)
    file_movement_dag_template = get_template_from_gcs(
        storage_client, dag_template_2_filename, templates_bucket, templates_folder, tmp_folder)

    # generate unique dags ids based on unique job_id
    pubsub_dag_id = '%s-pubsub' % job_id
    file_movement_dag_id = '%s-file-movement' % job_id

    # generate actual DAGs and copy them to GCS; the pubsub DAG is given the
    # file-movement DAG id as its $NEXT_DAG, the file-movement DAG has none.
    pubsub_dag_local_path = build_dag_from_template(
        pubsub_dag_id, file_movement_dag_id,
        pubsub_dag_template, replacement_dict, tmp_folder)
    write_to_gcs(storage_client, composer_bucket,
                 dags_folder + pubsub_dag_id + ".py", pubsub_dag_local_path)

    file_movement_dag_local_path = build_dag_from_template(
        file_movement_dag_id, None,
        file_movement_dag_template, replacement_dict, tmp_folder)
    write_to_gcs(storage_client, composer_bucket,
                 dags_folder + file_movement_dag_id + ".py", file_movement_dag_local_path)
def parse_yaml(yaml_path):
    """Parse the YAML file at ``yaml_path`` and return the loaded object.

    Args:
        yaml_path: Path of the local YAML file to read.

    Returns:
        Whatever ``yaml.safe_load`` produces for the file's contents.

    Raises:
        yaml.YAMLError: If the file is not valid YAML; the error is printed
            before being re-raised.
    """
    try:
        with open(yaml_path, 'r') as yaml_file:
            return yaml.safe_load(yaml_file)
    except yaml.YAMLError as parse_error:
        # Print the parser's message, then let the failure propagate.
        print(parse_error)
        raise parse_error
def copy_from_gcs(storage_client, bucket_name, bucket_filename, tmp_folder):
    """Download one object from a bucket into a local folder.

    Args:
        storage_client: Client object providing ``get_bucket(name)``.
        bucket_name: Name of the bucket to read from.
        bucket_filename: Name of the object inside the bucket; any folder
            prefix is dropped when naming the local copy.
        tmp_folder: Local folder to download into, with or without a
            trailing path separator.

    Returns:
        The local path of the downloaded file.
    """
    source_bucket = storage_client.get_bucket(bucket_name)
    source_blob = source_bucket.blob(bucket_filename)

    # os.path.join instead of string concatenation, so a folder given without
    # a trailing separator ('/tmp') does not yield '/tmpfile.yaml'.
    local_file_path = os.path.join(tmp_folder, os.path.basename(bucket_filename))

    # Download the file to a local destination
    source_blob.download_to_filename(local_file_path)
    return local_file_path
def get_template_from_gcs(storage_client, template_filename, templates_bucket, templates_folder, tmp_folder):
    """Download a template file from the templates bucket.

    Args:
        storage_client: Client object passed through to ``copy_from_gcs``.
        template_filename: File name of the template.
        templates_bucket: Name of the bucket holding the templates.
        templates_folder: Folder prefix of the template inside the bucket.
        tmp_folder: Local folder to download into.

    Returns:
        The value returned by ``copy_from_gcs`` for the downloaded template.
    """
    # The object name is the folder prefix and the file name joined by '/'.
    template_object_name = "%s/%s" % (templates_folder, template_filename)
    return copy_from_gcs(storage_client, templates_bucket, template_object_name, tmp_folder)
def build_dag_from_template(dag_id, next_dag_id, dag_template, replacement_dict, tmp_folder):
    """Render a DAG template and write the result to ``<tmp_folder>/<dag_id>.py``.

    Args:
        dag_id: Id of the DAG being generated; substituted for ``$DAG_ID``
            and used as the output file name.
        next_dag_id: Value substituted for ``$NEXT_DAG``; may be None.
        dag_template: Path of the local template file.
        replacement_dict: Placeholder-to-value mapping shared by all DAGs.
            It is not modified.
        tmp_folder: Local folder in which the rendered file is written.

    Returns:
        The local path of the rendered DAG file.
    """
    # Work on a copy so the per-DAG placeholders do not leak into the
    # caller's dict, which is reused for several DAGs.
    values = dict(replacement_dict)
    values['$DAG_ID'] = dag_id
    values['$NEXT_DAG'] = next_dag_id

    resulting_dag = inject_values_in_template(dag_template, values)

    # os.path.join tolerates a tmp_folder with or without a trailing separator.
    local_dag_path = os.path.join(tmp_folder, dag_id + ".py")

    # write resulting dag locally
    write_local_file(local_dag_path, resulting_dag)
    return local_dag_path
def inject_values_in_template(dag_template, values_dict):
    """Return the template file's text with every placeholder substituted.

    Args:
        dag_template: Path of the local template file to read.
        values_dict: Mapping of placeholder text (for example ``'$DAG_ID'``)
            to its replacement. ``None`` is replaced with an empty string;
            any other non-string value is converted with ``str()``.

    Returns:
        The template text after all replacements.
    """
    with open(dag_template) as template_file:
        rendered = template_file.read()

    # Replace the longest placeholders first so that a placeholder which is a
    # prefix of another (e.g. '$DAG' and '$DAG_ID') cannot clobber it.
    for placeholder in sorted(values_dict, key=len, reverse=True):
        value = values_dict[placeholder]
        # str.replace only accepts strings, but parsed YAML may contain
        # numbers or booleans.
        rendered = rendered.replace(placeholder, '' if value is None else str(value))
    return rendered
def write_local_file(local_path, text_to_write):
    """Write ``text_to_write`` to ``local_path``, replacing any existing content.

    Args:
        local_path: Path of the local file to create or truncate.
        text_to_write: Text to store in the file.
    """
    with open(local_path, "w+") as output_file:
        output_file.write(text_to_write)
def write_to_gcs(storage_client, bucket_name, target_path, local_path):
    """Upload a local file to ``target_path`` in the named bucket.

    Args:
        storage_client: Client object providing ``get_bucket(name)``.
        bucket_name: Name of the destination bucket.
        target_path: Object name to upload to inside the bucket.
        local_path: Path of the local file to upload.
    """
    storage_client.get_bucket(bucket_name).blob(target_path).upload_from_filename(local_path)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment