Skip to content

Instantly share code, notes, and snippets.

@codyaray
Last active January 2, 2021 19:21
Show Gist options
  • Save codyaray/2d50cf798f5d3fddd232d446da6552df to your computer and use it in GitHub Desktop.
Save codyaray/2d50cf798f5d3fddd232d446da6552df to your computer and use it in GitHub Desktop.
"""
Listens to incoming Slack messages from cardbucks-bot and stores the metrics in Elasticsearch
"""
from elasticsearch import Elasticsearch
from datetime import datetime
import re
# rtmbot variables
# NOTE(review): presumably module-level hooks read by the rtmbot framework;
# nothing in the visible code appends to either list -- confirm against rtmbot.
crontable = []
outputs = []
# Elasticsearch destination: host passed to the client, plus the index and
# document type used when a metric is indexed. The bracketed values are
# placeholders that must be replaced before the plugin is run.
ELASTIC_HOST = '[your-hostname]'
ELASTIC_INDEX = '[your-index]'
ELASTIC_TYPE = '[your-type]'
# Only messages whose 'user' and 'channel' equal these ids are turned into
# metrics (see process_message).
USER_ID = '[your Slack bot id]'
CHANNEL = '[your Slack channel id]'
# Maps an event name to the compiled regex that recognises its Slack message
# text. to_metric applies each pattern with ``match`` (anchored at the start
# of the text); any named groups become extra fields on the indexed document.
METRIC_PATTERNS = {
    'user_registration': re.compile(r'New User Registration'),
    # NOTE(review): the group name was lost from the original pattern
    # ("(?P.*)" is not a valid regex); "reason" is an assumption -- confirm.
    'user_disabled': re.compile(r'User Disabled(: )?(?P<reason>.*)'),
    'user_enabled': re.compile(r'User Enabled'),
    'payment_failed': re.compile(r'Payment Failed'),
    'fingerprint_blacklisted': re.compile(r'Fingerprint Blacklisted'),
    'ip_addr_blacklisted': re.compile(r'IP Address Blacklisted'),
    # The group is named "purchases" so that it lines up with the
    # POST_PROCESSING key of the same name; DOTALL lets ".*" span the
    # multi-line list that follows the header line.
    'purchase_confirmation': re.compile(
        r'User purchase\n(?P<purchases>.*)', re.DOTALL),
    # and many more types of events captured...
}
# Per-field post-processors, keyed by regex group name. to_metric calls each
# handler with the captured value for that group (or None when the group is
# absent) and the remaining fields, and uses the handler's return value as
# the new fields dict.
POST_PROCESSING = {
    # Wrapped in a lambda so that process_purchases, which is defined further
    # down the module, is only looked up when the handler is actually called.
    'purchases': (
        lambda purchases, fields: process_purchases(purchases, fields)
    ),
    # ... and more of these too
}
# global variables
# Shared Elasticsearch client, constructed from ELASTIC_HOST when the module
# is imported; process_message uses it to index each parsed metric.
client = Elasticsearch(ELASTIC_HOST)
# Unix epoch as a naive UTC datetime.
# NOTE(review): not referenced anywhere else in the visible code.
epoch = datetime.utcfromtimestamp(0)
def process_purchases(purchases, fields):
    """Turn the purchase summary text into a list of per-merchant card counts.

    Every non-blank line of ``purchases`` is split on its first run of
    whitespace into ``"<count> <merchant>"``; the merchant name may itself
    contain spaces.

    Args:
        purchases: Newline-separated purchase text, or ``None``/empty when
            the matched message had no such group.
        fields: Dict of fields extracted from the message; updated in place.

    Returns:
        The same ``fields`` dict. ``fields["cards"]`` is set to a list of
        ``{"merchant": str, "value": int}`` dicts only when ``purchases`` is
        non-empty; otherwise ``fields`` is returned untouched.

    Raises:
        ValueError: if a non-blank line has no merchant after the count, or
            the count is not an integer.
    """
    if purchases:
        cards = []
        for raw_line in purchases.split('\n'):
            line = raw_line.strip()
            if not line:
                # Skip blank AND whitespace-only lines (e.g. "  " or a stray
                # "\r"); unpacking an empty split would otherwise blow up.
                continue
            count, merchant = line.split(None, 1)
            cards.append({"merchant": merchant, "value": int(count)})
        fields["cards"] = cards
    # Always hand the dict back: the caller rebinds its fields to this value.
    return fields
def to_metric(data):
    """Convert a Slack message into a metric document, if it is recognised.

    The message text is tried against every pattern in METRIC_PATTERNS; the
    first pattern that matches decides the event name. Named groups from the
    match are run through the POST_PROCESSING handlers and merged into the
    document.

    Args:
        data: Slack message dict; must contain ``'text'`` and ``'ts'``
            (``'ts'`` is parsed as a float of seconds since the Unix epoch).

    Returns:
        A dict with ``'event'``, ``'source'`` and ``'timestamp'`` (naive UTC
        datetime) plus any extracted fields, or ``None`` when no pattern
        matches the text.
    """
    text = data['text']
    # .items() instead of .iteritems() so this runs on Python 2 and 3 alike.
    for event, pattern in METRIC_PATTERNS.items():
        m = pattern.match(text)
        if not m:
            continue
        doc = {
            'event': event,
            'source': 'slack',
            'timestamp': datetime.utcfromtimestamp(float(data['ts'])),
        }
        fields = m.groupdict()
        for field, handler in POST_PROCESSING.items():
            # Each handler consumes its raw field (None when the pattern has
            # no such group) and returns the updated fields dict.
            fields = handler(fields.pop(field, None), fields)
        # Extracted fields win on a key clash, as with the original list
        # concatenation -- which raises TypeError on Python 3 dict views.
        doc.update(fields)
        return doc
    return None
def process_message(data):
    """Index a metric for a Slack message sent by the watched bot.

    NOTE(review): presumably invoked by rtmbot for every incoming message
    event -- confirm against the rtmbot plugin contract.

    Messages are ignored unless their ``'user'`` equals USER_ID and their
    ``'channel'`` equals CHANNEL. A matching message that no pattern
    recognises is reported on stdout; a recognised one is written to
    Elasticsearch.

    Args:
        data: Slack message event dict.
    """
    # Filter out only the metrics messages we care about. .get() is used
    # because an event without a 'user' or 'channel' key should simply be
    # skipped rather than raise KeyError.
    if data.get('user') != USER_ID or data.get('channel') != CHANNEL:
        return
    metric = to_metric(data)
    if not metric:
        # Parenthesised single-argument form works as a Python 2 print
        # statement and as a Python 3 function call.
        print("ERROR parsing %s" % data)
    else:
        # Keyword arguments so the call does not rely on the positional
        # parameter order of the installed elasticsearch client.
        client.index(index=ELASTIC_INDEX, doc_type=ELASTIC_TYPE, body=metric)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment