Skip to content

Instantly share code, notes, and snippets.

@ilanrosenfeld7
Last active February 4, 2020 18:20
Show Gist options
  • Save ilanrosenfeld7/3c3785b5e6dccd33254676bdfc7087fd to your computer and use it in GitHub Desktop.
Save ilanrosenfeld7/3c3785b5e6dccd33254676bdfc7087fd to your computer and use it in GitHub Desktop.
now = datetime.datetime.now()
yesterday = now - datetime.timedelta(days=1)
project_id = '$PROJECT_ID'
dag_id='$DAG_ID'
target_topic='$TARGET_TOPIC'
job_id='$JOB_ID'
schedule_interval = '$SCHEDULE_INTERVAL'
next_dag_id = '$NEXT_DAG'
subscription_id = 'subs-%s' % job_id
default_dag_args = {
'start_date': yesterday,
'retries': 1,
'retry_delay': datetime.timedelta(seconds=5),
'project_id': project_id,
}
sample_dag = models.DAG(
dag_id,
description='Sample DAG',
catchup=False,
default_args=default_dag_args,
schedule_interval=schedule_interval,
max_active_runs=1)
create_subscription_to_pubsub = PubSubSubscriptionCreateOperator(
task_id='create_pubsub_subscription',
topic_project=project_id,
topic=target_topic,
subscription=subscription_id,
dag=sample_dag
)
wait_on_pubsub = PubSubPullSensor(
task_id='pull_pubsub_message',
subscription=subscription_id,
project=project_id,
ack_messages=True,
execution_timeout=timedelta(seconds=5),
email_on_failure=False,
retries=0,
dag=sample_dag
)
# Task for the next dag to run
next_dag_trigger_task = TriggerDagRunOperator(
task_id="trigger_next_dag",
trigger_dag_id=next_dag_id,
provide_context=True,
dag=sample_dag,
)
create_subscription_to_pubsub >> \
wait_on_pubsub >> \
next_dag_trigger_task
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment