Skip to content

Instantly share code, notes, and snippets.

@ajorg

ajorg/sns_webhook.py

Last active Feb 25, 2021
Embed
What would you like to do?
# Copyright Andrew Jorgensen
# SPDX-License-Identifier: MIT
"""Receive SNS events in Lambda and POST to a JSON Webhook.
Environment variables:
* URL (required) - The Webhook URL to POST to (including any required keys)
* TEMPLATE (default: {}) - The JSON data template to POST to the Webhook
* MESSAGE_KEY (default: text) - Key to set to the SNS Message
* TOPIC_KEY (optional) - Key to set to the Topic name from the SNS event
"""
import json
from os import environ
from urllib.request import urlopen, Request
CONTENT_TYPE = "application/json; charset=utf-8"


def _build_payload(sns, template, message_key, topic_key):
    """Return the webhook payload dict for a single SNS notification.

    Args:
        sns: The ``Sns`` mapping of one event record.
        template: JSON text of the payload template; must decode to an object.
        message_key: Payload key that receives the message text.
        topic_key: Payload key that receives the topic name, or a falsy
            value to leave the topic out.

    Raises:
        ValueError: If ``template`` is not valid JSON or is not a JSON object.
        KeyError: If the notification lacks ``Message`` (or ``TopicArn`` when
            ``topic_key`` is set).
    """
    # Parse the template afresh for every record so that no state leaks from
    # one payload into the next.
    payload = json.loads(template)
    if not isinstance(payload, dict):
        raise ValueError("TEMPLATE must be a JSON object")
    if topic_key:
        # The topic name is the last colon-separated field of the ARN.
        payload[topic_key] = sns["TopicArn"].rsplit(":", 1)[-1]
    subject = sns.get("Subject")
    message = sns["Message"]
    payload[message_key] = f"{subject}: {message}" if subject else message
    return payload


def lambda_handler(event, context):
    """Lambda handler - POST each SNS record in ``event`` to the Webhook.

    Configuration is read from the environment: ``URL`` (required),
    ``TEMPLATE`` (default ``{}``), ``MESSAGE_KEY`` (default ``text``) and
    ``TOPIC_KEY`` (optional). The incoming event, every outgoing payload and
    every Webhook response body are printed for debugging.

    Args:
        event: The SNS event; every entry of ``event["Records"]`` is sent.
        context: The Lambda context; its ``function_name`` is used as the
            User-Agent header.

    Raises:
        ValueError: If ``URL`` is unset or empty, or ``TEMPLATE`` is not a
            JSON object.
        KeyError: If the event or a record lacks an expected key.
        urllib.error.URLError: If a POST to the Webhook fails.
    """
    user_agent = context.function_name
    url = environ.get("URL")
    if not url:
        raise ValueError("The URL environment variable is required")
    template = environ.get("TEMPLATE", "{}")
    message_key = environ.get("MESSAGE_KEY", "text")
    topic_key = environ.get("TOPIC_KEY")
    # Log only the non-secret settings. The whole environment is deliberately
    # not dumped: the URL may include keys, and the process environment can
    # hold credentials that must not end up in the logs.
    print(
        json.dumps(
            {
                "MESSAGE_KEY": message_key,
                "TEMPLATE": template,
                "TOPIC_KEY": topic_key,
            },
            sort_keys=True,
        )
    )
    print(json.dumps(event, sort_keys=True))
    headers = {"User-Agent": user_agent, "Content-Type": CONTENT_TYPE}
    for record in event["Records"]:
        payload = _build_payload(
            record["Sns"], template, message_key, topic_key
        )
        body = json.dumps(payload, sort_keys=True)
        print(body)
        request = Request(
            url=url,
            data=body.encode("utf-8"),
            headers=headers,
        )
        with urlopen(request) as response:
            print(response.read().decode("utf-8"))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment