Skip to content

Instantly share code, notes, and snippets.

def main(event, context):
    """Cloud Function entry point, triggered by a GCS object-finalize event.

    Downloads the YAML file whose upload fired this function from the
    trigger bucket into local scratch space and parses it into a dict.

    Args:
        event: GCS event payload; this code reads the 'bucket' and 'name' keys.
        context: Cloud Functions event metadata (unused in the visible body).

    NOTE(review): the scraped source had all indentation flattened; the body
    below restores the obvious one-level function indentation. `tmp_folder`,
    `copy_from_gcs`, and `parse_yaml` are defined elsewhere in the file.
    """
    trigger_bucket = event['bucket']
    yaml_filename = event['name']
    storage_client = storage.Client()
    # get yaml file from GCS trigger bucket which initiated this function
    local_yaml = copy_from_gcs(storage_client, trigger_bucket, yaml_filename, tmp_folder)
    yaml_dict = parse_yaml(local_yaml)
# Runtime configuration for the DAG generator. Bucket/folder locations come
# from the deployment environment; scratch space and the Composer DAG folder
# are fixed by the platform.
templates_bucket = os.getenv('templates_bucket')              # GCS bucket holding the DAG templates
templates_folder = os.getenv('templates_folder')              # folder inside the templates bucket
tmp_folder = '/tmp/'                                          # only writable path inside Cloud Functions
dag_template_filename = os.getenv('dag_template_filename')    # first DAG template object name
dag_template_2_filename = os.getenv('dag_template_2_filename')  # second DAG template object name
composer_bucket = os.getenv('composer_bucket')                # Composer environment's GCS bucket
dags_folder = 'dags/'                                         # Composer picks up DAGs from dags/
def main(event, context):
project_id: "dynamic_dags_example_project"
job_id: "dynamic_dags_5749db"
composer:
schedule_interval: "*/30 * * * *"
pubsub:
target_topic: "sample_topic"
gcs:
origin_bucket: "source_bucket"
target_bucket: "sink_bucket"
# --- Placeholder values for the first generated DAG template ----------------
# The $UPPERCASE tokens are literal placeholders substituted by the DAG
# generator before the file is deployed to Composer; they are not meaningful
# values at import time as-is.
project_id = '$PROJECT_ID'
dag_id = '$DAG_ID'
origin_bucket = '$ORIGIN_BUCKET'
target_bucket = '$TARGET_BUCKET'
# Backdate the start to yesterday so the DAG is schedulable immediately.
yesterday = datetime.datetime.now() - datetime.timedelta(days=1)
# Default arguments applied to the DAG's tasks.
# NOTE(review): this dict is truncated in the visible excerpt (no closing
# brace) — confirm the remaining entries against the original template file.
default_dag_args = {
'start_date': yesterday,
'retries': 1,
# --- Placeholder values for the second generated DAG template ---------------
# The $UPPERCASE tokens are literal placeholders substituted by the DAG
# generator before the file is deployed to Composer.
now = datetime.datetime.now()
# Backdate the start so the DAG is eligible to run immediately.
yesterday = now - datetime.timedelta(days=1)

project_id = '$PROJECT_ID'
dag_id = '$DAG_ID'
target_topic = '$TARGET_TOPIC'
job_id = '$JOB_ID'
schedule_interval = '$SCHEDULE_INTERVAL'
next_dag_id = '$NEXT_DAG'
# Derive the per-job Pub/Sub subscription name from the (substituted) job id.
subscription_id = f'subs-{job_id}'