Skip to content

Instantly share code, notes, and snippets.

@dustinstansbury
Created April 14, 2017 22:30
Show Gist options
  • Save dustinstansbury/dda1a81f06027ae8bdd9a3fa4d51571c to your computer and use it in GitHub Desktop.
Save dustinstansbury/dda1a81f06027ae8bdd9a3fa4d51571c to your computer and use it in GitHub Desktop.
"""
## Example Airflow Workflow (DAG)
Markdown docstrings are rendered in the Airflow UI!!!
"""
from airflow import DAG
from airflow.models import BaseOperator
from datetime import datetime, timedelta
# define set of dummy tasks
class ExtractAdRevenueOperator(BaseOperator):
    """Dummy operator standing in for the ad-revenue extraction step.

    It does no real work: ``execute`` only prints a message naming the
    operator class.
    """

    def __init__(self, *args, **kwargs):
        # Every argument is forwarded unchanged to ``BaseOperator``.
        super(ExtractAdRevenueOperator, self).__init__(*args, **kwargs)

    def execute(self, context):
        """Print which operator is running; ``context`` is not used."""
        # Fill the ``{}`` placeholder with the class name so the output says
        # which operator ran. The call form of print works on Python 2 and 3.
        print('executing {}'.format(self.__class__.__name__))
class ExtractAppStoreRevenueOperator(BaseOperator):
    """Dummy operator standing in for extracting one app store's revenue.

    It does no real work: ``execute`` only prints a message naming the
    operator class.

    Args:
        app_store_name: Name of the app store this task is created for;
            stored on the instance as ``self.app_store_name``.
        *args, **kwargs: Forwarded unchanged to ``BaseOperator``.
    """

    def __init__(self, app_store_name, *args, **kwargs):
        self.app_store_name = app_store_name
        super(ExtractAppStoreRevenueOperator, self).__init__(*args, **kwargs)

    def execute(self, context):
        """Print which operator is running; ``context`` is not used."""
        # Fill the ``{}`` placeholder with the class name so the output says
        # which operator ran. The call form of print works on Python 2 and 3.
        print('executing {}'.format(self.__class__.__name__))
class ExtractConversionRatesOperator(BaseOperator):
    """Dummy operator standing in for the conversion-rate extraction step.

    It does no real work: ``execute`` only prints a message naming the
    operator class.
    """

    def __init__(self, *args, **kwargs):
        # Every argument is forwarded unchanged to ``BaseOperator``.
        super(ExtractConversionRatesOperator, self).__init__(*args, **kwargs)

    def execute(self, context):
        """Print which operator is running; ``context`` is not used."""
        # Fill the ``{}`` placeholder with the class name so the output says
        # which operator ran. The call form of print works on Python 2 and 3.
        print('executing {}'.format(self.__class__.__name__))
class TransformSpreadsheetDataOperator(BaseOperator):
    """Dummy operator standing in for the spreadsheet-data transform step.

    It does no real work: ``execute`` only prints a message naming the
    operator class.
    """

    def __init__(self, *args, **kwargs):
        # Every argument is forwarded unchanged to ``BaseOperator``.
        super(TransformSpreadsheetDataOperator, self).__init__(*args, **kwargs)

    def execute(self, context):
        """Print which operator is running; ``context`` is not used."""
        # Fill the ``{}`` placeholder with the class name so the output says
        # which operator ran. The call form of print works on Python 2 and 3.
        print('executing {}'.format(self.__class__.__name__))
class TransformJSONDataOperator(BaseOperator):
    """Dummy operator standing in for the JSON-data transform step.

    It does no real work: ``execute`` only prints a message naming the
    operator class.
    """

    def __init__(self, *args, **kwargs):
        # Every argument is forwarded unchanged to ``BaseOperator``.
        super(TransformJSONDataOperator, self).__init__(*args, **kwargs)

    def execute(self, context):
        """Print which operator is running; ``context`` is not used."""
        # Fill the ``{}`` placeholder with the class name so the output says
        # which operator ran. The call form of print works on Python 2 and 3.
        print('executing {}'.format(self.__class__.__name__))
class CurrencyConversionsOperator(BaseOperator):
    """Dummy operator standing in for the currency-conversion step.

    It does no real work: ``execute`` only prints a message naming the
    operator class.
    """

    def __init__(self, *args, **kwargs):
        # Every argument is forwarded unchanged to ``BaseOperator``.
        super(CurrencyConversionsOperator, self).__init__(*args, **kwargs)

    def execute(self, context):
        """Print which operator is running; ``context`` is not used."""
        # Fill the ``{}`` placeholder with the class name so the output says
        # which operator ran. The call form of print works on Python 2 and 3.
        print('executing {}'.format(self.__class__.__name__))
class CombineDataRevenueDataOperator(BaseOperator):
    """Dummy operator standing in for the step that combines revenue data.

    It does no real work: ``execute`` only prints a message naming the
    operator class.
    """

    def __init__(self, *args, **kwargs):
        # Every argument is forwarded unchanged to ``BaseOperator``.
        super(CombineDataRevenueDataOperator, self).__init__(*args, **kwargs)

    def execute(self, context):
        """Print which operator is running; ``context`` is not used."""
        # Fill the ``{}`` placeholder with the class name so the output says
        # which operator ran. The call form of print works on Python 2 and 3.
        print('executing {}'.format(self.__class__.__name__))
class CheckHistoricalDataOperator(BaseOperator):
    """Dummy operator standing in for the historical-data check step.

    It does no real work: ``execute`` only prints a message naming the
    operator class.
    """

    def __init__(self, *args, **kwargs):
        # Every argument is forwarded unchanged to ``BaseOperator``.
        super(CheckHistoricalDataOperator, self).__init__(*args, **kwargs)

    def execute(self, context):
        """Print which operator is running; ``context`` is not used."""
        # Fill the ``{}`` placeholder with the class name so the output says
        # which operator ran. The call form of print works on Python 2 and 3.
        print('executing {}'.format(self.__class__.__name__))
class RevenuePredictionOperator(BaseOperator):
    """Dummy operator standing in for the revenue-prediction step.

    It does no real work: ``execute`` only prints a message naming the
    operator class.
    """

    def __init__(self, *args, **kwargs):
        # Every argument is forwarded unchanged to ``BaseOperator``.
        super(RevenuePredictionOperator, self).__init__(*args, **kwargs)

    def execute(self, context):
        """Print which operator is running; ``context`` is not used."""
        # Fill the ``{}`` placeholder with the class name so the output says
        # which operator ran. The call form of print works on Python 2 and 3.
        print('executing {}'.format(self.__class__.__name__))
# First date the workflow is scheduled for; used both in the task defaults
# and on the DAG itself.
WORKFLOW_START_DATE = datetime(2017, 1, 1)

# Arguments handed to the DAG as ``default_args``.
default_args = {
    'owner': 'example',
    'depends_on_past': False,
    'start_date': WORKFLOW_START_DATE,
    'email': ['example@example_company.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 5,
    'retry_delay': timedelta(minutes=5),
}

# The workflow itself, scheduled at a one-day interval.
dag = DAG(
    'example_workflow_dag',
    start_date=WORKFLOW_START_DATE,
    schedule_interval=timedelta(days=1),
    default_args=default_args,
)

# Reuse the module docstring as the DAG's Markdown documentation.
dag.doc_md = __doc__
## Define all tasks
# Every task below is an instance of one of the dummy operators defined
# above and is attached to the same ``dag``.

# task to wait for ad network revenue and extract
extract_ad_revenue = ExtractAdRevenueOperator(
    task_id='extract_ad_revenue',
    dag=dag)

# dynamically create tasks to wait and extract app store data:
# one extraction task per entry in APP_STORES, collected in
# ``app_store_tasks`` so dependencies can be wired to all of them later.
APP_STORES = ['app_store_a', 'app_store_b', 'app_store_c']
app_store_tasks = []
for app_store in APP_STORES:
    task = ExtractAppStoreRevenueOperator(
        task_id='extract_{}_revenue'.format(app_store),
        dag=dag,
        app_store_name=app_store,
    )
    app_store_tasks.append(task)

# task to wait for and extract conversion rates from API
extract_conversion_rates = ExtractConversionRatesOperator(
    task_id='get_conversion_rates',
    dag=dag)

# task to transform Spreadsheet data
transform_spreadsheet_data = TransformSpreadsheetDataOperator(
    task_id='transform_spreadsheet_data',
    dag=dag)

# task to transform JSON data
transform_json_data = TransformJSONDataOperator(
    task_id='transform_json_data',
    dag=dag)

# task to apply currency exchange rates
perform_currency_conversions = CurrencyConversionsOperator(
    task_id='perform_currency_conversions',
    dag=dag)

# task to combine all data sources
combine_revenue_data = CombineDataRevenueDataOperator(
    task_id='combine_revenue_data',
    dag=dag)

# task to check that historical data exists
check_historical_data = CheckHistoricalDataOperator(
    task_id='check_historical_data',
    dag=dag)

# task to make predictions from historical data
predict_revenue = RevenuePredictionOperator(
    task_id='predict_revenue',
    dag=dag)
## Define all task dependencies
# ``a.set_upstream(b)`` makes ``a`` depend on ``b`` (``b`` runs first).

# transform_spreadsheet_data depends on extract_ad_revenue
transform_spreadsheet_data.set_upstream(extract_ad_revenue)

# dynamically define app store dependencies: the JSON transform waits for
# every app store extraction task
for task in app_store_tasks:
    transform_json_data.set_upstream(task)

# currency conversion needs both the transformed JSON data and the rates
perform_currency_conversions.set_upstream(transform_json_data)
perform_currency_conversions.set_upstream(extract_conversion_rates)

# the combine step joins the spreadsheet branch and the converted branch
combine_revenue_data.set_upstream(transform_spreadsheet_data)
combine_revenue_data.set_upstream(perform_currency_conversions)

# the historical check and the prediction run in sequence after the combine
check_historical_data.set_upstream(combine_revenue_data)
predict_revenue.set_upstream(check_historical_data)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment