Last active
April 26, 2020 03:44
-
-
Save syossan27/f574b1b158ebd2fb8f07a731a3877418 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from airflow import DAG | |
from airflow.contrib.sensors.gcs_sensor import GoogleCloudStoragePrefixSensor | |
from airflow.utils.dates import days_ago | |
from datetime import timedelta, datetime | |
dag = DAG( | |
'inference_pipeline', # DAGの名称 | |
default_args={ | |
'start_date': days_ago(1), # DAGがスケジューリングされる日付を指定 | |
'retries': 1, # タスク失敗時のリトライ回数の指定 | |
'retry_delay': timedelta(minutes=5), # リトライまでの待機時間の指定 | |
}, | |
schedule_interval='@daily', # DAGのスケジューリング間隔の指定 | |
dagrun_timeout=timedelta(minutes=60), # 非常にややこしいパラメータなので記事内説明 | |
# start_dateからDAG実行時までにスケジューリングされる予定だったDAGが | |
# 同時に実行されるのを抑制するために設定 | |
# 参考URL: https://qiita.com/sh_kawakami/items/e9f06f80475143736a80 | |
catchup=False | |
) | |
# Cloud Storageに指定のCSVがアップロードされたら実行されるタスク | |
csv_sensor = GoogleCloudStoragePrefixSensor( | |
task_id='csv_sensor', # タスクID | |
bucket='test', # 監視するCloud Storageバケット名 | |
prefix='data/{}-'.format(datetime.now().strftime('%Y%m%d')), # 参照するCSVのprefixを指定 | |
timeout=60 * 60 * 24, # 監視がタイムアウトするまでの時間 | |
dag=dag # 作成したDAG設定の読み込み | |
) | |
# タスク依存関係の設定 | |
csv_sensor |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment