Skip to content

Instantly share code, notes, and snippets.

View chandulal's full-sized avatar

chandu kavar chandulal

View GitHub Profile
@chandulal
chandulal / hello_world.py
Created August 9, 2018 10:18
Hello World DAG
from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators import MultiplyBy5Operator
def print_hello():
    """Return the greeting string produced by the hello-world task.

    Returns:
        str: The literal greeting ``'Hello World'``.
    """
    # The original literal was misspelled ('Hello Wolrd'); corrected here.
    return 'Hello World'
# DAG object for the hello-world example. The cron expression '0 12 * * *'
# fires once a day at 12:00; catchup is switched off explicitly.
dag = DAG(
    'hello_world',
    description='Hello world example',
    schedule_interval='0 12 * * *',
    start_date=datetime(2017, 3, 20),
    catchup=False,
)
import unittest
from airflow.models import DagBag
class TestDagIntegrity(unittest.TestCase):
    """Test case whose fixture builds an Airflow ``DagBag`` before each test."""

    # Threshold constant; the tests that read it are not visible in this
    # excerpt. NOTE(review): the name suggests a DAG load-time limit in
    # seconds — confirm against the test methods that use it.
    LOAD_SECOND_THRESHOLD = 2

    def setUp(self):
        """Create a ``DagBag`` with default arguments and store it on the instance."""
        self.dagbag = DagBag()
@chandulal
chandulal / hello_world_dag_test.py
Created August 9, 2018 11:11
DAG/Pipeline definition tests
import unittest
from airflow.models import DagBag
class TestHelloWorldDAG(unittest.TestCase):
"""Check HelloWorldDAG expectation"""
def setUp(self):
self.dagbag = DagBag()
def test_task_count(self):
import logging
from airflow.models import BaseOperator
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.decorators import apply_defaults
log = logging.getLogger(__name__)
class MultiplyBy5Operator(BaseOperator):
import unittest
from datetime import datetime
from airflow import DAG
from airflow.models import TaskInstance
from airflow.operators import MultiplyBy5Operator
class TestMultiplyBy5Operator(unittest.TestCase):
def test_execute(self):
from datetime import datetime, timedelta
import time
import logging
from airflow.operators.sensors import BaseSensorOperator
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.decorators import apply_defaults
log = logging.getLogger(__name__)
class HelloworldSensor(BaseSensorOperator):
import unittest
from datetime import datetime
from airflow import DAG
from airflow.models import TaskInstance
from airflow.operators import HelloworldSensor
class TestHelloworldSensor(unittest.TestCase):
def test_poke_should_return_false_when_value_of_minute_is_not_divisible_by_3(self):
dag = DAG(dag_id='anydag', start_date=datetime.now())
sensor_task = HelloworldSensor(
@chandulal
chandulal / helloworld_xcoms.py
Created September 8, 2019 06:31
Hello World DAG with XComs
import datetime
from airflow import DAG
from airflow.operators import PythonOperator
from airflow.operators import BashOperator
# Naive datetime for 00:00:00 on the day before the current local date.
yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(days=1),
    datetime.time.min,
)
default_dag_args = {
def test_xcoms(self):
    """Set up a task instance and template context for the XCom push task.

    Looks up the 'hello_world_xcoms' DAG in ``self.dagbag``, fetches its
    'push_to_xcoms' and 'pull_from_xcoms' tasks, and builds a template
    context for the push task. The visible excerpt stops after the context
    is created; no assertions are shown here.
    """
    dag_id = 'hello_world_xcoms'
    dag = self.dagbag.get_dag(dag_id)
    push_to_xcoms_task = dag.get_task('push_to_xcoms')
    # Not used in the lines visible here; kept because the excerpt is cut off.
    pull_from_xcoms_task = dag.get_task('pull_from_xcoms')
    # NOTE(review): requires ``datetime`` to be the class
    # (``from datetime import datetime``), not the module — confirm the import.
    execution_date = datetime.now()
    push_to_xcoms_ti = TaskInstance(task=push_to_xcoms_task, execution_date=execution_date)
    context = push_to_xcoms_ti.get_template_context()
def test_xcom_in_templated_field(self):
dag_id = 'hello_world_xcoms'
dag = self.dagbag.get_dag(dag_id)
push_to_xcoms_task = dag.get_task('push_to_xcoms')
execution_date = datetime.now()
push_to_xcoms_ti = TaskInstance(task=push_to_xcoms_task, execution_date=execution_date)
context = push_to_xcoms_ti.get_template_context()
push_to_xcoms_task.execute(context)