Skip to content

Instantly share code, notes, and snippets.

@chandulal
Created August 9, 2018 11:38
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save chandulal/cd6399a780d6077588bdaca2ec17d256 to your computer and use it in GitHub Desktop.
Save chandulal/cd6399a780d6077588bdaca2ec17d256 to your computer and use it in GitHub Desktop.
import unittest
from datetime import datetime
from airflow import DAG
from airflow.models import TaskInstance
from airflow.operators import HelloworldSensor
class TestHelloworldSensor(unittest.TestCase):
    """Unit tests for ``HelloworldSensor.poke`` and ``HelloworldSensor.execute``.

    Each test builds a sensor attached to a throwaway DAG, passes a fixed
    ``sensor_start_time`` through ``params`` and calls the sensor method
    directly with a template context taken from a ``TaskInstance``.
    """

    def _build_sensor_and_context(self, sensor_start_time):
        """Create a ``HelloworldSensor`` and a template context for it.

        Args:
            sensor_start_time: value handed to the sensor as
                ``params['sensor_start_time']``.

        Returns:
            A ``(sensor_task, context)`` tuple, where ``context`` is the
            result of ``TaskInstance.get_template_context()`` for the sensor.
        """
        dag = DAG(dag_id='anydag', start_date=datetime.now())
        sensor_task = HelloworldSensor(
            task_id='any_sensor_task',
            poke_interval=2,
            params={'sensor_start_time': sensor_start_time},
            dag=dag,
        )
        task_instance = TaskInstance(
            task=sensor_task,
            execution_date=datetime.now(),
        )
        return sensor_task, task_instance.get_template_context()

    def test_poke_should_return_false_when_value_of_minute_is_not_divisible_by_3(self):
        """``poke`` is falsy for a start time whose minute (50) is not a multiple of 3."""
        sensor_task, context = self._build_sensor_and_context(
            datetime(2018, 8, 8, 10, 50)
        )
        result = sensor_task.poke(context)
        self.assertFalse(result)

    def test_poke_should_return_true_when_value_of_minute_is_divisible_by_3(self):
        """``poke`` is truthy for a start time whose minute (9) is a multiple of 3."""
        sensor_task, context = self._build_sensor_and_context(
            datetime(2018, 8, 8, 10, 9)
        )
        result = sensor_task.poke(context)
        self.assertTrue(result)

    def test_execute_should_return_true(self):
        """``execute`` returns 12 for a sensor started at 10:10.

        NOTE(review): despite the method name, the assertion checks for the
        value 12, not ``True``. Presumably 12 is the first minute divisible
        by 3 reached after 10:10 -- confirm against the sensor implementation.
        """
        sensor_task, context = self._build_sensor_and_context(
            datetime(2018, 8, 8, 10, 10)
        )
        sensor_time = sensor_task.execute(context)
        self.assertEqual(sensor_time, 12)
# Collect every test method of TestHelloworldSensor into one suite and run
# it immediately with per-test (verbosity=2) output.
_loader = unittest.TestLoader()
suite = _loader.loadTestsFromTestCase(TestHelloworldSensor)
_runner = unittest.TextTestRunner(verbosity=2)
_runner.run(suite)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment