Last active
April 26, 2020 07:35
-
-
Save syossan27/087b2cf0156dad2fb206ccadf7b80caf to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import requests | |
from airflow import DAG | |
from airflow.contrib.sensors.gcs_sensor import GoogleCloudStoragePrefixSensor | |
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator | |
from airflow.exceptions import AirflowException | |
from airflow.hooks.http_hook import HttpHook | |
from airflow.operators.http_operator import SimpleHttpOperator | |
from airflow.utils.dates import days_ago | |
from datetime import timedelta, datetime | |
# Base URL of the Cloud Functions deployment that this pipeline invokes.
cloud_functions_url = 'https://asia-northeast1-inference-pipeline.cloudfunctions.net'
# GCE metadata-server endpoint that mints a Google-signed identity token
# for the audience appended after '?audience='. Only reachable from inside GCP.
metadata_url = 'http://metadata/computeMetadata/v1/instance/service-accounts/default/identity?audience='
# Daily inference pipeline. catchup=False means missed intervals are not
# back-filled; each DAG run is bounded to 60 minutes overall.
default_args = {
    'start_date': days_ago(1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'inference_pipeline',
    default_args=default_args,
    schedule_interval='@daily',
    dagrun_timeout=timedelta(minutes=60),
    catchup=False,
)
class RunCloudFunctionsOperator(SimpleHttpOperator):
    """SimpleHttpOperator variant that authenticates to Cloud Functions.

    Before issuing the HTTP call it fetches a Google-signed identity token
    from the GCE metadata server (audience = the target function URL) and
    sends it as a ``Authorization: Bearer`` header.

    Raises:
        requests.HTTPError: if the metadata server rejects the token request.
        AirflowException: if ``response_check`` returns False.
    """

    def execute(self, context):
        http = HttpHook(self.method, http_conn_id=self.http_conn_id)
        self.log.info("Calling HTTP method")
        # The identity token must be minted for the exact function URL.
        target_audience = cloud_functions_url + self.endpoint
        fetch_instance_id_token_url = metadata_url + target_audience
        # The metadata server is plain HTTP (so TLS verification does not
        # apply); the "Metadata-Flavor" header is mandatory. A timeout keeps
        # the task from hanging forever if the server is unreachable.
        r = requests.get(
            fetch_instance_id_token_url,
            headers={"Metadata-Flavor": "Google"},
            timeout=10,
        )
        # Fail fast rather than sending an error page as the bearer token.
        r.raise_for_status()
        idt = r.text
        self.headers = {'Authorization': "Bearer " + idt}
        response = http.run(self.endpoint,
                            self.data,
                            self.headers,
                            self.extra_options)
        if self.response_check:
            if not self.response_check(response):
                raise AirflowException("Response check returned False.")
# Wait for today's CSV drop to appear in GCS before the pipeline proceeds.
csv_sensor = GoogleCloudStoragePrefixSensor(
    task_id='csv_sensor',
    bucket='test',
    # 'prefix' is a templated field, so render today's date at task run
    # time via a Jinja macro. A plain datetime.now() here would be frozen
    # at DAG-parse time, so retried or delayed runs would poll a stale
    # prefix and never succeed.
    prefix='data/{{ macros.datetime.now().strftime("%Y%m%d") }}-',
    timeout=60 * 60 * 24 * 2,  # give up after 2 days of polling
    pool='csv_sensor',  # dedicated pool so sensors don't starve workers
    dag=dag
)
# Invoke the '/preprocessing' Cloud Function with an identity token.
preprocessing = RunCloudFunctionsOperator(
    task_id='preprocessing',
    method='GET',
    http_conn_id='http_default',
    endpoint='/preprocessing',
    headers={},  # Authorization header is injected by execute()
    xcom_push=False,
    # Succeed only on HTTP 200 (was a redundant conditional expression).
    response_check=lambda response: response.status_code == 200,
    dag=dag,
)
# Load the preprocessed CSVs from GCS into BigQuery, replacing the table.
import_bq = GoogleCloudStorageToBigQueryOperator(
    task_id='import_bq',
    bucket='test',
    source_objects=['preprocess_data/*.csv'],  # source GCS objects to load
    source_format='CSV',
    allow_quoted_newlines=True,  # newlines inside quoted fields are not row breaks
    skip_leading_rows=1,  # skip the CSV header row
    destination_project_dataset_table='test.data',  # destination BigQuery table
    schema_fields=[  # BigQuery table schema
        {'name': 'id', 'type': 'INTEGER'},
    ],
    write_disposition='WRITE_TRUNCATE',  # overwrite the table on each run
    dag=dag
)
# Task dependencies: wait for the CSV, preprocess it, then load into BigQuery.
csv_sensor >> preprocessing >> import_bq
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment