Created
February 2, 2017 17:53
-
-
Save lauralorenz/bf47280b90067c71fe691bdf70b4145a to your computer and use it in GitHub Desktop.
Example of a self-healing Airflow sensor polling an external asynchronous job
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""
Run a daily backup of all our GraphStory instances using the GraphStory alpha API.
Run every day at 23:00 UTC
"""
import datetime

import admin  # noqa
from airflow import DAG

from datadive import settings
from datadive.neo4j_backups.sense import GraphStoryExportSensor

# start_date: 23:00 UTC on the previous calendar day, so the first scheduled
# run (start_date + schedule_interval) lands at 23:00 today.
_eleven_pm = datetime.time(hour=23, minute=0, second=0)
_yesterday = datetime.datetime.today() - datetime.timedelta(days=1)
yesterday_at_elevenpm = datetime.datetime.combine(_yesterday, _eleven_pm)

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': yesterday_at_elevenpm,
    'email': ['tech.team@industrydive.com'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 3,
    'retry_delay': datetime.timedelta(minutes=10),
}

dag = DAG(
    dag_id='neo4j_backups',
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_args,
)

# Trigger a backup for every GraphStory instance we own.
for instance_id in settings.GRAPH_STORY_INSTANCE_IDS:
    export_neo4j_data = GraphStoryExportSensor(
        task_id='export_neo4j_data_%s' % instance_id,
        poke_interval=60,   # poke once a minute
        timeout=60 * 30,    # try again in 30 mins; cold backups appear to take ~13 mins and then need to be compressed
        instance_id=instance_id,
        provide_context=True,
        dag=dag,
    )
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from datadive import settings | |
from datadive.neo4j_backups.graphstory_client import GraphStoryClient | |
from datadive.smart_airflow import DiveOperator | |
from airflow.operators import BaseSensorOperator | |
import json | |
import datetime | |
import time | |
class GraphStoryExportSensor(DiveOperator, BaseSensorOperator):
    """
    Triggers a GraphStory export using the GraphStory API, and waits for it to finish.

    The export job is started lazily: the first access to :py:attr:`export_id`
    (which happens inside :py:meth:`poke`) kicks off the asynchronous export on
    GraphStory's API server. If that first call fails, this task instance fails
    and the next retry starts a fresh export — the sensor "self heals".
    """
    def __init__(self, *args, **kwargs):
        """
        Initialize a GraphStoryExportSensor which should know what instance_id it is performing an export for.

        :param args: not expected
        :param kwargs: should contain key 'instance_id' from the Task's config in the DAG
            definition file (a missing key raises ``KeyError`` at DAG-parse time)
        """
        super(GraphStoryExportSensor, self).__init__(*args, **kwargs)
        # get the instance we're exporting for
        self.instance_id = kwargs['instance_id']
        self.client = GraphStoryClient(settings.GRAPH_STORY_API_KEY)
        # instantiate an export that we will poke for
        # if we fail, the next sensor will try a new export
        # lazy load so that it is only called when we try and utilize self.export_id in the task's poke() method
        self._export_id = None

    @property
    def export_id(self):
        """
        Lazy load an export_id which is returned from instantiating an asynchronous export job on GraphStory's API server.

        :return: 'exportId' from the client's :py:func:`datadive.neo4j_backups.graphstory_client.GraphStoryClient.export_instance`
        :rtype: str
        """
        if self._export_id is None:
            # if we cannot get the exportId off the response, fatal error; fail the
            # TaskInstance and retry per Task configuration
            self._export_id = self.client.export_instance(self.instance_id).json()['data']['exportId']
        return self._export_id

    def poke(self, context, *args, **kwargs):
        """
        Check whether the GraphStory export has finished, starting it on the first poke.

        :param context: Context dictionary forwarded from the DAG; expects to contain context.dag and context.task
        :type context: dict
        :param args: not expected
        :param kwargs: not expected
        :return: whether the export job has completed, 0 or 1
        :rtype: int
        """
        # NOTE: passing lazy-loaded self.export_id as an argument means the very first
        # poke also *starts* the export job. trixy trixy
        response, export_done_code = self.client.check_export(self.instance_id, self.export_id)
        # Parse the JSON body so a malformed response fails this poke loudly;
        # the parsed content itself is not needed to decide completion.
        response.json()
        return export_done_code
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment