@samklr — forked from msumit/s3_sensor.py, created February 12, 2021
Airflow file sensor example
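
The DAG below polls S3 for a specific key with an S3KeySensor. If the key does not turn up before the sensor times out, task2 (a stand-in for the upstream job that writes the file to S3) is triggered via trigger_rule='all_failed'; task3 then runs once both of its upstream tasks have finished, whatever their outcome, via trigger_rule='all_done'.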
from datetime import datetime, timedelta

# Airflow 1.x-style imports, as used in the original gist; newer Airflow
# releases ship these classes from different modules.
from airflow import DAG
from airflow.operators.sensors import S3KeySensor
from airflow.operators.bash_operator import BashOperator

# Start the DAG at midnight yesterday so the first daily run is scheduled right away.
yday = datetime.combine(datetime.today() - timedelta(1),
                        datetime.min.time())

default_args = {
    'owner': 'msumit',
    'depends_on_past': False,
    'start_date': yday,
    'email': ['sumeet.manit@gmail.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG('s3_file_sensor', default_args=default_args, schedule_interval='@daily')

# Poke once for the S3 key; soft_fail marks the task as skipped instead of
# failed when the 10-second timeout is reached.
t1 = S3KeySensor(
    task_id='s3_file_test',
    poke_interval=0,
    timeout=10,
    soft_fail=True,
    bucket_key='s3://dev.canopydata.com/airflow/example_qubole_operator.py',
    bucket_name=None,
    dag=dag)

# Runs only when the upstream sensor fails: a stand-in for the job that
# actually puts the file on S3.
t2 = BashOperator(
    task_id='task2',
    depends_on_past=False,
    bash_command='echo a big hadoop job putting files on s3',
    trigger_rule='all_failed',
    dag=dag)

# Runs once both upstream tasks have finished, whatever their state.
t3 = BashOperator(
    task_id='task3',
    depends_on_past=False,
    bash_command='echo im next job using s3 files',
    trigger_rule='all_done',
    dag=dag)

t2.set_upstream(t1)
t3.set_upstream(t1)
t3.set_upstream(t2)
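
For recent Airflow releases, the same DAG can be sketched with the Airflow 2 import paths and the bit-shift dependency syntax. Treat this as a sketch rather than a drop-in replacement: it assumes the apache-airflow-providers-amazon package is installed, the exact S3KeySensor module path depends on the provider version, and the DAG id 's3_file_sensor_v2' is just a placeholder for this example.

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
# Assumes a recent apache-airflow-providers-amazon release; older releases
# expose the sensor from airflow.providers.amazon.aws.sensors.s3_key instead.
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

with DAG(
    's3_file_sensor_v2',  # placeholder DAG id for this sketch
    start_date=datetime(2021, 2, 11),
    schedule_interval='@daily',
    default_args={'retries': 1, 'retry_delay': timedelta(minutes=5)},
) as dag:
    wait_for_file = S3KeySensor(
        task_id='s3_file_test',
        bucket_key='s3://dev.canopydata.com/airflow/example_qubole_operator.py',
        poke_interval=0,
        timeout=10,
        soft_fail=True,
    )
    produce_file = BashOperator(
        task_id='task2',
        bash_command='echo a big hadoop job putting files on s3',
        trigger_rule='all_failed',
    )
    consume_file = BashOperator(
        task_id='task3',
        bash_command='echo im next job using s3 files',
        trigger_rule='all_done',
    )

    # Same shape as the original: task2 fires only if the sensor fails,
    # task3 runs once both of its upstream tasks are done, in any state.
    wait_for_file >> [produce_file, consume_file]
    produce_file >> consume_file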