Skip to content

Instantly share code, notes, and snippets.

@ixjosemi
Created June 6, 2019 10:36
Show Gist options
  • Save ixjosemi/d58d7f178b6e17a84fcd447b3d57da31 to your computer and use it in GitHub Desktop.
Save ixjosemi/d58d7f178b6e17a84fcd447b3d57da31 to your computer and use it in GitHub Desktop.
custom-sns-integration
#!/usr/bin/env python
# Copyright (C) 2015-2019, Wazuh Inc.
# Created by Wazuh, Inc. <info@wazuh.com>.
# This program is a free software; you can redistribute it and/or modify it under the terms of GPLv2
import boto3
import sys
import socket
import json
import logging
if sys.version_info[0] == 2:
import ConfigParser as configparser
else:
import configparser
def send_alert_to_sns(region, arn, message):
    """Publish ``message`` to the SNS topic identified by ``arn``.

    A new boto3 SNS client bound to ``region`` is created on every call.
    Nothing is returned and no exception is handled here; any error raised
    while creating the client or publishing propagates to the caller.
    """
    publish_args = {'TopicArn': arn, 'Message': message}
    # Open the SNS client for the requested region, then push the message.
    client = boto3.client('sns', region_name=region)
    client.publish(**publish_args)
def get_installation_path(init_file='/etc/ossec-init.conf'):
    """Return the installation directory declared in the ossec-init file.

    The file is read line by line as ``KEY=VALUE`` entries and the value of
    the ``DIRECTORY`` key is returned with every double quote removed. If the
    key appears more than once, the last occurrence wins.

    Args:
        init_file: Path of the file to parse. Defaults to
            ``/etc/ossec-init.conf``.

    Returns:
        The value of ``DIRECTORY`` without double quotes.

    Raises:
        IOError/OSError: If ``init_file`` cannot be opened.
        KeyError: If the file has no ``DIRECTORY`` entry.
    """
    directory = None
    with open(init_file, 'r') as f:
        for line in f:
            # partition() splits on the first '=' only, so values that contain
            # '=' are kept whole, and lines without '=' (blank lines, comments)
            # are skipped instead of breaking tuple unpacking.
            key, separator, value = line.strip().partition('=')
            if not separator:
                continue
            if key.strip() == 'DIRECTORY':
                directory = value.strip()
    if directory is None:
        # Same exception type a failed dictionary lookup would raise.
        raise KeyError('DIRECTORY')
    return directory.replace('"', '')
def main():
    """Forward the alert stored in the file given as first argument to SNS.

    Steps, in order:
      1. Take the alerts file path from ``sys.argv[1]`` and parse it as JSON.
      2. Read ``topic_arn`` and ``region`` from the ``SNSCONFIG`` section of
         ``/var/ossec/etc/sns.conf``.
      3. Configure logging to ``<installation path>/logs/sns.log``, falling
         back to ``/var/ossec/logs/sns.log`` when the installation path
         cannot be determined.
      4. Build a short text message from the alert and publish it.

    Every fatal problem is logged and ends the process with exit status 1.
    Errors raised before logging is configured are emitted through the
    logging module's default handler rather than written to ``sns.log``.
    """
    # Parse args
    try:
        alerts_file = sys.argv[1]
    except IndexError as e:
        logging.error("Wrong arguments: '{}'".format(e))
        sys.exit(1)

    # Read alerts file (ValueError covers malformed JSON on Python 2 and 3)
    try:
        with open(alerts_file, 'r') as alert_f:
            alert_json = json.load(alert_f)
    except (IOError, OSError, ValueError) as e:
        logging.error("Cannot read alerts file: '{}'".format(e))
        sys.exit(1)

    # Read sns.conf file
    # NOTE(review): this path is hard-coded while the log path below is derived
    # from the installation path - confirm whether that is intentional.
    try:
        config = configparser.ConfigParser()
        config.read("/var/ossec/etc/sns.conf")
        arn = config.get('SNSCONFIG', 'topic_arn')
        region = config.get('SNSCONFIG', 'region')
    except configparser.Error as e:
        logging.error("Cannot read the sns config file: '{}'".format(e))
        sys.exit(1)

    # Read installation path. On failure fall back to the default location.
    # The problem is reported only after basicConfig() below: logging it now
    # would install the default handler and turn basicConfig() into a no-op.
    path_error = None
    try:
        logging_filepath = get_installation_path() + '/logs/sns.log'
    except (IOError, OSError, KeyError, ValueError) as e:
        logging_filepath = '/var/ossec/logs/sns.log'
        path_error = e

    # Logging configuration
    try:
        hostname = socket.gethostname()
    except (socket.error, OSError) as e:
        logging.error("Cannot solve hostname: '{}'".format(e))
        sys.exit(1)
    log_format = '%(asctime)s {} %(name)s %(levelname)s: %(message)s'.format(hostname)
    logging.basicConfig(filename=logging_filepath, format=log_format, level=logging.INFO)
    if path_error is not None:
        logging.warning("Cannot read the ossec-init config file: '{}'".format(path_error))

    # Extract issue fields
    alert_level = alert_json['rule']['level']
    description = alert_json['rule']['description']
    rule_id = alert_json['rule']['id']
    agent_id = alert_json['agent']['id']

    # Simplify parameters
    # Description: shortened to 100 chars because the max size of an SMS is 160 chars.
    # ARN: only the part after the last ':' is logged, to keep sensitive information out of the log.
    if len(description) > 100:
        description = description[:100] + '...'
    arn_name = arn.rsplit(':', 1)[-1]

    # Message body creation
    message = "\nAgent ID: {}\nLevel: {}\nDescription: {}\n".format(
        agent_id, alert_level, description)

    # Publish message to topic
    try:
        logging.info("Sending alert ({}) to SNS topic: '{}'.".format(rule_id, arn_name))
        send_alert_to_sns(region, arn, message)
    except Exception as e:
        # Script boundary: the exceptions the SNS client can raise are not
        # enumerable from here, so anything is reported and ends the run.
        logging.error("Cannot send message to the topic: '{}'".format(e))
        sys.exit(1)
if __name__ == "__main__":
    # Last-resort boundary for the script: any exception that main() did not
    # handle itself is logged and turned into exit status 1. SystemExit is not
    # an Exception subclass, so exits requested inside main() pass through.
    try:
        main()
    except Exception as e:
        logging.error("Cannot execute main function: '{}'".format(e))
        sys.exit(1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment