@msumit
Last active December 12, 2023 21:28
Airflow file sensor example
# Import paths as in the Airflow 1.x releases this gist was written for.
from airflow import DAG
from airflow.operators.sensors import S3KeySensor
from airflow.operators import BashOperator
from datetime import datetime, timedelta

# Midnight yesterday, used as the DAG's start date.
yday = datetime.combine(datetime.today() - timedelta(1),
                        datetime.min.time())

default_args = {
    'owner': 'msumit',
    'depends_on_past': False,
    'start_date': yday,
    'email': ['sumeet.manit@gmail.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG('s3_file_sensor', default_args=default_args, schedule_interval='@daily')

# Wait for the key to appear in S3. bucket_name is None because
# bucket_key is given as a full s3:// URL.
t1 = S3KeySensor(
    task_id='s3_file_test',
    poke_interval=0,
    timeout=10,
    soft_fail=True,
    bucket_key='s3://dev.canopydata.com/airflow/example_qubole_operator.py',
    bucket_name=None,
    dag=dag)

# Triggered only when all upstream tasks have failed.
t2 = BashOperator(
    task_id='task2',
    depends_on_past=False,
    bash_command='echo a big hadoop job putting files on s3',
    trigger_rule='all_failed',
    dag=dag)

# Triggered once all upstream tasks are done, whatever their final state.
t3 = BashOperator(
    task_id='task3',
    depends_on_past=False,
    bash_command='echo im next job using s3 files',
    trigger_rule='all_done',
    dag=dag)

t2.set_upstream(t1)
t3.set_upstream(t1)
t3.set_upstream(t2)
@msumit
Author

msumit commented May 28, 2016

[Two screenshots, dated 2016-05-26 at 11:50:57 AM and 11:51:12 AM; images not preserved]

@Anmolk7

Anmolk7 commented Jun 2, 2017

What about sensing files on a local drive on localhost?

@owenmyerscsm

owenmyerscsm commented Jul 25, 2017

Thank you for this. One minor thing: I think seven_days_ago should be yesterday

@msumit
Author

msumit commented Aug 14, 2017

@Anmolk7 I think for that you can extend BaseSensorOperator and write a poke method with some simple Python code that returns True or False depending on whether the file(s) are present.
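
As a rough illustration of that suggestion, here is a minimal sketch of the poke logic for a local file. `LocalFileCheck` is a hypothetical name, and it is kept as a plain class so the snippet runs without Airflow installed; in a real DAG the same `poke` method would presumably live in a subclass of the sensor base class.

```python
import glob
import os
import tempfile


class LocalFileCheck:
    """Hypothetical stand-in for a custom sensor; only the poke logic is shown."""

    def __init__(self, filepath):
        # filepath may be a plain path or a glob pattern such as /data/in/*.csv
        self.filepath = filepath

    def poke(self, context=None):
        # True as soon as at least one regular file matches the path or pattern.
        return any(os.path.isfile(p) for p in glob.glob(self.filepath))


if __name__ == "__main__":
    workdir = tempfile.mkdtemp()
    check = LocalFileCheck(os.path.join(workdir, "*.csv"))
    print(check.poke())  # nothing has been written yet
    open(os.path.join(workdir, "input.csv"), "w").close()
    print(check.poke())  # the file now exists
```

A sensor calls `poke` repeatedly at `poke_interval` until it returns True or `timeout` is reached, so the method only needs to answer "is the file there right now?".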

@owenmyerscsm nice catch :)

@rublinetsky

Sumit, I am trying to run this example and I am getting the error:

ssl.CertificateError: hostname u'dev.canopydata.com.s3.amazonaws.com' doesn't match either of '*.s3.amazonaws.com', 's3.amazonaws.com'

This is a known issue with bucket names that include dots. I tried one known work-around (adding "calling_format": "boto.s3.connection.OrdinaryCallingFormat" to the connection), but it did not help - the certificate mismatch problem goes away, but now I am getting "301 Moved Permanently" message.

Which versions of boto and Python are you using? My freshly installed development Airflow runs on Python 2.7.10 and has boto 2.48.0 installed.

@anilkulkarni87

When are we setting the S3Connection? We use temporary credentials, and I'm wondering how we can do that at run time in a DAG.

@RahulJupelly

Hi Sumit, can you please explain a little about "bucket_key='s3://dev.canopydata.com/airflow/example_qubole_operator.py'"?

@tonyofleon

I'm trying to use this, but it only works for my buckets in the west region; for my buckets in the east I get S3ResponseError: 400 bad request. Is there any workaround for this?

Thanks.

@msumit
Author

msumit commented Apr 10, 2019

@rublinetsky it's sample code, so the file might not exist there, or you might not have access to it.
@anilkulkarni87 I guess you can provide extra information (role and external_id) while setting up the default S3 connection, and boto should take care of that.
@RahulJupelly that's the name of the file I'm sensing for in S3.
@tonyofleon can't say for sure, but it generally happens due to the signature version the S3 region expects, i.e. v2 or v4.
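
On the bucket_key question: the value in the gist is a full s3:// URL, which is why bucket_name can be left as None. The bucket is the host part of the URL and the key is the path after it. A minimal sketch of that split using only the standard library follows; `split_s3_url` is a hypothetical helper for illustration, not an Airflow function.

```python
from urllib.parse import urlparse


def split_s3_url(url):
    """Split a full s3://bucket/key URL into a (bucket, key) pair."""
    parsed = urlparse(url)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError("expected an s3://bucket/key URL, got %r" % url)
    # The host part is the bucket; the path, minus its leading slash, is the key.
    return parsed.netloc, parsed.path.lstrip("/")


bucket, key = split_s3_url(
    "s3://dev.canopydata.com/airflow/example_qubole_operator.py")
print(bucket)  # dev.canopydata.com
print(key)     # airflow/example_qubole_operator.py
```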

@Kuvanil

Kuvanil commented Jan 24, 2020

The bucket key in S3 has a suffix (generated from a timestamp). How do I reference it in S3KeySensor? I don't want to set it to None, as I'm keeping exceptions as well.
I also tried putting a "*" (asterisk) at the end. The sensor keeps poking, but it never matches any path or file.
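
For context on the asterisk: S3KeySensor takes a wildcard_match argument that defaults to False, and as far as I know the key is only interpreted as a Unix-style pattern when it is set to True; otherwise "*" is looked up as a literal character. The sketch below only illustrates the pattern-matching semantics with the standard library; the key names and the pattern are made up for the example.

```python
from fnmatch import fnmatch

# Hypothetical object keys with a timestamp suffix.
keys = [
    "airflow/report_20200124T101500.csv",
    "airflow/report_20200123T091200.csv",
    "airflow/other.txt",
]

# Unix-style wildcard: "*" stands for any run of characters.
pattern = "airflow/report_*.csv"

matches = [k for k in keys if fnmatch(k, pattern)]
print(matches)
```

With a pattern like this, the sensor call would pass the pattern as bucket_key together with wildcard_match=True.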
