Skip to content

Instantly share code, notes, and snippets.

@PreedhiVivek
Last active March 18, 2024 16:35
Show Gist options
  • Save PreedhiVivek/3ae5963dc23f906c741bfc0ce65c85cf to your computer and use it in GitHub Desktop.
Save PreedhiVivek/3ae5963dc23f906c741bfc0ce65c85cf to your computer and use it in GitHub Desktop.
A DAG to notify nearing GitHub access token expiry on Skype
"""
DAG that posts a Skype notification about an upcoming GitHub access token expiry.
"""
import logging
from datetime import datetime, timedelta
import requests
from airflow.decorators import dag, task
from airflow.models import Variable
# Skype settings are read from Airflow Variables at module import time,
# i.e. every time this DAG file is parsed.
skype_channel, skype_api_key, skype_url, skype_message = (
    Variable.get(variable_key)
    for variable_key in (
        "SKYPE_CHANNEL",
        "SKYPE_API_KEY",
        "SKYPE_URL",
        "SKYPE_MESSAGE",
    )
)
# Default arguments handed to the DAG below through ``default_args``.
default_args = dict(
    owner='airflow',
    depends_on_past=False,
    # First scheduling reference point: 7 Feb 2024, 10:30 (naive datetime).
    start_date=datetime(year=2024, month=2, day=7, hour=10, minute=30),
    # One retry, attempted after a one-minute pause.
    retries=1,
    retry_delay=timedelta(seconds=60),
)
@dag(
    'notify_token_expiry_dag',
    default_args=default_args,
    schedule_interval=timedelta(days=90),
    catchup=False,
)
def notify_token_expiry_dag():
    """
    Post the configured token-expiry reminder on Skype every 90 days.

    The DAG consists of two tasks: ``post_message_on_skype`` sends the message
    and returns ``"success"`` or ``"failure"``; ``final_status`` fails the DAG
    run unless the notification succeeded.
    """

    @task
    def post_message_on_skype(message):
        """
        Post a message on the configured Skype channel.

        :param message: text to post
        :return: ``"success"`` if the endpoint answered with HTTP 200,
            ``"failure"`` on any other status code or on a request error
        """
        headers = {"Content-Type": "application/json"}
        payload = {
            "msg": message,
            "channel": skype_channel,
            "API_KEY": skype_api_key,
        }
        # Keep the try body limited to the only call that can raise.
        try:
            response = requests.post(
                url=skype_url,
                json=payload,
                headers=headers,
                timeout=60,
            )
        except requests.exceptions.RequestException as error:
            logging.error("Unable to post message to Skype channel! Request Exception: %s", error)
            # Return the failure marker explicitly: falling through would
            # return None, which the downstream check would not see as a failure.
            return "failure"

        if response.status_code != 200:
            logging.error(
                "Unable to post message to Skype channel! Response status code: %s",
                response.status_code,
            )
            return "failure"

        logging.info("Successfully sent the Skype message - %s", message)
        return "success"

    @task
    def final_status(status):
        """
        Evaluate the notification status and mark the DAG run accordingly.

        :param status: value returned by ``post_message_on_skype``
        :raises RuntimeError: if ``status`` is anything other than ``"success"``
        """
        # Treat every non-success value (including None) as a failure so an
        # unexpected upstream result can never be reported as a success.
        if status != "success":
            raise RuntimeError("Skype notification task failed. Failing this DAG run!")
        logging.info("Skype notification task succeeded. Marking this DAG run as successful!")

    # Task dependencies: final_status consumes the status returned by the
    # notification task, so it runs after it.
    notification_status = post_message_on_skype(skype_message)
    final_status(notification_status)
# Call the @dag-decorated function and keep the result in a module-level name
DAG_INSTANCE = notify_token_expiry_dag()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment