Skip to content

Instantly share code, notes, and snippets.

View chandulal's full-sized avatar

chandu kavar chandulal

View GitHub Profile
@chandulal
chandulal / hello_world_dag_test.py
Created August 9, 2018 11:11
DAG/Pipeline definition tests
import unittest
from airflow.models import DagBag
class TestHelloWorldDAG(unittest.TestCase):
"""Check HelloWorldDAG expectation"""
def setUp(self):
self.dagbag = DagBag()
def test_task_count(self):
import logging
from airflow.models import BaseOperator
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.decorators import apply_defaults
log = logging.getLogger(__name__)
class MultiplyBy5Operator(BaseOperator):
import unittest
from datetime import datetime
from airflow import DAG
from airflow.models import TaskInstance
from airflow.operators import MultiplyBy5Operator
class TestMultiplyBy5Operator(unittest.TestCase):
def test_execute(self):
from datetime import datetime, timedelta
import time
import logging
from airflow.operators.sensors import BaseSensorOperator
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.decorators import apply_defaults
log = logging.getLogger(__name__)
class HelloworldSensor(BaseSensorOperator):
import unittest
from datetime import datetime
from airflow import DAG
from airflow.models import TaskInstance
from airflow.operators import HelloworldSensor
class TestHelloworldSensor(unittest.TestCase):
def test_poke_should_return_false_when_value_of_minute_is_not_divisible_by_3(self):
dag = DAG(dag_id='anydag', start_date=datetime.now())
sensor_task = HelloworldSensor(
@chandulal
chandulal / helloworld_xcoms.py
Created September 8, 2019 06:31
Hello World DAG with Xcoms
import datetime
from airflow import DAG
from airflow.operators import PythonOperator
from airflow.operators import BashOperator
yesterday = datetime.datetime.combine(
datetime.datetime.today() - datetime.timedelta(1),
datetime.datetime.min.time())
default_dag_args = {
def test_xcoms(self):
dag_id = 'hello_world_xcoms'
dag = self.dagbag.get_dag(dag_id)
push_to_xcoms_task = dag.get_task('push_to_xcoms')
pull_from_xcoms_task = dag.get_task('pull_from_xcoms')
execution_date = datetime.now()
push_to_xcoms_ti = TaskInstance(task=push_to_xcoms_task, execution_date=execution_date)
context = push_to_xcoms_ti.get_template_context()
def test_xcom_in_templated_field(self):
dag_id = 'hello_world_xcoms'
dag = self.dagbag.get_dag(dag_id)
push_to_xcoms_task = dag.get_task('push_to_xcoms')
execution_date = datetime.now()
push_to_xcoms_ti = TaskInstance(task=push_to_xcoms_task, execution_date=execution_date)
context = push_to_xcoms_ti.get_template_context()
push_to_xcoms_task.execute(context)
import sys
import unittest
sys.path.append('../')
from airflow_api import AirflowAPI
class TestHelloWorldDag(unittest.TestCase):
"""Integration test for Hello world DAG"""
from airflow.operators.presto_to_mysql import PrestoToMySqlTransfer
from datetime import datetime
from airflow import DAG
default_args = {
'email': ['hello@world.com']
}
dag = DAG('presto_to_mysql', description='Presto to Mysql Transfer', default_args=default_args,
schedule_interval='0 12 * * *',