Skip to content

Instantly share code, notes, and snippets.

View chandulal's full-sized avatar

chandu kavar chandulal

View GitHub Profile
@chandulal
chandulal / hello_world.py
Created August 9, 2018 10:18
Hello World DAG
from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators import MultiplyBy5Operator
def print_hello():
return 'Hello Wolrd'
dag = DAG('hello_world', description='Hello world example', schedule_interval='0 12 * * *', start_date=datetime(2017, 3, 20), catchup=False)
import unittest
from airflow.models import DagBag
class TestDagIntegrity(unittest.TestCase):
LOAD_SECOND_THRESHOLD = 2
def setUp(self):
self.dagbag = DagBag()
def push_to_xcoms(*args, **kwargs):
value = "dummyValue"
kwargs['ti'].xcom_push(key="dummyKey", value=value)
push_to_xcoms_task = PythonOperator(
task_id='push_to_xcoms',
provide_context=True,
python_callable=push_to_xcoms,
dag=dag
)
def push_to_xcoms(*args, **kwargs):
value = "dummyValue"
kwargs['ti'].xcom_push(key="dummyKey", value=value)
def pull_from_xcoms(**kwargs):
ti = kwargs['ti']
pulled_value = ti.xcom_pull(key='dummyKey', task_ids='push_to_xcoms')
print("value=" + str(pulled_value))
push_to_xcoms_task = PythonOperator(
self.airflow_api.add_presto_connection("presto-conn",presto_catlog ,presto_schema)
self.airflow_api.add_mysql_connection("mysql-conn", mysql_database, mysql_user, mysql_password)
import sys
import unittest
import mysql.connector
import prestodb
sys.path.append('../')
from airflow_api import AirflowAPI
from db_util import DBUtil
from constants import PRESTO_DB_PORT,MYSQL_DB_PORT
from airflow.operators.presto_to_mysql import PrestoToMySqlTransfer
from datetime import datetime
from airflow import DAG
default_args = {
'email': ['hello@world.com']
}
dag = DAG('presto_to_mysql', description='Presto to Mysql Transfer', default_args=default_args,
schedule_interval='0 12 * * *',
import sys
import unittest
sys.path.append('../')
from airflow_api import AirflowAPI
class TestHelloWorldDag(unittest.TestCase):
"""Integration test for Hello world DAG"""
def test_xcom_in_templated_field(self):
dag_id = 'hello_world_xcoms'
dag = self.dagbag.get_dag(dag_id)
push_to_xcoms_task = dag.get_task('push_to_xcoms')
execution_date = datetime.now()
push_to_xcoms_ti = TaskInstance(task=push_to_xcoms_task, execution_date=execution_date)
context = push_to_xcoms_ti.get_template_context()
push_to_xcoms_task.execute(context)
def test_xcoms(self):
dag_id = 'hello_world_xcoms'
dag = self.dagbag.get_dag(dag_id)
push_to_xcoms_task = dag.get_task('push_to_xcoms')
pull_from_xcoms_task = dag.get_task('pull_from_xcoms')
execution_date = datetime.now()
push_to_xcoms_ti = TaskInstance(task=push_to_xcoms_task, execution_date=execution_date)
context = push_to_xcoms_ti.get_template_context()