Skip to content

Instantly share code, notes, and snippets.

View chandulal's full-sized avatar

chandu kavar chandulal

View GitHub Profile
from datetime import datetime, timedelta
import time
import logging
from airflow.operators.sensors import BaseSensorOperator
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.decorators import apply_defaults
log = logging.getLogger(__name__)
class HelloworldSensor(BaseSensorOperator):
import unittest
from datetime import datetime
from airflow import DAG
from airflow.models import TaskInstance
from airflow.operators import MultiplyBy5Operator
class TestMultiplyBy5Operator(unittest.TestCase):
def test_execute(self):
import logging
from airflow.models import BaseOperator
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.decorators import apply_defaults
log = logging.getLogger(__name__)
class MultiplyBy5Operator(BaseOperator):
@chandulal
chandulal / hello_world_dag_test.py
Created August 9, 2018 11:11
DAG/Pipeline definition tests
import unittest
from airflow.models import DagBag
class TestHelloWorldDAG(unittest.TestCase):
"""Check HelloWorldDAG expectation"""
def setUp(self):
self.dagbag = DagBag()
def test_task_count(self):
import unittest
from airflow.models import DagBag
class TestDagIntegrity(unittest.TestCase):
LOAD_SECOND_THRESHOLD = 2
def setUp(self):
self.dagbag = DagBag()
@chandulal
chandulal / hello_world.py
Created August 9, 2018 10:18
Hello World DAG
from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators import MultiplyBy5Operator
def print_hello():
return 'Hello Wolrd'
dag = DAG('hello_world', description='Hello world example', schedule_interval='0 12 * * *', start_date=datetime(2017, 3, 20), catchup=False)