Created
January 11, 2024 05:17
-
-
Save PreedhiVivek/9c98b314cd488e818368d6c0d74880a3 to your computer and use it in GitHub Desktop.
A DAG to run GitHub lambdas on conditional dependency every night
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
A DAG that runs the GitHub lambdas every night, each one conditional on the success of the previous one
""" | |
import json
import logging
from contextlib import closing
from datetime import datetime, timedelta

import boto3
from airflow import DAG
from airflow.decorators import dag, task
from airflow.models import Variable
from botocore.config import Config
from botocore.exceptions import BotoCoreError, ClientError
# Names of the three lambdas to invoke and the AWS profile used to build the
# boto3 session. All four are read from Airflow Variables, in this order, when
# the module is imported.
stats_lambda, deltas_lambda, scores_lambda, profile_name = (
    Variable.get(variable_key)
    for variable_key in (
        "GITHUB_STATS_LAMBDA",
        "GITHUB_DELTAS_LAMBDA",
        "GITHUB_SCORES_LAMBDA",
        "QELO_PROFILE_NAME",
    )
)
# Arguments handed to the DAG as `default_args`, i.e. the defaults for its tasks.
default_args = dict(
    owner='airflow',
    depends_on_past=False,
    start_date=datetime(2023, 12, 1),
    # Two retries, three minutes apart.
    retries=2,
    retry_delay=timedelta(minutes=3),
)
@dag('github_lambdas_sequence_dag', default_args=default_args, schedule_interval='0 23 * * *', catchup=False)
def github_lambdas_sequence_dag():
    """Chain the GitHub stats, deltas and scores lambdas, each gated on the previous flag.

    The stats lambda always runs. The deltas lambda runs only when the stats
    task returned a status flag of 1, and the scores lambda only when the
    deltas task returned 1. A final task logs the overall outcome.
    """

    def init_lambda_client():
        """Build a boto3 Lambda client for the configured AWS profile.

        Returns:
            A Lambda client created from a session for ``profile_name`` with a
            120-second read timeout.

        Raises:
            Exception: If the session or client cannot be created; the
                underlying error is chained as the cause.
        """
        try:
            # Synchronous invocations block until the lambda returns, so the
            # read timeout is set explicitly to 120 seconds.
            boto3_config = Config(read_timeout=120)
            session = boto3.Session(profile_name=profile_name)
            client = session.client('lambda', config=boto3_config)
            logging.info('Client object created!')
            return client
        except ClientError as error:
            logging.error(f"Client error while initializing lambda client: {error}")
            raise Exception('Exception encountered and run aborted!') from error
        except BotoCoreError as error:
            logging.error(f"BotoCore error while initializing lambda client: {error}")
            raise Exception('Exception encountered and run aborted!') from error
        except Exception as error:
            logging.error(f"An error occurred while initializing lambda client: {error}")
            raise Exception('Exception encountered and run aborted!') from error

    def _invoke_lambda(function_name, label, flag_key):
        """Invoke a lambda synchronously and return one flag from its response body.

        Args:
            function_name: Name of the lambda to invoke.
            label: Short name used in log and error messages
                ("stats", "deltas" or "scores").
            flag_key: Key to read from the decoded response body.

        Returns:
            The value stored under ``flag_key``, or None when the key is absent.

        Raises:
            Exception: On any failure (client creation, invocation, an error
                reported by the lambda itself, or an unparsable response);
                the original error is chained as the cause.
        """
        try:
            # closing() releases the client even when invoke() raises, and the
            # payload stream is consumed before the client is closed.
            with closing(init_lambda_client()) as client:
                logging.info(f"About to invoke {label} lambda!")
                response = client.invoke(
                    FunctionName=function_name,
                    InvocationType='RequestResponse',
                )
                payload = response['Payload'].read().decode('utf-8')

            # An error raised inside the lambda does not make invoke() fail;
            # it is reported through the FunctionError field of the response.
            function_error = response.get('FunctionError')
            if function_error:
                raise RuntimeError(f"{label} lambda failed ({function_error}): {payload}")

            response_data = json.loads(payload)
            body = response_data.get("body")
            if body is None:
                raise ValueError(f"{label} lambda response has no 'body': {payload}")
            # The body is normally a JSON string; accept an already-decoded object too.
            if isinstance(body, (str, bytes, bytearray)):
                response_body = json.loads(body)
            else:
                response_body = body
            return response_body.get(flag_key)
        except ClientError as error:
            # Handle client errors (e.g., 400 series HTTP status codes)
            logging.error(f"Client error while invoking lambdas: {error}")
            raise Exception(f"Exception encountered and run aborted on {datetime.now()}!") from error
        except BotoCoreError as error:
            # Handle other BotoCore errors
            logging.error(f"BotoCore error while invoking lambdas: {error}")
            raise Exception(f"Exception encountered and run aborted on {datetime.now()}!") from error
        except Exception as error:
            # Catch any other exceptions
            logging.error(f"An error occurred while invoking lambdas: {error}")
            raise Exception(f"Exception encountered and run aborted on {datetime.now()}!") from error

    @task()
    def invoke_stats_lambda():
        """Invoke the GitHub stats lambda and return its ``stats_status_flag``."""
        return _invoke_lambda(stats_lambda, 'stats', 'stats_status_flag')

    @task()
    def invoke_deltas_lambda(status_flag):
        """Invoke the GitHub deltas lambda when the stats flag is 1.

        Returns the lambda's ``deltas_status_flag``, or None when the lambda
        was not run.
        """
        if status_flag != 1:
            logging.info(f'Stats status flag:{status_flag}')
            logging.info('Deltas lambda not run!')
            return None
        return _invoke_lambda(deltas_lambda, 'deltas', 'deltas_status_flag')

    @task()
    def invoke_scores_lambda(status_flag):
        """Invoke the GitHub scores lambda when the deltas flag is 1.

        Returns the lambda's ``scores_status_flag``, or None when the lambda
        was not run.
        """
        if status_flag != 1:
            logging.info(f'Deltas status flag:{status_flag}')
            logging.info('Scores lambda not run!')
            return None
        return _invoke_lambda(scores_lambda, 'scores', 'scores_status_flag')

    @task()
    def verify_lambdas_workflow_status(status_flag):
        """Log the overall outcome of the chain from the scores status flag."""
        if status_flag == 1:
            logging.info('GitHub lambdas workflow run successful!')
        else:
            # Reported at ERROR level so a failed or skipped chain stands out
            # in the task log; the task itself still completes.
            logging.error('GitHub lambdas workflow run failed!')

    # Task dependencies: each task consumes the flag returned by the previous one.
    stats_status_flag = invoke_stats_lambda()
    deltas_status_flag = invoke_deltas_lambda(stats_status_flag)
    scores_status_flag = invoke_scores_lambda(deltas_status_flag)
    verify_lambdas_workflow_status(scores_status_flag)


# Calling the decorated function creates the DAG; the module-level name keeps a reference to it.
dag_instance = github_lambdas_sequence_dag()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment