Skip to content

Instantly share code, notes, and snippets.

View aawgit's full-sized avatar

Akalanka Weerasooriya aawgit

View GitHub Profile
from main_module import main_func
def test_main_func_mocked(monkeypatch):
    """Check that ``main_func`` returns the value of a stubbed ``service_func``.

    The stub is installed on ``main_module.service_func`` for the duration of
    the test via the ``monkeypatch`` fixture, so the real service function is
    not invoked.
    """
    monkeypatch.setattr('main_module.service_func', lambda: 'mocked result')
    assert main_func() == 'mocked result'
from main_module import main_func, main_func_2
def test_main_func(monkeypatch):
    """Check that ``main_func`` returns the value of a stubbed ``service_func``.

    The attribute is patched on ``main_module`` rather than ``sub_module``:
    a ``from sub_module import service_func`` import binds the function into
    the importing module's own namespace, so replacing
    ``sub_module.service_func`` would leave the name that ``main_func``
    actually calls untouched and the assertion would fail.
    """
    monkeypatch.setattr('main_module.service_func', lambda: 'mocked result')
    assert main_func() == 'mocked result'
import logging
def service_func():
    """Log an informational message and return a fixed result string.

    Returns:
        The literal string ``'sub mod result'``.
    """
    logging.info('From sub modules function')
    return 'sub mod result'
from sub_module import service_func
import logging
def main_func():
    """Log the call, then delegate to ``service_func``.

    Returns:
        Whatever ``service_func()`` returns, passed through unchanged.
    """
    logging.info('Calling sub modules function')
    return service_func()
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
def error_function():
    """Fail unconditionally; used as the callable of the failing task.

    Raises:
        Exception: always, with the message ``'Something wrong'``.
    """
    raise Exception('Something wrong')
# Task whose callable (error_function) always raises.
# NOTE(review): this call is cut off in the visible snippet -- the closing
# parenthesis and any remaining arguments are not shown here.
t2 = PythonOperator(
task_id='failing_task',
python_callable=error_function,
# presumably turns on Airflow's failure e-mail alert for this task -- confirm
email_on_failure=True,
@aawgit
aawgit / airflow-mail-operator.py
Last active November 16, 2019 17:36
Sending e-mail notifications with Apache Airflow
from airflow import DAG
from airflow.operators.email_operator import EmailOperator
# Task that sends a fixed test e-mail to a single recipient.
# NOTE(review): `dag` is not defined in this snippet; it must exist before
# this statement runs.
t1 = EmailOperator(
    task_id="send_mail",
    to='receiver@mail.com',
    subject='Test mail',
    # Well-formed paragraph: opened with <p> and closed with </p>.
    html_content='<p> You have got mail! </p>',
    dag=dag,
)
from flask import Flask
app = Flask(__name__)


@app.route('/')
def hello_world():
    """Handle requests to the root URL ``/``.

    Returns:
        The literal string ``'Hello, World!'`` as the response body.
    """
    return 'Hello, World!'