# Source: GitHub gist codyaray/2d50cf798f5d3fddd232d446da6552df
# (last active January 2, 2021 19:21).
"""
Listens to incoming Slack messages from cardbucks-bot and stores the metrics in Elasticsearch
"""
import re
from datetime import datetime

from elasticsearch import Elasticsearch
# rtmbot variables
# Neither list is populated anywhere in the visible part of this module.
crontable = []
outputs = []

# Deployment settings: every value below is a placeholder that must be
# replaced before the plugin can run.
ELASTIC_HOST = '[your-hostname]'   # passed to the Elasticsearch client
ELASTIC_INDEX = '[your-index]'     # index that metric documents are written to
ELASTIC_TYPE = '[your-type]'       # document type used when indexing
USER_ID = '[your Slack bot id]'    # only messages from this user are processed
CHANNEL = '[your Slack channel id]'  # only messages in this channel are processed
# Maps an event name to the compiled pattern that recognises it.
# to_metric() applies each pattern with .match(), so a pattern must match
# from the first character of the message text. Named groups become extra
# fields on the resulting metric document.
METRIC_PATTERNS = {
    'user_registration': re.compile('New User Registration'),
    # NOTE(review): the group name "reason" is reconstructed -- the original
    # name was lost when the file was extracted; confirm against the
    # documents already stored in Elasticsearch.
    'user_disabled': re.compile('User Disabled(: )?(?P<reason>.*)'),
    'user_enabled': re.compile('User Enabled'),
    'payment_failed': re.compile('Payment Failed'),
    'fingerprint_blacklisted': re.compile('Fingerprint Blacklisted'),
    'ip_addr_blacklisted': re.compile('IP Address Blacklisted'),
    # DOTALL lets the group span every remaining line of the message. The
    # group is named "purchases" to match the POST_PROCESSING key that
    # consumes it.
    'purchase_confirmation': re.compile('User purchase\n(?P<purchases>.*)', re.DOTALL),
    # and many more types of events captured...
}
# Maps a captured-field name to a handler called as handler(value, fields).
# to_metric() pops the field out of the captured groups (passing None when
# it is absent) and replaces the field dict with whatever the handler returns.
POST_PROCESSING = {
    # The lambda postpones the lookup of process_purchases, which is defined
    # further down the module, until the handler is actually invoked.
    'purchases': lambda purchases, fields: process_purchases(purchases, fields)
    # ... and more of these too
}
# global variables
# Shared Elasticsearch client, created once at import time.
client = Elasticsearch(ELASTIC_HOST)
# Unix epoch as a naive UTC datetime (not referenced in the visible code).
epoch = datetime.utcfromtimestamp(0)
def process_purchases(purchases, fields):
    """Expand a raw purchase listing into a structured ``cards`` field.

    Args:
        purchases: Newline-separated text with one ``<count> <merchant>``
            entry per line, or a falsy value (``None`` / ``""``) when the
            message carried no purchase listing.
        fields: Dict of fields captured so far; updated in place.

    Returns:
        The same ``fields`` dict. When ``purchases`` is non-empty it gains a
        ``"cards"`` list of ``{"merchant": str, "value": int}`` entries.

    Raises:
        ValueError: If a non-blank line lacks a merchant after the count, or
            the count is not an integer.
    """
    if purchases:
        cards = []
        for raw_line in purchases.split('\n'):
            entry = raw_line.strip()
            # Skip empty and whitespace-only lines (trailing newline, stray
            # "\r", padding) instead of failing on the unpack below.
            if not entry:
                continue
            # Split on the first whitespace run only, so merchant names may
            # themselves contain spaces.
            count, merchant = entry.split(None, 1)
            cards.append({"merchant": merchant, "value": int(count)})
        fields["cards"] = cards
    return fields
def to_metric(data):
    """Convert a Slack message into a metric document.

    The message text is tried against each pattern in METRIC_PATTERNS; the
    first pattern that matches at the start of the text decides the event.

    Args:
        data: Message dict. ``data['ts']`` must be convertible to ``float``;
            ``data['text']`` is the text that gets matched.

    Returns:
        A dict holding ``event``, ``source`` and ``timestamp`` plus every
        field captured by the pattern (after post-processing), or ``None``
        when the message has no text or no pattern matches.
    """
    text = data.get('text')
    if text is None:
        # Without text there is nothing to classify; report "no metric"
        # rather than raising KeyError.
        return None

    # .items() works on both Python 2 and 3, unlike .iteritems().
    for event, pattern in METRIC_PATTERNS.items():
        match = pattern.match(text)
        if not match:
            continue

        doc = {
            'event': event,
            'source': 'slack',
            'timestamp': datetime.utcfromtimestamp(float(data['ts']))
        }
        fields = match.groupdict()
        # Every handler runs for every event; it receives None when the
        # pattern did not capture its field.
        for field, handler in POST_PROCESSING.items():
            fields = handler(fields.pop(field, None), fields)
        # Captured fields take precedence over the base keys on a clash.
        doc.update(fields)
        return doc

    return None
def process_message(data):
    """Index the metric carried by a Slack message, if it is one of ours.

    Presumably invoked by rtmbot for each incoming message event -- confirm
    against the rtmbot plugin contract.

    Args:
        data: Message dict. Only messages whose ``user`` equals USER_ID and
            whose ``channel`` equals CHANNEL are processed; everything else
            is ignored.
    """
    # Filter out only the metrics messages we care about.
    # .get() is used so that events lacking a "user" or "channel" key are
    # skipped instead of raising KeyError.
    if data.get('user') != USER_ID or data.get('channel') != CHANNEL:
        return

    metric = to_metric(data)
    if not metric:
        # A single parenthesised argument prints identically on Python 2 and 3.
        print("ERROR parsing %s" % data)
    else:
        client.index(ELASTIC_INDEX, ELASTIC_TYPE, metric)
# (GitHub gist page footer -- sign-up / sign-in prompts -- omitted.)