Skip to content

Instantly share code, notes, and snippets.

@r39132
Last active February 10, 2016 06:37
Show Gist options
  • Save r39132/b22671d0293cd7740b38 to your computer and use it in GitHub Desktop.
Save r39132/b22671d0293cd7740b38 to your computer and use it in GitHub Desktop.
Sample Test with S3_Key_Sensor
import os
from airflow import DAG
from airflow.operators import *
from datetime import datetime, timedelta
from datetime import date
# Value used for the 'start_date' entry of the default task arguments below.
START_DATE = datetime(year=2016, month=2, day=4)

# Arguments handed to the DAG as `default_args`.
# NOTE(review): 'retries' is 0, so 'retry_delay' and 'email_on_retry'
# presumably never take effect as configured -- confirm that is intended.
default_args = dict(
    owner='sanand',
    depends_on_past=False,
    start_date=START_DATE,
    email=['sanand@****'],
    email_on_failure=True,
    email_on_retry=True,
    retries=0,
    retry_delay=timedelta(seconds=60),
)
# The DAG under test. The cron expression '* * * * *' has every field
# wildcarded (i.e. once a minute); each run is given a 5-hour timeout.
dag = DAG(
    dag_id='test_s3_sensor',
    schedule_interval='* * * * *',
    dagrun_timeout=timedelta(hours=5),
    default_args=default_args,
)

# Placeholder task that serves as the default upstream for the sensor tasks.
dummy_op = DummyOperator(dag=dag, task_id='kick-off')
def detect_required_s3_keys(
        name,
        dag=dag,
        upstream=dummy_op,
        bucket_key='s3://******/ingest2016/02/10/06/*****-fe30503e-7014-431a-8f9d-a430880f130c',
        s3_conn_id='my_s3_conn',
        wildcard_match=True):
    """Create an S3KeySensor task and wire it after ``upstream``.

    Args:
        name: ``task_id`` given to the new sensor.
        dag: DAG the sensor is attached to. Defaults to the module-level
            ``dag`` (bound when this function is defined).
        upstream: Task set as the sensor's upstream. Defaults to the
            module-level ``dummy_op``. Pass ``None`` to create the sensor
            without any upstream dependency.
        bucket_key: Forwarded to ``S3KeySensor`` as ``bucket_key``. The
            default is the key this sample was originally hard-wired to.
        s3_conn_id: Forwarded to ``S3KeySensor`` as ``s3_conn_id``.
        wildcard_match: Forwarded to ``S3KeySensor`` as ``wildcard_match``.

    Returns:
        The newly created ``S3KeySensor`` instance.
    """
    task = S3KeySensor(
        task_id=name,
        bucket_key=bucket_key,
        s3_conn_id=s3_conn_id,
        dag=dag,
        wildcard_match=wildcard_match)
    # Only wire a dependency when one was requested, so the factory can also
    # produce sensors that have no upstream task.
    if upstream is not None:
        task.set_upstream(upstream)
    return task
# Create one sensor task with task_id 'foo_1'; the factory's defaults attach
# it to `dag` with the 'kick-off' task (`dummy_op`) as its upstream.
foo_1 = detect_required_s3_keys('foo_1')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment