Skip to content

Instantly share code, notes, and snippets.

@SHolzhauer
Last active November 16, 2023 20:26
Show Gist options
  • Save SHolzhauer/60e8b068c218ae6255a29467359ebf9c to your computer and use it in GitHub Desktop.
Save SHolzhauer/60e8b068c218ae6255a29467359ebf9c to your computer and use it in GitHub Desktop.
Code to ingest SNS messages from EC2 Image Builder into Elasticsearch (an ingest pipeline definition and an AWS Lambda handler)
{
"description": "Parse AWS SNS events",
"processors": [
{
"set": {
"field": "event.module",
"value": "aws"
}
},
{
"set": {
"field": "event.provider",
"value": "aws.sns"
}
},
{
"set": {
"field": "event.dataset",
"value": "aws.imagebuilder"
}
},
{
"set": {
"field": "event.action",
"value": "create-ami"
}
},
{
"set": {
"field": "event.category",
"value": ["package", "host"]
}
},
{
"set": {
"field": "event.type",
"value": ["change", "installation", "creation"]
}
},
{
"set": {
"field": "event.outcome",
"value": "unknown"
}
},
{
"set": {
"if": "ctx.aws?.sns?.state?.status == 'AVAILABLE'",
"field": "event.outcome",
"value": "success"
}
},
{
"set": {
"if": "ctx.aws?.sns?.state?.status == 'FAILED'",
"field": "event.outcome",
"value": "failure"
}
},
{
"set": {
"field": "cloud.account.id",
"copy_from": "aws.sns.accountId"
}
},
{
"set": {
"field": "event.reason",
"copy_from": "aws.sns.buildType"
}
},
{
"set": {
"field": "host.os.platform",
"copy_from": "aws.sns.platform"
}
},
{
"set": {
"field": "host.os.name",
"copy_from": "aws.sns.osVersion",
"ignore_failure": true
}
},
{
"set": {
"field": "cloud.image.id",
"copy_from": "aws.sns.outputResources.amis.0.image",
"ignore_failure": true
}
},
{
"reroute": {
"dataset": "aws.generic",
"namespace": "default"
}
}
]
}
import requests
import boto3
from botocore.exceptions import ClientError
import json
def get_credentials(secret_id="demo_ami_builder", region_name="us-east-1"):
    """Fetch the connection secret from AWS Secrets Manager.

    Args:
        secret_id: Name of the secret to read.
        region_name: AWS region of the Secrets Manager endpoint.

    Returns:
        The decoded JSON value of the secret's ``SecretString`` when it is
        valid JSON, otherwise the raw ``SecretString`` unchanged.

    Raises:
        botocore.exceptions.ClientError: propagated unchanged when the
            ``get_secret_value`` call fails.
    """
    session = boto3.session.Session()
    sm = session.client(
        'secretsmanager',
        region_name=region_name,
    )
    # No try/except here: catching ClientError only to re-raise it added
    # nothing, so the error is simply allowed to propagate.
    get_secret_value_response = sm.get_secret_value(SecretId=secret_id)
    secret = get_secret_value_response['SecretString']
    try:
        return json.loads(secret)
    except ValueError:
        # json.JSONDecodeError is a ValueError: the secret is not JSON, so
        # hand back the raw string and let the caller decide what to do.
        return secret
# Resolve the Elasticsearch connection settings once, at import time, so
# every call to ingest_message() reuses them.
credentials = get_credentials()
try:
    es_endpoint = credentials["endpoint"]
    username = credentials["username"]
    password = credentials["password"]
except (KeyError, TypeError) as err:
    # KeyError: a required key is absent. TypeError: the secret was not a
    # JSON object (e.g. get_credentials() returned the raw string).
    # The secret itself is deliberately NOT printed: it contains the
    # password and would end up in the logs. Only the lookup error is
    # reported, and the process exits with a non-zero status.
    raise SystemExit(
        f"Secret is missing a required connection setting: {err!r}"
    ) from err
def ingest_message(message, timeout=10):
    """Index one SNS message into Elasticsearch and print the response.

    The message is nested under ``aws.sns`` and posted to the
    ``logs-placeholder_datastream-default`` target with the ``ingest_sns``
    pipeline, authenticating with the module-level ``username`` and
    ``password``.

    Args:
        message: Decoded SNS message payload; must be JSON-serialisable.
        timeout: Seconds to wait for Elasticsearch before giving up.

    Raises:
        requests.exceptions.RequestException: if the request cannot be
            completed, including when ``timeout`` expires.
    """
    build_msg = {"aws": {"sns": message}}
    resp = requests.post(
        url=f"{es_endpoint}/logs-placeholder_datastream-default/_doc?pipeline=ingest_sns",
        auth=(f"{username}", f"{password}"),
        json=build_msg,
        # requests applies no timeout by default; without one a stalled
        # connection would block until the caller is killed.
        timeout=timeout,
    )
    try:
        print(resp.json())
    except ValueError:
        # The body was not JSON (e.g. an error page from a proxy); print
        # what was actually received instead of crashing.
        print(resp.status_code, resp.text)
def lambda_handler(event, context):
    """Forward Image Builder SNS notifications to Elasticsearch.

    Each record's ``Sns.Message`` is decoded as JSON. Messages whose
    ``arn`` starts with ``arn:aws:imagebuilder:`` are passed to
    ``ingest_message``; messages with any other ``arn`` are ignored.
    Messages that are not JSON, are not JSON objects, or have no string
    ``arn`` are printed and skipped.

    Args:
        event: Lambda event; read as ``event['Records'][i]['Sns']['Message']``.
        context: Lambda context object (unused).

    Raises:
        KeyError: if the event lacks ``Records`` or a record lacks
            ``Sns``/``Message``.
    """
    # NOTE(review): the original only read Records[0]; iterating is
    # equivalent for single-record events and also covers any extra records.
    for record in event['Records']:
        raw_message = record['Sns']['Message']
        try:
            message = json.loads(raw_message)
        except (TypeError, ValueError):
            # Not a JSON document: log it and move on rather than crash.
            print(raw_message)
            continue

        arn = message.get("arn") if isinstance(message, dict) else None
        if not isinstance(arn, str):
            # Unexpected shape (no "arn", or not an object at all).
            print(message)
            continue

        # Ingestion happens outside any try/except so that real failures
        # (network, bad configuration) surface instead of being swallowed.
        if arn.startswith("arn:aws:imagebuilder:"):
            ingest_message(message)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment