- Latest stable version of Airflow installed (preferably running Airflow in Docker).
- Allocate at least 4 GB of RAM to Docker.
- Create an Airflow connection for Slack with the HTTP connection type. The part of the webhook URL after https://hooks.slack.com/services goes in the Password field. Refer to the diagram below, extract the API token (the shaded portion) from your Slack app, and enter the values as follows:
Host: https://hooks.slack.com/services
Conn Type: HTTP
Password: /T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX
Refer to this blog article for details.
- Keep this file under the
/dags
folder
- Make sure the slack_conn
import json
import os
from datetime import datetime, timedelta
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.contrib.operators.slack_webhook_operator import \
SlackWebhookOperator
from sqlalchemy import create_engine
import pandas as pd
from airflow import DAG
from airflow.hooks.base_hook import BaseHook
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy import DummyOperator
from airflow.operators.slack_operator import SlackAPIPostOperator
from airflow.operators.docker_operator import DockerOperator
from airflow.contrib.operators.ssh_operator import SSHOperator
# Input CSV location: the directory comes from the "path" key of the JSON
# extra stored on the Airflow connection named "data_path".
# NOTE: this connection lookup runs every time the module is imported.
_data_conn_extra = json.loads(BaseHook.get_connection("data_path").get_extra())
data_path = f'{_data_conn_extra.get("path")}/data.csv'

# Output file sits next to the input: same stem, "-transformed.csv" suffix.
_data_stem = os.path.splitext(data_path)[0]
transformed_path = f'{_data_stem}-transformed.csv'

# Arguments handed to the DAG below as its `default_args`.
default_args = {
    "owner": "airflow",
    "start_date": datetime(2021, 9, 13),
    "depends_on_past": False,
    # Mail is sent on retry but not on final failure.
    "email_on_failure": False,
    "email_on_retry": True,
    "email": "senthilsweb@gmail.com",
    "retries": 1,
    # NOTE(review): max_active_runs is a DAG constructor argument; confirm it
    # has any effect when supplied through default_args.
    "max_active_runs": 1,
}

# Emitted once per import of this module.
_module_log = LoggingMixin().log
_module_log.info("Senthilnathan")
def hello_world():
    """Log the configured data paths and the Airflow home directory.

    Used as the python_callable of the 'hello' task.

    Returns:
        The literal string 'Hello World'.
    """
    log = LoggingMixin().log
    log.info("Senthilnathan")
    log.info(data_path)
    log.info(transformed_path)
    # AIRFLOW_HOME is optional: Airflow falls back to ~/airflow when it is not
    # set, so mirror that default rather than failing the task with KeyError.
    log.info(os.environ.get('AIRFLOW_HOME', os.path.expanduser('~/airflow')))
    return 'Hello World'
# Daily demo DAG: start -> hello -> Slack notification -> SSH command,
# plus a Docker task that is defined without dependencies.
with DAG(
    dag_id="visby_dag",
    schedule_interval="@daily",
    default_args=default_args,
    # max_active_runs is a DAG-level setting; default_args are only forwarded
    # to operators, so the limit has to be given here to take effect.
    max_active_runs=1,
    catchup=False,
) as dag:
    start = DummyOperator(task_id='start', dag=dag)
    start.doc_md = """#Start Operator"""

    hello = PythonOperator(
        task_id='hello',
        python_callable=hello_world,
        dag=dag,
    )

    # The webhook path is stored as the password of the "slack_conn"
    # connection. NOTE: this lookup happens whenever the DAG file is parsed.
    slack_token = BaseHook.get_connection("slack_conn").password
    notify_data_science_team = SlackWebhookOperator(
        task_id='notify_data_science_team',
        http_conn_id='slack_conn',
        webhook_token=slack_token,
        message="Data Science Notification \n"
                "New Invoice Data is loaded into invoices table. \n "
                "Here is a celebration kitty: "
                "https://www.youtube.com/watch?v=J---aiyznGQ",
        username='airflow',
        icon_url='https://raw.githubusercontent.com/apache/'
                 'airflow/master/airflow/www/static/pin_100.png',
        dag=dag,
    )

    # NOTE: this task is not part of the dependency chain at the bottom, so it
    # has no upstream or downstream tasks.
    # NOTE(review): ">>" only acts as a redirect if the container runs the
    # command through a shell - confirm against the image's entrypoint.
    dc = DockerOperator(
        dag=dag,
        task_id='docker_task',
        image='docker_job_golang',
        container_name='peaceful_cannon',
        api_version='auto',
        auto_remove=True,
        command="echo hello >> ec.txt",
        docker_url="unix://var/run/docker.sock",
        network_mode="bridge",
    )

    ssh = SSHOperator(
        dag=dag,
        ssh_conn_id='ssh_conn',
        task_id='ssh_operator',
        command='echo "senthil" >> senthil.txt',
    )

    # Now could come an upload to S3 of the model or a deploy step
    start >> hello >> notify_data_science_team >> ssh