Skip to content

Instantly share code, notes, and snippets.

@vinayak-mehta
Created September 26, 2018 08:41
Show Gist options
  • Save vinayak-mehta/aa74131dc913df10008c6246646a00dc to your computer and use it in GitHub Desktop.
Save vinayak-mehta/aa74131dc913df10008c6246646a00dc to your computer and use it in GitHub Desktop.
import glob
import datetime as dt
import yaml
import requests
from airflow import DAG
from airflow.operators import PythonOperator
def extract(**kwargs):
    """Fetch a JSON document over HTTP GET and print it.

    Keyword arguments (read with ``kwargs.get``; any other keys are ignored):
        url: base URL of the service.
        method: path segment appended to ``url`` after a ``/``.
        param: name of the single query-string parameter.
        value: value sent for ``param``.
        timeout: optional request timeout in seconds (defaults to 30).

    Raises:
        requests.HTTPError: if the server answers with a 4xx/5xx status.
        ValueError: if the response body is not valid JSON.
    """
    url = kwargs.get('url')
    method = kwargs.get('method')
    param = kwargs.get('param')
    value = kwargs.get('value')

    # Let requests build the query string so the name and value are
    # URL-encoded instead of being pasted into the URL verbatim.
    params = {param: value} if param is not None else None

    # Without a timeout a stalled server would block the worker forever.
    r = requests.get(
        "{}/{}".format(url, method),
        params=params,
        timeout=kwargs.get('timeout', 30),
    )
    # Fail loudly on an error status rather than printing an error payload
    # as if it were the extracted data.
    r.raise_for_status()
    print(r.json())
def transform(**kwargs):
    """Print the keyword arguments received; no transformation is performed.

    The function form of ``print`` is used so the module parses on
    Python 3 as well as Python 2 (a single parenthesised argument prints
    identically on both).
    """
    print(kwargs)
def load(**kwargs):
    """Print the keyword arguments received; nothing is written anywhere.

    The function form of ``print`` is used so the module parses on
    Python 3 as well as Python 2 (a single parenthesised argument prints
    identically on both).
    """
    print(kwargs)
# One row per pipeline stage, in creation order:
#   (section of the YAML 'pattern', task-id template, field holding the
#    common ID, callable to run, task-id template of the upstream stage)
# The extract stage has no upstream, hence None.
_STAGES = (
    ('extract', 'extract{}', 'source', extract, None),
    ('transform', 'transform{}', 'transform', transform, 'extract{}'),
    ('load', 'load{}', 'sink', load, 'transform{}'),
)

# Build one DAG for every YAML file in the current working directory.
# NOTE(review): the glob is relative to the process's working directory,
# not to this file's location -- confirm that is where the YAML files live.
for yaml_path in glob.glob('*.yaml'):
    with open(yaml_path, 'r') as yaml_file:
        dag_meta = yaml.safe_load(yaml_file)

    dag = DAG(dag_id=dag_meta['dag_id'], start_date=dag_meta['start_date'])
    pattern = dag_meta['pattern']
    task_bag = {}  # task_id -> operator, for every task created so far

    # Create the tasks dynamically, stage by stage.
    for section, id_template, id_field, handler, upstream_template in _STAGES:
        for spec in pattern[section]:
            common_id = spec[id_field]
            task_id = id_template.format(common_id)
            task = PythonOperator(
                task_id=task_id,
                python_callable=handler,
                op_kwargs=spec,
                dag=dag,
            )
            if upstream_template is not None:
                # Link this task to the previous stage's task that shares
                # the same common ID; a missing match raises KeyError.
                task.set_upstream(task_bag[upstream_template.format(common_id)])
            task_bag[task_id] = task

    # The DAG must appear in globals:
    # https://airflow.apache.org/concepts.html#scope
    globals()[dag_meta['dag_id']] = dag
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment