Skip to content

Instantly share code, notes, and snippets.

@revmischa
Last active November 20, 2018 10:25
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save revmischa/458ba6c65c7662f0c5eff8feaf7d6c53 to your computer and use it in GitHub Desktop.
Save revmischa/458ba6c65c7662f0c5eff8feaf7d6c53 to your computer and use it in GitHub Desktop.
Heroku logplex syslog handler lambda
"""Sample handler for parsing Heroku logplex drain events (https://devcenter.heroku.com/articles/log-drains#https-drains).
Expects messages to be framed with the syslog TCP octet counting method (https://tools.ietf.org/html/rfc6587#section-3.4.1).
This is designed to be run as a Python3.6 lambda.
"""
import json
import boto3
import logging
import iso8601
import requests
from base64 import b64decode
from pyparsing import Word, Suppress, nums, Optional, Regex, pyparsing_common, alphanums
from syslog import LOG_DEBUG, LOG_WARNING, LOG_INFO, LOG_NOTICE
from collections import defaultdict
# Webhook URL that slack_raw() posts to. The scheme-less URL is stored as
# base64-encoded KMS ciphertext and decrypted once, at import time.
# NOTE(review): ENCRYPTED_HOOK_URL is not defined or imported anywhere in the
# visible source; unless it is supplied before this line, importing the module
# fails with NameError. Confirm where it is meant to come from.
HOOK_URL = "https://" + boto3.client('kms').decrypt(CiphertextBlob=b64decode(ENCRYPTED_HOOK_URL))['Plaintext'].decode('ascii')
# Slack channel that alerts are meant to be posted to.
CHANNEL = "#alerts"
# Module-level logger.
log = logging.getLogger('myapp.heroku.drain')
class Parser(object):
    """Parse a single syslog line from a Heroku logplex drain into a dict.

    The expected layout is::

        <PRI>VERSION TIMESTAMP HOSTNAME SOURCE APPNAME[PID] - MESSAGE

    where ``[PID]`` is optional.
    """

    def __init__(self):
        """Build the pyparsing grammar once so it can be reused per line."""
        ints = Word(nums)
        # "<PRI>": the angle brackets are dropped, only the number is kept.
        priority = Suppress("<") + ints + Suppress(">")
        version = ints
        timestamp = pyparsing_common.iso8601_datetime
        hostname = Word(alphanums + "_-.")
        source = Word(alphanums + "_-.")
        # The app name may be followed by an optional "[pid]" token and is
        # always terminated by a "-" field, which is dropped.
        appname = (
            Word(alphanums + "()/-_.")
            + Optional(Suppress("[") + ints + Suppress("]"))
            + Suppress("-")
        )
        # Everything up to the end of the line.
        message = Regex(".*")
        self.__pattern = (
            priority + version + timestamp + hostname + source + appname + message
        )

    def parse(self, line):
        """Return the fields of *line* as a dict.

        Keys: ``priority``, ``severity``, ``facility`` (ints), ``version``,
        ``hostname``, ``source``, ``appname``, ``message`` (strings) and
        ``timestamp`` (the value returned by ``iso8601.parse_date``).

        Raises whatever ``parseString`` raises when *line* does not match the
        grammar (pyparsing's ``ParseException``).
        """
        parsed = self.__pattern.parseString(line)
        # https://tools.ietf.org/html/rfc5424#section-6
        # PRI = facility * 8 + severity, so severity is the low three bits.
        priority = int(parsed[0])
        return {
            "priority": priority,
            "severity": priority & 0x07,
            "facility": priority >> 3,
            "version": parsed[1],
            "timestamp": iso8601.parse_date(parsed[2]),
            "hostname": parsed[3],
            "source": parsed[4],
            "appname": parsed[5],
            # The message is always the final token. Indexing from the end
            # keeps it correct when the optional "[pid]" token is present,
            # which shifts every later positional index by one.
            "message": parsed[-1],
        }
# Shared Parser instance: the grammar is built once at import time and reused
# for every message handled by handle_lambda_proxy_event().
parser = Parser()
def lambda_handler(event, context):
    """Lambda entry point: process the drain request, then answer 200.

    *event* is handed to handle_lambda_proxy_event(); *context* is unused.
    Any exception raised while handling the event propagates to the caller.
    """
    handle_lambda_proxy_event(event)
    # Empty-bodied success response in Lambda proxy-integration shape.
    response = dict(
        isBase64Encoded=False,
        statusCode=200,
        headers={"Content-Length": 0},
    )
    return response
def _iter_logplex_frames(payload: bytes):
    """Yield each octet-counted syslog message in *payload* as a str.

    Frames look like ``b"<LEN> <MSG><LEN> <MSG>..."`` where LEN is the byte
    length of MSG (https://tools.ietf.org/html/rfc6587#section-3.4.1).
    Implemented as a loop rather than recursion so that large batches cannot
    hit the interpreter's recursion limit.

    Raises Exception when a length prefix is missing, not an integer, or
    negative.
    """
    while payload:
        msg_len, sep, rest = payload.partition(b' ')
        if not sep or not msg_len:
            raise Exception(f"failed to parse heroku logplex payload: '{payload}'")
        try:
            length = int(msg_len)
        except ValueError as ex:
            raise Exception(f"failed to parse {msg_len} as int, payload: {payload}") from ex
        if length < 0:
            raise Exception(f"failed to parse heroku logplex payload: '{payload}'")
        # Only the first `length` bytes belong to this message; the remainder
        # is the next frame (if any).
        yield rest[:length].decode('utf-8')
        payload = rest[length:]


def handle_lambda_proxy_event(event):
    """Parse a logplex drain request and forward interesting lines to Slack.

    *event* is a dict with a ``'body'`` (the framed syslog messages as text)
    and ``'headers'`` (must contain ``X-Forwarded-Proto``, ``Content-Type``
    and ``Logplex-Msg-Count``).

    Messages that pass filter_slack_msg() are grouped by severity, source and
    app name; one Slack message is sent per group, in first-seen order.

    Returns ``""``. Raises AssertionError when a header sanity check fails and
    Exception when the body framing is malformed.
    """
    body = event['body']
    headers = event['headers']

    # sanity-check source
    # NOTE: kept as `assert` so the exception type stays AssertionError for
    # existing callers; be aware these checks are skipped under `python -O`.
    assert headers['X-Forwarded-Proto'] == 'https'
    assert headers['Content-Type'] == 'application/logplex-1'

    # (group title, severity) -> first event of the group and its lines.
    groups = {}
    chunk_count = 0
    # A missing body is treated as an empty batch.
    for chunk in _iter_logplex_frames((body or '').encode('utf-8')):
        chunk_count += 1
        evt = parser.parse(chunk)
        if not filter_slack_msg(evt):
            # skip stuff filtered out
            continue

        sev = evt['severity']
        group_name = f"SEV:{sev} {evt['source']} {evt['appname']}"
        group = groups.setdefault((group_name, sev), {'event': evt, 'lines': []})
        group['lines'].append(str(evt["timestamp"]) + ': ' + evt["message"])

    for (group_name, severity), group in groups.items():
        # Format the attachment from an event that belongs to this group, not
        # from whichever event happened to be parsed last.
        slack_att = slack_format_attachment(log_msg=None, log_rec=group['event'])
        text = "\n" + "\n".join(group['lines'])
        slack(text=text, title=group_name, attachments=[slack_att], channel=CHANNEL, severity=severity)

    # sanity-check number of parsed messages
    assert int(headers['Logplex-Msg-Count']) == chunk_count
    return ""
def slack_format_attachment(log_msg=None, log_rec=None, title=None):
    """Build a Slack attachment dict for one log record.

    Only ``log_rec['severity']`` is read (coerced with ``int``); it selects
    the attachment colour. *log_msg* becomes the attachment text and *title*
    the author name. No ``fields`` entry is emitted.
    """
    level = int(log_rec['severity'])

    # Severities numerically below LOG_WARNING are errors or worse.
    if level < LOG_WARNING:
        colour = "danger"
    else:
        colour = {
            LOG_DEBUG: "#aaaaaa",
            LOG_INFO: "good",
            LOG_NOTICE: "#439FE0",
            LOG_WARNING: "warning",
        }.get(level)  # None for anything unrecognised

    return {
        'author_name': title,
        'color': colour,
        'mrkdwn_in': ['text'],
        'text': log_msg,
    }
def filter_slack_msg(msg):
    """Decide whether a parsed log event should be forwarded to Slack.

    *msg* must have ``severity``, ``source``, ``appname`` and ``message``
    keys. Returns False for noise (debug output, router lines, routine
    Postgres chatter, logplex drop notices) and True otherwise.
    """
    level = msg["severity"]  # e.g. LOG_DEBUG
    msg["source"]  # e.g. 'app'; required to be present, not used by any rule
    app = msg["appname"]  # e.g. 'heroku-postgres'
    text = msg["message"]

    # Routine Postgres checkpoint / backup lines that are never interesting.
    routine_postgres = (
        'sql_error_code = 00000 LOG: checkpoint complete',
        'sql_error_code = 00000 NOTICE: pg_stop_backup complete, all required WAL segments have been archived',
        'sql_error_code = 00000 LOG: checkpoint starting: ',
    )

    suppressed = (
        level >= LOG_DEBUG
        or text.startswith('DEBUG ')
        or app == 'router'
        or (app == 'heroku-postgres' and level >= LOG_INFO)
        or any(fragment in text for fragment in routine_postgres)
        # "NN messages dropped since..." notices from logplex itself
        or (app == 'logplex' and text.startswith('Error L10'))
    )
    return not suppressed
def slack(text=None, title=None, attachments=None, icon=None, channel='#alerts', severity=LOG_WARNING):
    """Build a Slack message and post it via slack_raw().

    Does nothing (returns None) when *attachments* is empty or None.

    text        -- message text
    title       -- used as the posting username
    attachments -- list of attachment dicts
    icon        -- emoji name without colons; when None it is chosen from
                   *severity*
    channel     -- destination channel
    severity    -- syslog severity used to pick the default icon
    """
    if not attachments:
        return

    # Only derive the emoji from the severity when the caller did not pick
    # one; an explicitly passed icon is honoured.
    if icon is None:
        if severity < LOG_WARNING:
            # error!
            icon = 'boom'
        else:
            icon = {
                LOG_DEBUG: 'information_source',
                LOG_INFO: 'information_desk_person',
                LOG_NOTICE: 'scroll',
                LOG_WARNING: 'warning',
            }.get(severity, 'mega')

    message = {
        "username": title,
        "channel": channel,
        "icon_emoji": f":{icon}:",
        "attachments": attachments,
        "text": text,
    }
    # Echo the outgoing payload to stdout before sending.
    print(message)
    slack_raw(message)
def slack_raw(payload, timeout=10):
    """POST *payload* as JSON to HOOK_URL.

    payload -- JSON-serialisable message dict
    timeout -- seconds to wait for the webhook before giving up; without a
               timeout a stalled connection would block until the caller
               itself is killed

    Raises ValueError when the response status is not 200, and whatever
    ``requests.post`` raises on connection failure or timeout.
    """
    response = requests.post(
        HOOK_URL,
        data=json.dumps(payload),
        headers={'Content-Type': 'application/json'},
        timeout=timeout,
    )
    if response.status_code != 200:
        raise ValueError(
            'Request to slack returned an error %s, the response is:\n%s'
            % (response.status_code, response.text)
        )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment