Skip to content

Instantly share code, notes, and snippets.

@nelabhotlaR
Last active August 4, 2023 06:08
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save nelabhotlaR/10fcaa3e6f9b7dece68087f58538e060 to your computer and use it in GitHub Desktop.
Save nelabhotlaR/10fcaa3e6f9b7dece68087f58538e060 to your computer and use it in GitHub Desktop.
aws_cred_connection.py - Establishes an AWS connection in Airflow: updates the existing connection if one exists, otherwise creates a new one.
"""
AWS connection for non-default profile.
"""
import json
from airflow.models import Connection
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
from airflow import settings
def create_or_update_aws_connection(
    aws_profile: str = "personal",
    aws_conn_id: str = "my_aws_connection",
    aws_region: str = "us-east-1",
) -> None:
    """Create the Airflow AWS connection, or update its region if it exists.

    Looks up the connection ``aws_conn_id`` in the Airflow metadata database.
    If it exists, its ``extra`` field is replaced with the configured region.
    Otherwise credentials are fetched through ``AwsBaseHook`` and a new
    connection of type ``aws`` is stored with the access key as login and the
    secret key as password.

    Args:
        aws_profile: Name of the non-default AWS profile to take credentials
            from.
        aws_conn_id: Unique Airflow connection ID to create or update.
        aws_region: AWS region written to the connection's ``extra`` field.

    Raises:
        Exception: Any error raised while querying, fetching credentials or
            committing is re-raised after the session has been rolled back.
    """
    region_extra = json.dumps({"region_name": aws_region})
    session = settings.Session()
    try:
        existing_connection = (
            session.query(Connection)
            .filter(Connection.conn_id == aws_conn_id)
            .first()
        )

        if existing_connection:
            # The whole ``extra`` field is replaced, not merged.
            existing_connection.extra = region_extra
            session.commit()
            print(f"Updated existing AWS connection: {aws_conn_id}")
            return

        # NOTE(review): the profile name is handed to the hook as an Airflow
        # connection ID -- confirm that a connection with this ID exists and
        # actually resolves to the intended AWS profile.
        aws_hook = AwsBaseHook(aws_conn_id=aws_profile)
        aws_credentials = aws_hook.get_credentials()
        if not aws_credentials:
            print(f"Failed to get AWS credentials for profile: {aws_profile}")
            return

        new_connection = Connection(
            conn_id=aws_conn_id,
            conn_type="aws",
            login=aws_credentials.access_key,
            password=aws_credentials.secret_key,
            extra=region_extra,
        )
        session.add(new_connection)
        session.commit()
        print(f"Created new AWS connection: {aws_conn_id}")
    except Exception:
        # Do not leave a half-finished transaction behind on failure.
        session.rollback()
        raise
    finally:
        # Always release the session, including on early return or error.
        session.close()
# Run the connection setup only when this file is executed as a script.
if __name__ == "__main__":
    create_or_update_aws_connection()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment