Skip to content

Instantly share code, notes, and snippets.

@gbzarelli
Last active July 1, 2022 17:29
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save gbzarelli/fa119e250064372dbe95808c2f5f38b1 to your computer and use it in GitHub Desktop.
Save gbzarelli/fa119e250064372dbe95808c2f5f38b1 to your computer and use it in GitHub Desktop.
SNS Subscriptions Migrate - This script duplicate all subscriptions from one SNS topic to another
import boto3
ACCESS_KEY = "-"
SECRET_KEY = "-"
REGION_FROM = "sa-east-1"
TOPIC_ARN_FROM = "arn:aws:sns:sa-east-1:000000000:teste-zarelli-topic"
REGION_TO = "us-east-1"
TOPIC_ARN_TO = "arn:aws:sns:us-east-1:000000000:teste-zarelli-topic"
def get_topic(resource, topic_arn: str):
topic = list(filter(lambda t: (t.arn == topic_arn), resource.topics.all()))[0]
return topic
if __name__ == '__main__':
session = boto3.Session(
aws_access_key_id=ACCESS_KEY,
aws_secret_access_key=SECRET_KEY)
# Load resources
resource_from = session.resource(service_name='sns', region_name=REGION_FROM)
resource_to = session.resource(service_name='sns', region_name=REGION_TO)
# Load topics
topic_from = get_topic(resource_from, TOPIC_ARN_FROM)
topic_to = get_topic(resource_to, TOPIC_ARN_TO)
subscriptions_from = topic_from.subscriptions.all()
# Migrate subs
for sub in subscriptions_from:
attrs = {'RawMessageDelivery': sub.attributes['RawMessageDelivery']}
if sub.attributes.get('FilterPolicy') is not None:
attrs['FilterPolicy'] = sub.attributes['FilterPolicy']
new_subscription = topic_to.subscribe(
Protocol=sub.attributes['Protocol'],
Endpoint=sub.attributes['Endpoint'],
Attributes=attrs,
ReturnSubscriptionArn=True
)
print(f'Subscribe to {new_subscription.attributes["Endpoint"]} - Pending: '
f'{new_subscription.attributes["PendingConfirmation"]}')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment