import requests
from airflow import DAG
from airflow.contrib.sensors.gcs_sensor import GoogleCloudStoragePrefixSensor
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
from airflow.exceptions import AirflowException
from airflow.hooks.http_hook import HttpHook
from airflow.operators.http_operator import SimpleHttpOperator
from airflow.utils.dates import days_ago
from datetime import timedelta, datetime
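
# Base URL of the deployed Cloud Functions, and the metadata-server endpoint
# that issues an OIDC identity token for a given audience (the functions are
# assumed to be IAM-protected, so each call needs such a token).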
cloud_functions_url = 'https://asia-northeast1-inference-pipeline.cloudfunctions.net'
metadata_url = 'http://metadata/computeMetadata/v1/instance/service-accounts/default/identity?audience='
dag = DAG(
    'inference_pipeline',
    default_args={
        'start_date': days_ago(1),
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    schedule_interval='@daily',
    dagrun_timeout=timedelta(minutes=60),
    catchup=False,
)
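
# Custom operator: a SimpleHttpOperator that can call IAM-protected Cloud
# Functions by fetching an identity token from the metadata server and
# sending it as a Bearer token on the request.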
class RunCloudFunctionsOperator(SimpleHttpOperator):
    def execute(self, context):
        http = HttpHook(self.method, http_conn_id=self.http_conn_id)
        self.log.info("Calling HTTP method")
        # Build the audience (the full URL of the target function) and fetch
        # an identity token for it from the metadata server.
        target_audience = cloud_functions_url + self.endpoint
        fetch_instance_id_token_url = metadata_url + target_audience
        r = requests.get(fetch_instance_id_token_url, headers={"Metadata-Flavor": "Google"}, verify=False)
        idt = r.text
        # Attach the identity token as a Bearer token on the actual request.
        self.headers = {'Authorization': "Bearer " + idt}
        response = http.run(self.endpoint,
                            self.data,
                            self.headers,
                            self.extra_options)
        if self.response_check:
            if not self.response_check(response):
                raise AirflowException("Response check returned False.")
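
# Wait until a CSV whose name starts with today's date (data/YYYYMMDD-...)
# appears in the bucket; the sensor times out after 2 days and runs in its
# own pool, presumably to cap how many worker slots sensing can occupy.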
csv_sensor = GoogleCloudStoragePrefixSensor(
    task_id='csv_sensor',
    bucket='test',
    prefix='data/{}-'.format(datetime.now().strftime('%Y%m%d')),
    timeout=60 * 60 * 24 * 2,
    pool='csv_sensor',
    dag=dag,
)
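
# Call the /preprocessing Cloud Function; the task fails unless the function
# responds with HTTP 200.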
preprocessing = RunCloudFunctionsOperator(
    task_id='preprocessing',
    method='GET',
    http_conn_id='http_default',
    endpoint='/preprocessing',
    headers={},
    xcom_push=False,
    response_check=lambda response: response.status_code == 200,
    dag=dag,
)
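
# Load the preprocessed CSVs from GCS into the BigQuery table test.data,
# replacing its contents on every run (WRITE_TRUNCATE).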
import_bq = GoogleCloudStorageToBigQueryOperator(
    task_id='import_bq',
    bucket='test',
    source_objects=['preprocess_data/*.csv'],  # GCS objects to load
    source_format='CSV',
    allow_quoted_newlines=True,  # allow newlines inside quoted fields so they are not treated as row breaks
    skip_leading_rows=1,  # skip the CSV header row
    destination_project_dataset_table='test.data',  # destination BigQuery table
    schema_fields=[  # BigQuery table schema
        {'name': 'id', 'type': 'INTEGER'},
    ],
    write_disposition='WRITE_TRUNCATE',  # overwrite the table on each load
    dag=dag,
)
# Task dependencies: wait for the raw CSV, preprocess it via the Cloud
# Function, then load the result into BigQuery.
csv_sensor >> preprocessing >> import_bq