Skip to content

Instantly share code, notes, and snippets.

View chandulal's full-sized avatar

chandu kavar chandulal

View GitHub Profile
def push_to_xcoms(*args, **kwargs):
value = "dummyValue"
kwargs['ti'].xcom_push(key="dummyKey", value=value)
push_to_xcoms_task = PythonOperator(
task_id='push_to_xcoms',
provide_context=True,
python_callable=push_to_xcoms,
dag=dag
)
def push_to_xcoms(*args, **kwargs):
value = "dummyValue"
kwargs['ti'].xcom_push(key="dummyKey", value=value)
def pull_from_xcoms(**kwargs):
ti = kwargs['ti']
pulled_value = ti.xcom_pull(key='dummyKey', task_ids='push_to_xcoms')
print("value=" + str(pulled_value))
push_to_xcoms_task = PythonOperator(
self.airflow_api.add_presto_connection("presto-conn",presto_catlog ,presto_schema)
self.airflow_api.add_mysql_connection("mysql-conn", mysql_database, mysql_user, mysql_password)
import sys
import unittest
import mysql.connector
import prestodb
sys.path.append('../')
from airflow_api import AirflowAPI
from db_util import DBUtil
from constants import PRESTO_DB_PORT,MYSQL_DB_PORT
from airflow.operators.presto_to_mysql import PrestoToMySqlTransfer
from datetime import datetime
from airflow import DAG
default_args = {
'email': ['hello@world.com']
}
dag = DAG('presto_to_mysql', description='Presto to Mysql Transfer', default_args=default_args,
schedule_interval='0 12 * * *',
import sys
import unittest
sys.path.append('../')
from airflow_api import AirflowAPI
class TestHelloWorldDag(unittest.TestCase):
"""Integration test for Hello world DAG"""
def test_xcom_in_templated_field(self):
dag_id = 'hello_world_xcoms'
dag = self.dagbag.get_dag(dag_id)
push_to_xcoms_task = dag.get_task('push_to_xcoms')
execution_date = datetime.now()
push_to_xcoms_ti = TaskInstance(task=push_to_xcoms_task, execution_date=execution_date)
context = push_to_xcoms_ti.get_template_context()
push_to_xcoms_task.execute(context)
def test_xcoms(self):
dag_id = 'hello_world_xcoms'
dag = self.dagbag.get_dag(dag_id)
push_to_xcoms_task = dag.get_task('push_to_xcoms')
pull_from_xcoms_task = dag.get_task('pull_from_xcoms')
execution_date = datetime.now()
push_to_xcoms_ti = TaskInstance(task=push_to_xcoms_task, execution_date=execution_date)
context = push_to_xcoms_ti.get_template_context()
@chandulal
chandulal / helloworld_xcoms.py
Created September 8, 2019 06:31
Hello World DAG with Xcoms
import datetime
from airflow import DAG
from airflow.operators import PythonOperator
from airflow.operators import BashOperator
yesterday = datetime.datetime.combine(
datetime.datetime.today() - datetime.timedelta(1),
datetime.datetime.min.time())
default_dag_args = {
import unittest
from datetime import datetime
from airflow import DAG
from airflow.models import TaskInstance
from airflow.operators import HelloworldSensor
class TestHelloworldSensor(unittest.TestCase):
def test_poke_should_return_false_when_value_of_minute_is_not_divisible_by_3(self):
dag = DAG(dag_id='anydag', start_date=datetime.now())
sensor_task = HelloworldSensor(