Skip to content

Instantly share code, notes, and snippets.

@syossan27
Last active April 26, 2020 03:44
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save syossan27/f574b1b158ebd2fb8f07a731a3877418 to your computer and use it in GitHub Desktop.
Save syossan27/f574b1b158ebd2fb8f07a731a3877418 to your computer and use it in GitHub Desktop.
from airflow import DAG
from airflow.contrib.sensors.gcs_sensor import GoogleCloudStoragePrefixSensor
from airflow.utils.dates import days_ago
from datetime import timedelta, datetime
dag = DAG(
'inference_pipeline', # DAGの名称
default_args={
'start_date': days_ago(1), # DAGがスケジューリングされる日付を指定
'retries': 1, # タスク失敗時のリトライ回数の指定
'retry_delay': timedelta(minutes=5), # リトライまでの待機時間の指定
},
schedule_interval='@daily', # DAGのスケジューリング間隔の指定
dagrun_timeout=timedelta(minutes=60), # 非常にややこしいパラメータなので記事内説明
# start_dateからDAG実行時までにスケジューリングされる予定だったDAGが
# 同時に実行されるのを抑制するために設定
# 参考URL: https://qiita.com/sh_kawakami/items/e9f06f80475143736a80
catchup=False
)
# Cloud Storageに指定のCSVがアップロードされたら実行されるタスク
csv_sensor = GoogleCloudStoragePrefixSensor(
task_id='csv_sensor', # タスクID
bucket='test', # 監視するCloud Storageバケット名
prefix='data/{}-'.format(datetime.now().strftime('%Y%m%d')), # 参照するCSVのprefixを指定
timeout=60 * 60 * 24, # 監視がタイムアウトするまでの時間
dag=dag # 作成したDAG設定の読み込み
)
# タスク依存関係の設定
csv_sensor
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment