Skip to content

Instantly share code, notes, and snippets.

@PreedhiVivek
Created January 11, 2024 05:17
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save PreedhiVivek/9c98b314cd488e818368d6c0d74880a3 to your computer and use it in GitHub Desktop.
Save PreedhiVivek/9c98b314cd488e818368d6c0d74880a3 to your computer and use it in GitHub Desktop.
A DAG to run GitHub lambdas on conditional dependency every night
"""
A DAG to run GitHub lambdas on conditional dependency every night
"""
from airflow import DAG
from airflow.decorators import dag, task
from airflow.models import Variable
from datetime import datetime, timedelta
import boto3
import json
import logging
from botocore.config import Config
from botocore.exceptions import BotoCoreError, ClientError
# Lambda function names and the AWS profile name, read from Airflow Variables.
# NOTE(review): these lookups execute at module import time, i.e. whenever
# this file is parsed, not when a task runs.
(
    stats_lambda,
    deltas_lambda,
    scores_lambda,
    profile_name,
) = (
    Variable.get(variable_key)
    for variable_key in (
        "GITHUB_STATS_LAMBDA",
        "GITHUB_DELTAS_LAMBDA",
        "GITHUB_SCORES_LAMBDA",
        "QELO_PROFILE_NAME",
    )
)
# Default arguments handed to the DAG (passed as ``default_args`` to ``@dag``).
default_args = dict(
    owner='airflow',
    depends_on_past=False,
    start_date=datetime(2023, 12, 1),
    retries=2,
    retry_delay=timedelta(minutes=3),
)
@dag('github_lambdas_sequence_dag', default_args=default_args, schedule_interval='0 23 * * *', catchup=False)
def github_lambdas_sequence_dag():
    """Define the GitHub lambdas workflow.

    Three AWS Lambda functions (stats -> deltas -> scores) are invoked
    synchronously, one after the other. The deltas and scores Lambdas are
    invoked only when the preceding task returned a status flag of 1;
    otherwise the step is skipped and ``None`` is passed downstream. A final
    task logs the overall outcome based on the scores status flag.
    """

    def init_lambda_client():
        "Initializes and returns a lambda client object"
        try:
            # A longer read timeout because the Lambdas are invoked
            # synchronously ('RequestResponse') and the call blocks until
            # the function returns.
            boto3_config = Config(read_timeout=120)
            session = boto3.Session(profile_name=profile_name)
            client = session.client('lambda', config=boto3_config)
            logging.info('Client object created!')
            return client
        except ClientError as error:
            logging.error(f"Client error while initializing lambda client: {error}")
            raise Exception('Exception encountered and run aborted!') from error
        except BotoCoreError as error:
            logging.error(f"BotoCore error while initializing lambda client: {error}")
            raise Exception('Exception encountered and run aborted!') from error
        except Exception as error:
            logging.error(f"An error occurred while initializing lambda client: {error}")
            raise Exception('Exception encountered and run aborted!') from error

    def _invoke_lambda(function_name, label, flag_key):
        """Invoke a Lambda synchronously and return one flag from its response.

        Args:
            function_name: Name of the Lambda function to invoke.
            label: Short name used in log messages (e.g. ``'stats'``).
            flag_key: Key to read from the JSON-decoded ``body`` of the
                Lambda's response payload.

        Returns:
            The value stored under ``flag_key`` in the response body, or
            ``None`` when the key is absent.

        Raises:
            Exception: On any failure (client creation, invocation, a
                function error reported by Lambda, or an unparsable
                response); the original error is chained as the cause.
        """
        client = None
        try:
            client = init_lambda_client()
            logging.info(f"About to invoke {label} lambda!")
            response = client.invoke(
                FunctionName=function_name,
                InvocationType='RequestResponse',
            )
            # Drain the streamed payload while the client is still open.
            payload = response['Payload'].read().decode('utf-8')
            # Lambda reports errors raised inside the function through the
            # 'FunctionError' field; the payload then holds the error
            # details instead of the normal response.
            if response.get('FunctionError'):
                raise RuntimeError(f"{label} lambda reported a function error: {payload}")
            response_data = json.loads(payload)
            response_body = json.loads(response_data.get("body"))
            return response_body.get(flag_key)
        except ClientError as error:
            # Handle client errors (e.g., 400 series HTTP status codes)
            logging.error(f"Client error while invoking lambdas: {error}")
            raise Exception(f"Exception encountered and run aborted on {datetime.now()}!") from error
        except BotoCoreError as error:
            # Handle other BotoCore errors
            logging.error(f"BotoCore error while invoking lambdas: {error}")
            raise Exception(f"Exception encountered and run aborted on {datetime.now()}!") from error
        except Exception as error:
            # Catch any other exceptions
            logging.error(f"An error occurred while invoking lambdas: {error}")
            raise Exception(f"Exception encountered and run aborted on {datetime.now()}!") from error
        finally:
            # Release the client on success and on failure alike.
            if client is not None:
                client.close()

    @task()
    def invoke_stats_lambda():
        "Invokes the GitHub stats lambda and returns the execution status"
        return _invoke_lambda(stats_lambda, 'stats', 'stats_status_flag')

    @task()
    def invoke_deltas_lambda(status_flag):
        "Invokes the GitHub deltas lambda and returns the execution status"
        if status_flag != 1:
            # Upstream step did not report success: skip and propagate None.
            logging.info(f'Stats status flag:{status_flag}')
            logging.info('Deltas lambda not run!')
            return None
        return _invoke_lambda(deltas_lambda, 'deltas', 'deltas_status_flag')

    @task()
    def invoke_scores_lambda(status_flag):
        "Invokes the GitHub scores lambda and returns the execution status"
        if status_flag != 1:
            # Upstream step did not report success: skip and propagate None.
            logging.info(f'Deltas status flag:{status_flag}')
            logging.info('Scores lambda not run!')
            return None
        return _invoke_lambda(scores_lambda, 'scores', 'scores_status_flag')

    @task()
    def verify_lambdas_workflow_status(status_flag):
        "Verifies the overall workflow run status"
        if status_flag == 1:
            logging.info('GitHub lambdas workflow run successful!')
        else:
            # Logged at ERROR level so a failed workflow stands out in the
            # task log.
            # NOTE(review): the task still finishes successfully in this
            # case; raise here if a failed workflow should fail the DAG run.
            logging.error('GitHub lambdas workflow run failed!')

    # Task dependencies: each task consumes the previous task's status flag.
    stats_status_flag = invoke_stats_lambda()
    deltas_status_flag = invoke_deltas_lambda(stats_status_flag)
    scores_status_flag = invoke_scores_lambda(deltas_status_flag)
    verify_lambdas_workflow_status(scores_status_flag)
# Call the @dag-decorated function at module level to create the DAG object
# and bind it to a module-level name.
dag_instance = github_lambdas_sequence_dag()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment