@lauralorenz
Created February 2, 2017 17:53
Example of a self-healing Airflow sensor polling an external async job
"""
Run a daily backup of all our GraphStory instances using the GraphStory alpha API.
Run every day at 23:00 UTC
"""
import admin # noqa
from airflow import DAG
import datetime
from datadive import settings
from datadive.neo4j_backups.sense import GraphStoryExportSensor
yesterday_at_elevenpm = datetime.datetime.combine(datetime.datetime.today() + datetime.timedelta(-1),
                                                  datetime.time(hour=23, minute=0, second=0))

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': yesterday_at_elevenpm,
    'email': ['tech.team@industrydive.com'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 3,
    'retry_delay': datetime.timedelta(minutes=10)
}

dag = DAG(
    dag_id='neo4j_backups',
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_args
)

# all of the instance IDs we'll be triggering backups for
instance_ids = settings.GRAPH_STORY_INSTANCE_IDS
# trigger a backup for every instance we have
for instance_id in instance_ids:
    export_neo4j_data = GraphStoryExportSensor(
        task_id='export_neo4j_data_%s' % instance_id,
        poke_interval=60,  # poke once a minute
        timeout=60 * 30,  # give up after 30 mins so the task retries; cold backups appear to take ~13 mins and then need to be compressed
        instance_id=instance_id,
        provide_context=True,
        dag=dag
    )
from datadive import settings
from datadive.neo4j_backups.graphstory_client import GraphStoryClient
from datadive.smart_airflow import DiveOperator
from airflow.operators import BaseSensorOperator
import json
import datetime
import time


class GraphStoryExportSensor(DiveOperator, BaseSensorOperator):
    """
    Triggers a GraphStory export using the GraphStory API, and waits for it to finish.
    """

    def __init__(self, *args, **kwargs):
        """
        Initialize a GraphStoryExportSensor, which needs to know which instance_id it is performing an export for.

        :param args: not expected
        :param kwargs: should contain key 'instance_id' from the Task's config in the DAG definition file
        """
        super(GraphStoryExportSensor, self).__init__(*args, **kwargs)
        # get the instance we're exporting for
        self.instance_id = kwargs['instance_id']
        self.client = GraphStoryClient(settings.GRAPH_STORY_API_KEY)
        # lazily instantiate the export that we will poke for;
        # the export is only triggered the first time self.export_id is used in the task's poke() method,
        # so if this TaskInstance fails, the next retry will start a fresh export
        self._export_id = None

    @property
    def export_id(self):
        """
        Lazy load an export_id which is returned from instantiating an asynchronous export job on GraphStory's API server.

        :return: 'exportId' from the client's :py:func:`datadive.neo4j_backups.graphstory_client.GraphStoryClient.export_instance`
        :rtype: str
        """
        if self._export_id is None:
            # if we cannot get the exportId off the response, fatal error; fail the TaskInstance and retry per Task configuration
            self._export_id = self.client.export_instance(self.instance_id).json()['data']['exportId']
        return self._export_id

    def poke(self, context, *args, **kwargs):
        """
        Check whether the GraphStory export has finished; the first poke triggers the export itself via the lazy-loaded export_id.

        :param context: context dictionary forwarded from the DAG; expected to contain context.dag and context.task
        :type context: dict
        :param args: not expected
        :param kwargs: not expected
        :return: whether the export job has finished, 0 or 1
        :rtype: int
        """
        # returns 1 (true) or 0 (false) based on whether the export job is done or not
        # checking the export will actually trigger an export process the first time around,
        # since we pass the lazy-loaded self.export_id as an argument
        # trixy trixy
        response, export_done_code = self.client.check_export(self.instance_id, self.export_id)
        # get the body of the response, which is JSON
        response = response.json()
        return export_done_code
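

# Below is a minimal sketch of the GraphStoryClient interface the sensor relies on,
# inferred from the two calls made above (export_instance and check_export). The real
# datadive.neo4j_backups.graphstory_client module is not part of this gist, and the
# GraphStory alpha API endpoints, auth header, and status field used here are placeholders,
# not the documented API.
import requests


class GraphStoryClient(object):
    BASE_URL = 'https://api.graphstory.com'  # placeholder base URL

    def __init__(self, api_key):
        self.api_key = api_key

    def export_instance(self, instance_id):
        """Kick off an asynchronous export; the response body is expected to carry data.exportId."""
        return requests.post(
            '%s/instances/%s/exports' % (self.BASE_URL, instance_id),
            headers={'Authorization': self.api_key},
        )

    def check_export(self, instance_id, export_id):
        """Poll an export job; returns (response, export_done_code), where export_done_code is 1 once the export has finished, else 0."""
        response = requests.get(
            '%s/instances/%s/exports/%s' % (self.BASE_URL, instance_id, export_id),
            headers={'Authorization': self.api_key},
        )
        export_done_code = 1 if response.json()['data'].get('status') == 'complete' else 0
        return response, export_done_code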