Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
import boto3
import uuid
import json
import random
import logging
from datetime import datetime
logging.basicConfig(format="[%(levelname)s] [%(name)s] [%(asctime)s]: %(message)s",
level="INFO")
logger = logging.getLogger(__name__)
sns = boto3.client('sns', region_name='eu-central-1')
# CREATE TOPIC
topic_name = "example_sns_topic"
create_response = sns.create_topic(Name=topic_name)
topic_arn = create_response.get("TopicArn")
logger.info("Create topic response: %s", create_response)
# CREATE SUBSCRIPTIONS
email_sub = sns.subscribe(TopicArn=topic_arn,
Protocol='email',
Endpoint="your_email")
phone_sub = sns.subscribe(TopicArn=topic_arn,
Protocol='sms',
Endpoint="your_phone_number")
sqs_sub = sns.subscribe(TopicArn=topic_arn,
Protocol='sqs',
Endpoint="your_sqs_queue_arn")
# SEND MESSAGES
sns.publish(TopicArn=topic_arn,
Message=json.dumps(dict(item=random.randint(1, 100000),
value=random.random(),
sent_timestamp=datetime.utcnow().isoformat(),
id=str(uuid.uuid4()))))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment