Skip to content

Instantly share code, notes, and snippets.

@rmoff
Created April 5, 2018 20:05
Show Gist options
  • Save rmoff/e1010e46973042bdf4ca7b3f318df660 to your computer and use it in GitHub Desktop.
Save rmoff/e1010e46973042bdf4ca7b3f318df660 to your computer and use it in GitHub Desktop.
# rmoff / 05 Apr 2018
import json
import os

from confluent_kafka import Consumer, KafkaError
from slackclient import SlackClient
# Slack API token. It is read from the SLACK_API_TOKEN environment variable
# when that is set, so the real secret does not have to be written into this
# script; otherwise the placeholder below is used and must be edited by hand.
token = os.environ.get('SLACK_API_TOKEN', 'xxxxxxxxxxxx')
# Client used further down to post each notification to Slack.
sc = SlackClient(token)
# Kafka consumer configuration.
#
# 'auto.offset.reset' is 'smallest' so that the first time this script runs
# it consumes every message from the beginning of the topic.
#
# To monitor the data stream through Confluent Control Center, install the
# interceptors and then uncomment the 'plugin.library.paths' entry below.
# Ref: https://docs.confluent.io/current/control-center/docs/installation/clients.html#installing-control-center-interceptors
_topic_defaults = {'auto.offset.reset': 'smallest'}
settings = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'python_slack',
    # 'plugin.library.paths': 'monitoring-interceptor',
    'default.topic.config': _topic_defaults,
}

# Create the consumer and subscribe it to the notification topic.
c = Consumer(settings)
c.subscribe(['KSQL_NOTIFY'])
def _extract_channel_and_text(raw):
    """Work out the Slack channel and text for one Kafka message value.

    The value is decoded to text when possible and then parsed as JSON. When
    the parsed payload provides both 'CHANNEL' and 'TEXT', those are used.
    Otherwise the message is routed to 'general' with the message contents
    as its text.

    Args:
        raw: The message value as returned by ``msg.value()``; not None.

    Returns:
        A ``(channel, text)`` tuple.
    """
    # Handle UTF: keep the undecoded value if it cannot be decoded.
    # Deliberately broad: any failure here just means "use it as-is".
    try:
        data = raw.decode()
    except Exception:
        data = raw
    # Try to parse the message as JSON
    try:
        app_msg = json.loads(data)
    except Exception:
        print('Could not parse JSON, will just use raw message contents')
        app_msg = data
    # Try to extract the channel & message. This also fails (and falls back)
    # when the payload is not a mapping, e.g. a plain string or a number.
    try:
        return app_msg['CHANNEL'], app_msg['TEXT']
    except Exception:
        print('Failed to get channel/text from message')
        # Fall back to the decoded contents rather than the raw value, so the
        # Slack text is a string whenever decoding succeeded.
        return 'general', data


# Poll Kafka forever, forwarding every message with a value to Slack. Any
# unexpected exception is printed and ends the loop; the consumer is always
# closed on the way out.
try:
    while True:
        msg = c.poll(0.1)
        if msg is None:
            # Nothing returned by this poll; try again.
            continue

        error = msg.error()
        if error:
            if error.code() == KafkaError._PARTITION_EOF:
                print('End of partition reached {}/{}'
                      .format(msg.topic(), msg.partition()))
            else:
                print('Error occured: {}'.format(error))
            continue

        raw = msg.value()
        print('Received message: {0}'.format(raw))
        if raw is None:
            # Message without a value: nothing to forward.
            continue

        channel, text = _extract_channel_and_text(raw)

        # Send the message to Slack
        print('\nSending message "{}" to channel {}'.format(text, channel))
        sc_response = sc.api_call('chat.postMessage', channel=channel,
                                  text=text, username='KSQL Notifications',
                                  icon_emoji=':rocket:')
        if not sc_response['ok']:
            print('\t** FAILED: {}'.format(sc_response['error']))
except Exception as e:
    print(e)
finally:
    c.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment