@msumit
Last active December 12, 2023 21:28
Airflow file sensor example
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.sensors import S3KeySensor

# Midnight yesterday, used as the DAG's start_date.
yday = datetime.combine(datetime.today() - timedelta(1),
                        datetime.min.time())

default_args = {
    'owner': 'msumit',
    'depends_on_past': False,
    'start_date': yday,
    'email': ['sumeet.manit@gmail.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG('s3_file_sensor', default_args=default_args, schedule_interval='@daily')

# Poke S3 once for the key; soft_fail=True marks the task as skipped
# rather than failed when the timeout is hit.
t1 = S3KeySensor(
    task_id='s3_file_test',
    poke_interval=0,
    timeout=10,
    soft_fail=True,
    bucket_key='s3://dev.canopydata.com/airflow/example_qubole_operator.py',
    bucket_name=None,
    dag=dag)

# Runs only when every upstream task has failed, i.e. the file wasn't found.
t2 = BashOperator(
    task_id='task2',
    depends_on_past=False,
    bash_command='echo a big hadoop job putting files on s3',
    trigger_rule='all_failed',
    dag=dag)

# Runs once all upstream tasks are done, whatever their final state.
t3 = BashOperator(
    task_id='task3',
    depends_on_past=False,
    bash_command='echo im next job using s3 files',
    trigger_rule='all_done',
    dag=dag)

t2.set_upstream(t1)
t3.set_upstream(t1)
t3.set_upstream(t2)
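For newer Airflow (2.x) the wiring above is unchanged, but the import paths move into core and the Amazon provider package. A minimal sketch, assuming apache-airflow-providers-amazon is installed (on older provider releases the sensor module is airflow.providers.amazon.aws.sensors.s3_key rather than ...sensors.s3):

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

with DAG('s3_file_sensor_v2', start_date=datetime(2023, 1, 1),
         schedule_interval='@daily') as dag:
    t1 = S3KeySensor(
        task_id='s3_file_test',
        bucket_key='s3://dev.canopydata.com/airflow/example_qubole_operator.py',
        poke_interval=0,
        timeout=10,
        soft_fail=True)
    t2 = BashOperator(
        task_id='task2',
        bash_command='echo a big hadoop job putting files on s3',
        trigger_rule='all_failed')
    t3 = BashOperator(
        task_id='task3',
        bash_command='echo im next job using s3 files',
        trigger_rule='all_done')
    # Same dependencies as the set_upstream calls above.
    t1 >> [t2, t3]
    t2 >> t3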
msumit commented Apr 10, 2019

@rublinetsky it's sample code, so the file might not exist there, or you may not have access to it.
@anilkulkarni87 I guess you can provide the extra information (role & external_id) while setting up the default S3 connection, and boto should take care of the rest; see the sketch below.
@RahulJupelly that's the name of a file I'm sensing for in S3.
@tonyofleon can't say for sure, but it generally happens due to the signature version the S3 region expects, i.e. v2 or v4.
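A hedged sketch of what that connection setup might look like in code; the role ARN and external ID values below are placeholders, and the exact extra keys (role_arn, external_id) are assumptions to verify against the AwsHook docs for your Airflow version:

from airflow import settings
from airflow.models import Connection

# Hypothetical values; replace with your own role ARN and external ID.
conn = Connection(
    conn_id='aws_default',
    conn_type='aws',
    extra='{"role_arn": "arn:aws:iam::123456789012:role/airflow-s3-role", '
          '"external_id": "my-external-id"}')

session = settings.Session()
session.add(conn)
session.commit()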

Kuvanil commented Jan 24, 2020

The bucket_key in S3 has a suffix generated from a timestamp; how do I reference it in S3KeySensor? I don't want to specify None, as I'm handling exceptions as well. I also tried appending "*" (asterisk) at the end, but it just keeps poking without matching any path or file.
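A hedged sketch of the wildcard approach: S3KeySensor accepts a wildcard_match flag that makes bucket_key be interpreted as a Unix wildcard pattern (the bucket and prefix below are placeholders, and the dag object is reused from the gist above):

t_suffix = S3KeySensor(
    task_id='s3_file_with_suffix',
    # Hypothetical path; '*' matches the timestamp suffix.
    bucket_key='s3://my-bucket/exports/report_*.csv',
    wildcard_match=True,  # treat bucket_key as a Unix wildcard pattern
    poke_interval=60,
    timeout=600,
    dag=dag)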
