-
-
Save nelabhotlaR/10fcaa3e6f9b7dece68087f58538e060 to your computer and use it in GitHub Desktop.
aws_cred_connection.py - Establishes an Airflow AWS connection: if the connection already exists it is updated, otherwise a new one is created.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
AWS connection for non-default profile. | |
""" | |
import json | |
from airflow.models import Connection | |
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook | |
from airflow import settings | |
def create_or_update_aws_connection(
    aws_profile="personal",
    aws_conn_id="my_aws_connection",
    aws_region="us-east-1",
):
    """
    Create the Airflow AWS connection, or update it if it already exists.

    Looks up the ``Connection`` row whose ``conn_id`` equals ``aws_conn_id``.
    If one is found, its ``extra`` is replaced with a JSON document holding
    only ``region_name``. Otherwise credentials are fetched through
    ``AwsBaseHook`` and a new connection of type ``aws`` is stored with the
    access key as login and the secret key as password.

    Args:
        aws_profile: Name of the non-default AWS profile. It is handed to
            ``AwsBaseHook`` as its ``aws_conn_id``.
        aws_conn_id: Unique Airflow connection ID to create or update.
        aws_region: AWS region written to the connection's ``region_name``.

    Returns:
        None. The outcome is reported on stdout.
    """
    session = settings.Session()
    # try/finally guarantees the session is released even when the query,
    # the credential lookup or the commit raises.
    try:
        existing_connection = (
            session.query(Connection)
            .filter(Connection.conn_id == aws_conn_id)
            .first()
        )

        if existing_connection:
            # The whole ``extra`` payload is overwritten with the region only.
            existing_connection.extra = json.dumps({"region_name": aws_region})
            session.commit()
            print(f"Updated existing AWS connection: {aws_conn_id}")
            return

        # NOTE(review): the profile name is passed as an Airflow connection
        # ID here; confirm that this actually resolves the credentials of the
        # intended named AWS profile in your deployment.
        aws_hook = AwsBaseHook(aws_conn_id=aws_profile)
        aws_credentials = aws_hook.get_credentials()
        if not aws_credentials:
            print(f"Failed to get AWS credentials for profile: {aws_profile}")
            return

        new_connection = Connection(
            conn_id=aws_conn_id,
            conn_type="aws",
            login=aws_credentials.access_key,
            password=aws_credentials.secret_key,
            extra=json.dumps({"region_name": aws_region}),
        )
        session.add(new_connection)
        session.commit()
        print(f"Created new AWS connection: {aws_conn_id}")
    finally:
        session.close()
# Script entry point: create or update the connection only when this file is
# executed directly, not when it is imported as a module.
if __name__ == "__main__":
    create_or_update_aws_connection()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment