Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
AppEngineOperator for Airflow
from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
from airflow.exceptions import AirflowException
from airflow.hooks.http_hook import HttpHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
import json
import time
class AppEngineOperator(BaseOperator):
    """
    Trigger a job over HTTP, then wait for it to report completion via GCS.

    This Operator will:
    1. Schedule a job by POSTing ``command_params`` (with ``job_id`` added)
       as a JSON body to ``endpoint`` on the ``http_conn_id`` connection
       (see ``schedule_job``).
    2. Poll ``bucket`` for a success/failure marker object written by the
       job (see ``poll_status_files``).

    Your job must write one of these objects to the bucket when it finishes:
        {job_id}/succeeded
        {job_id}/failed

    Polling uses exponential backoff: the first wait is ``poll_interval``
    seconds, and each later wait doubles, capped at ``max_poll_interval``
    seconds. There is no built-in deadline. Use BaseOperator's
    ``execution_timeout`` if the wait must be bounded.

    ``command_params`` and ``job_id`` are template fields.

    :param task_id: Airflow task id.
    :param http_conn_id: Airflow connection id used by HttpHook to reach the
        job service.
    :param endpoint: endpoint the job request is POSTed to.
    :param bucket: GCS bucket polled for the status objects.
    :param command_params: dict sent as the JSON request body. It is copied
        with ``job_id`` added; the caller's dict is never modified.
    :param job_id: identifier sent in the body and used as the status-object
        prefix.
    :param google_cloud_conn_id: Airflow connection id used by
        GoogleCloudStorageHook.
    :param poll_interval: seconds to wait before the first status check.
    :param max_poll_interval: upper bound, in seconds, on the wait between
        checks. ``None`` restores the original uncapped doubling.

    Written by: @ryancbuckley
    """
    template_fields = ('command_params', 'job_id',)

    @apply_defaults
    def __init__(self,
                 task_id,
                 http_conn_id,
                 endpoint,
                 bucket,
                 command_params,
                 job_id,
                 google_cloud_conn_id='google_cloud_storage_default',
                 poll_interval=5,
                 max_poll_interval=300,
                 **kwargs):
        super(AppEngineOperator, self).__init__(task_id=task_id, **kwargs)
        self.http_conn_id = http_conn_id
        self.endpoint = endpoint
        self.bucket = bucket
        # Build a new dict instead of mutating the caller's: DAGs often
        # share one params dict across several operators, and writing
        # job_id into it in place would make all of them send the last one.
        self.command_params = dict(command_params, job_id=job_id)
        self.job_id = job_id
        self.google_cloud_conn_id = google_cloud_conn_id
        self.poll_interval = poll_interval
        self.max_poll_interval = max_poll_interval

    def schedule_job(self):
        """
        POST ``command_params`` as JSON to ``endpoint`` on ``http_conn_id``.

        HTTP response handling is left to ``HttpHook.run``; this method does
        not inspect the response itself.
        """
        hook = HttpHook(
            method='POST',
            http_conn_id=self.http_conn_id)
        hook.run(
            endpoint=self.endpoint,
            headers={'content-type': 'application/json', 'Accept': 'text/plain'},
            data=json.dumps(self.command_params),
            extra_options=None)

    def poll_status_files(self):
        """
        Block until the job's success or failure object appears in GCS.

        The method sleeps before every check. The first wait is
        ``poll_interval`` seconds, and the wait doubles after each
        unsuccessful check, never exceeding ``max_poll_interval`` (when it
        is not None). The success object is checked first, so it wins if
        both exist.

        :raises AirflowException: if ``{job_id}/failed`` is found.
        """
        success_file_name = '%s/succeeded' % self.job_id
        fail_file_name = '%s/failed' % self.job_id
        delay = self.poll_interval
        while True:
            time.sleep(delay)
            if check_gcs_file_exists(success_file_name, self.google_cloud_conn_id, self.bucket):
                return
            if check_gcs_file_exists(fail_file_name, self.google_cloud_conn_id, self.bucket):
                raise AirflowException('found failure file %s/%s' % (self.bucket, fail_file_name))
            # Exponential backoff, capped so a long-running job is still
            # noticed soon after it finishes.
            delay *= 2
            if self.max_poll_interval is not None:
                delay = min(delay, self.max_poll_interval)

    def execute(self, context):
        """
        Schedule the job, then wait for its status object.

        Airflow marks the task successful when ``execute`` returns normally
        and failed when it raises. A found failure object therefore surfaces
        as the ``AirflowException`` raised by ``poll_status_files``.
        """
        self.schedule_job()
        self.poll_status_files()
def check_gcs_file_exists(file_name, google_cloud_conn_id, bucket):
    """
    Return whether the object ``file_name`` exists in GCS bucket ``bucket``.

    A fresh GoogleCloudStorageHook is built from ``google_cloud_conn_id`` on
    every call, and the result of its ``exists`` check is returned as-is.
    """
    gcs = GoogleCloudStorageHook(
        google_cloud_storage_conn_id=google_cloud_conn_id,
    )
    found = gcs.exists(bucket, file_name)
    return found
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment