Created
April 5, 2018 20:05
-
-
Save rmoff/e1010e46973042bdf4ca7b3f318df660 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# rmoff / 05 Apr 2018
from slackclient import SlackClient | |
from confluent_kafka import Consumer, KafkaError | |
import json | |
# Slack API token handed to the Slack client (placeholder value -- replace
# it with a real token before running).
token = 'xxxxxxxxxxxx'
sc = SlackClient(token)

# Topic-level defaults for the consumer.
# 'auto.offset.reset': 'smallest' makes the first run of this consumer group
# read every message from the beginning of the topic.
topic_defaults = {'auto.offset.reset': 'smallest'}

# Consumer configuration.
#
# To monitor the data streams through Confluent Control Center, install the
# interceptors and then uncomment the 'plugin.library.paths' entry below.
# Ref: https://docs.confluent.io/current/control-center/docs/installation/clients.html#installing-control-center-interceptors
settings = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'python_slack',
    # 'plugin.library.paths': 'monitoring-interceptor',
    'default.topic.config': topic_defaults,
}

# Create the consumer and subscribe it to the notification topic.
c = Consumer(settings)
c.subscribe(['KSQL_NOTIFY'])
# Main loop: poll the subscribed topic forever and relay each record to Slack.
#
# Each record value is expected to be a JSON object with 'CHANNEL' and 'TEXT'
# keys. Anything else (non-JSON payload, JSON that is not an object, missing
# keys) is posted verbatim to the 'general' channel instead of being dropped.
# Any unexpected exception ends the loop; the consumer is always closed.
try:
    while True:
        msg = c.poll(0.1)
        if msg is None:
            # Nothing arrived within the poll timeout.
            continue

        # Deal with error/event records first so the happy path stays flat.
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                print('End of partition reached {}/{}'
                      .format(msg.topic(), msg.partition()))
            else:
                print('Error occured: {}'.format(msg.error()))
            continue

        payload = msg.value()
        print('Received message: {0}'.format(payload))
        if payload is None:
            # Record without a value: nothing to send.
            continue

        # Decode the payload to text; keep the raw value if it cannot be
        # decoded (or is not a bytes-like object to begin with).
        try:
            data = payload.decode()
        except (UnicodeDecodeError, AttributeError):
            data = payload

        # Try to parse the message as JSON
        try:
            app_msg = json.loads(data)
        except (ValueError, TypeError):
            print('Could not parse JSON, will just use raw message contents')
            app_msg = data

        # Try to extract the channel & message. KeyError covers a JSON object
        # without the expected keys; TypeError covers every non-object value
        # (plain string, list, number, null).
        try:
            channel = app_msg['CHANNEL']
            text = app_msg['TEXT']
        except (KeyError, TypeError):
            print('Failed to get channel/text from message')
            channel = 'general'
            # Fall back to the decoded payload rather than the raw bytes so
            # Slack (and the log line below) get readable text when possible.
            text = data

        # Send the message to Slack. The placeholders are message first,
        # channel second, so the arguments must be passed in that order.
        print('\nSending message "{}" to channel {}'.format(text, channel))
        sc_response = sc.api_call('chat.postMessage', channel=channel,
                                  text=text, username='KSQL Notifications',
                                  icon_emoji=':rocket:')
        if not sc_response['ok']:
            # .get() so a failure response lacking an 'error' field cannot
            # raise KeyError and take the whole consumer down.
            print('\t** FAILED: {}'.format(
                sc_response.get('error', 'unknown error')))
except Exception as e:
    print(e)
finally:
    # Always close the consumer, whatever ended the loop.
    c.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment