Skip to content

Instantly share code, notes, and snippets.

@vinayak-mehta
Created September 26, 2018 08:37
Show Gist options
  • Save vinayak-mehta/f3e3f6fc7a9b2a4c2a071961ecf6c70a to your computer and use it in GitHub Desktop.
Save vinayak-mehta/f3e3f6fc7a9b2a4c2a071961ecf6c70a to your computer and use it in GitHub Desktop.
import datetime as dt
import requests
from airflow import DAG
from airflow.operators import PythonOperator
def extract(**kwargs):
    """Send a GET request to httpbin.org and print the decoded JSON response.

    Keyword Args:
        param: Value sent as the ``query`` URL parameter. It is rendered with
            ``str.format``, so a missing ``param`` is sent as the text
            ``None``.

    Raises:
        requests.exceptions.RequestException: If the request fails, times
            out, or the server answers with an HTTP error status.
    """
    param = kwargs.get('param')
    # Pass the value through ``params`` so requests percent-encodes it
    # instead of splicing raw text into the URL.
    response = requests.get(
        "http://httpbin.org/get",
        params={'query': '{}'.format(param)},
        # Without a timeout a stalled connection would block the task forever.
        timeout=30,
    )
    # Fail loudly on 4xx/5xx instead of trying to decode an error page.
    response.raise_for_status()
    # print() with a single argument behaves the same on Python 2 and 3.
    print(response.json())
def transform(**kwargs):
    """Print the keyword arguments this callable was invoked with.

    Keyword Args:
        **kwargs: Arbitrary keyword arguments; they are collected into a
            dict and printed as-is.
    """
    # Function-call form of print works on both Python 2 and Python 3;
    # the statement form is a SyntaxError on Python 3.
    print(kwargs)
def load(**kwargs):
    """Print the keyword arguments this callable was invoked with.

    Keyword Args:
        **kwargs: Arbitrary keyword arguments; they are collected into a
            dict and printed as-is.
    """
    # Function-call form of print works on both Python 2 and Python 3;
    # the statement form is a SyntaxError on Python 3.
    print(kwargs)
# Start date handed to the DAG: midnight on 1 August 2018 (naive datetime,
# the same value the ISO string '2018-08-01T00:00:00' parses to).
start_date = dt.datetime(2018, 8, 1)

# Tasks are created inside the ``with`` block that binds the DAG as ``dag``.
with DAG(dag_id='my_dag', start_date=start_date) as dag:
    task1 = PythonOperator(
        task_id='extract',
        python_callable=extract,
        op_kwargs={'param': 'a'},
    )
    task2 = PythonOperator(
        task_id='transform',
        python_callable=transform,
        op_kwargs={'foo': 'bar'},
    )
    # The 'load' task must invoke load(); wiring it to transform() would
    # leave load() unused and run the transform callable twice.
    task3 = PythonOperator(
        task_id='load',
        python_callable=load,
        op_kwargs={'bar': 'baz'},
    )

    # Dependency chain: extract -> transform -> load.
    task1 >> task2 >> task3
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment