Skip to content

Instantly share code, notes, and snippets.

@tmarthal
Created February 23, 2017 01:06
Show Gist options
  • Star 9 You must be signed in to star a gist
  • Fork 6 You must be signed in to fork a gist
  • Save tmarthal/edeae7f6f8780dc53887a16b7b20f205 to your computer and use it in GitHub Desktop.
Save tmarthal/edeae7f6f8780dc53887a16b7b20f205 to your computer and use it in GitHub Desktop.
# -*- coding: utf-8 -*-
from airflow.operators.http_operator import SimpleHttpOperator
from airflow.operators.postgres_operator import PostgresOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.hooks.postgres_hook import PostgresHook
from airflow.models import Variable, DAG
from datetime import date, datetime, timedelta
# Shared keyword defaults handed to each DAG built in this file via
# ``default_args=``.
default_args = dict(
    owner='@tmarthal',
    start_date=datetime(2017, 2, 19),
    depends_on_past=False,
    retries=1,
    retry_delay=timedelta(minutes=5),
)
##
## The DAG for the application account processing job to run
##
# Parent DAG for the account processing job, scheduled with '@daily'.
# (Use '*/5 * * * *' instead to run every 5 minutes while testing.)
dag = DAG(
    dag_id='account_processing',
    schedule_interval='@daily',
    default_args=default_args,
)

# Placeholder task: without any operators, this dag file will not
# execute/trigger.
op = DummyOperator(dag=dag, task_id='dummy')
def response_check(response):
    """
    Dump the HTTP response and report whether the call succeeded.

    :param response: response object exposing a ``status_code`` attribute
    :return: True when ``response.status_code`` is 200, False otherwise
    """
    # Print the response to stdout for debugging.
    print(response)
    return response.status_code == 200
def process_new_accounts():
    """
    Query the accounts table for accounts created since yesterday.

    Selects the ``id`` of every row in ``accounts`` whose ``created_at`` is
    greater than yesterday's date, using the ``account-database`` connection.

    :return: the rows returned by ``cursor.fetchall()``; each row carries the
        single selected ``id`` column
    """
    # get yesterday's date
    ds = (date.today() - timedelta(1)).isoformat()
    # The date is passed as a bound parameter rather than formatted into the
    # SQL text.
    select_sql = "SELECT id from accounts where created_at > %s"

    # open a connection to the accounts database
    pg_hook = PostgresHook(postgres_conn_id='account-database')
    connection = pg_hook.get_conn()
    try:
        print("checking for new accounts")
        cursor = connection.cursor()
        try:
            cursor.execute(select_sql, (ds,))
            return cursor.fetchall()
        finally:
            cursor.close()
    finally:
        # This function is called at module level, so release the connection
        # on every call instead of leaking it.
        connection.close()
# Note that this runs the query every time the airflow heartbeat triggers(!)
account_ids = process_new_accounts()

# The token does not change between accounts: look it up once, and skip the
# lookup entirely when there is nothing to process.
access_token = Variable.get("APPLICATION_ACCESS_TOKEN") if account_ids else None

for row in account_ids:
    # fetchall() yields one-column rows such as ``(42,)``; unwrap the id so
    # that "{}".format(...) below produces "42" rather than "(42,)".
    account_id = row[0] if isinstance(row, (tuple, list)) else row

    # the child dag name
    export_account_task_name = 'process_account_%s' % account_id

    # the DAG creation cannot be in a Sensor or other Operator
    export_account_dag = DAG(
        dag_id='%s.%s' % (dag.dag_id, export_account_task_name),
        default_args=default_args,
        schedule_interval='@once',  # defaults to timedelta(1) - '@once' runs it right away, one time
    )

    ## This hits the account export url, _endpoint/account/export?id={ACCOUNT_ID}&token={AUTH_TOKEN}
    account_export_endpoint_task = SimpleHttpOperator(
        task_id='account_export_endpoint_task_%s' % account_id,
        http_conn_id='application',
        method='GET',
        endpoint='_endpoint/account/export',
        data={"id": "{}".format(account_id), "token": access_token},  # http params
        response_check=response_check,  # will retry based on default_args if it fails
        dag=export_account_dag,
    )
    print("Created account processing DAG {}".format(export_account_dag.dag_id))

    # register the dynamically created DAG in the global namespace
    globals()[export_account_task_name] = export_account_dag
@tmarthal
Copy link
Author

screenshot 2017-02-20 15 28 23

@AhmedKamal
Copy link

Thank you :)

@chankay0102
Copy link

thank you

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment