Skip to content

Instantly share code, notes, and snippets.

@zorteran
Created November 3, 2022 20:09
Show Gist options
  • Save zorteran/ac9e47ebd26e9ada21b9f1030bdd1651 to your computer and use it in GitHub Desktop.
Save zorteran/ac9e47ebd26e9ada21b9f1030bdd1651 to your computer and use it in GitHub Desktop.
from json import loads
from confluent_kafka import Consumer
from loguru import logger
from discord_webhook import DiscordWebhook, DiscordEmbed
# Settings handed verbatim to confluent_kafka.Consumer.
KAFKA_CONF = {
    'bootstrap.servers': "10.10.10.10:9092",
    'group.id': "dev-wiaderko",
    'auto.offset.reset': 'latest',  # earliest / latest
}

# Subscription list passed to Consumer.subscribe(). The leading "^" is what
# the Kafka client treats as a regex subscription marker -- NOTE(review):
# confirm against the confluent_kafka version in use.
KAFKA_TOPICS = ["^es-alerts.*"]

# Discord webhook endpoint and the display name used for posted notifications.
DISCORD_WEBHOOK_URL = "https://discord.com/api/webhooks/something/somethingelse"
DISCORD_NOTIFICATION_USERNAME = "Mały Krzykacz"
def send_discord_notification(title, description):
    """Post one embed to the configured Discord webhook.

    Args:
        title: Text used as the embed title.
        description: Text used as the embed description.

    The webhook URL and sender name come from the module-level
    DISCORD_WEBHOOK_URL and DISCORD_NOTIFICATION_USERNAME constants. The
    result of the webhook call is not inspected and nothing is returned.
    """
    notification = DiscordEmbed(title=title, description=description, color='03b2f8')
    # Called without an argument, so the library chooses the timestamp
    # (presumably "now" -- confirm in the discord_webhook docs).
    notification.set_timestamp()

    hook = DiscordWebhook(url=DISCORD_WEBHOOK_URL, username=DISCORD_NOTIFICATION_USERNAME)
    hook.add_embed(notification)
    hook.execute()
def process_message(msg):
    """Turn one consumed Kafka record into a Discord notification.

    The record value is decoded as UTF-8 JSON. Its "kibana.alert.rule.name"
    field becomes the notification title and its "kibana.alert.reason" field
    (with "senuto" replaced by "wiaderko") becomes the description.

    Args:
        msg: A message object as returned by Consumer.poll(); it must expose
            topic(), partition(), offset() and value().

    Raises:
        ValueError: If the payload is not valid UTF-8 or not valid JSON
            (UnicodeDecodeError and json.JSONDecodeError are both subclasses).
        KeyError: If either expected Kibana field is missing from the payload.
    """
    # One argument per placeholder, in placeholder order. Passing msg.error()
    # first shifted every value by one slot and dropped the offset entirely.
    logger.info("Processing message: topic={} partition={} offset={}", msg.topic(), msg.partition(),
                msg.offset())

    payload = msg.value()
    if payload is None:
        # A record may carry no value; there is nothing to forward, so skip it
        # instead of failing on None.decode().
        logger.warning("Skipping message without a payload: topic={} partition={} offset={}",
                       msg.topic(), msg.partition(), msg.offset())
        return

    message = loads(payload.decode('utf-8'))
    send_discord_notification(title=message["kibana.alert.rule.name"],
                              description=message["kibana.alert.reason"].replace("senuto", 'wiaderko'))
logger.info("If you start me up, I'll never stop!")

# Build the consumer before entering the try block: if the constructor raises,
# there is nothing to close, and the finally clause must not hit an unbound
# name and hide the real error behind a NameError.
consumer = Consumer(KAFKA_CONF)
try:
    consumer.subscribe(KAFKA_TOPICS)
    while True:
        # poll() returns None when nothing arrived within the timeout.
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            logger.error("Kafka error: code={} topic={} partition={} offset={}", msg.error(), msg.topic(),
                         msg.partition(), msg.offset())
            # An error event is not an alert record; do not try to parse it
            # as a Kibana alert.
            continue
        process_message(msg)
finally:
    # Always close the consumer on the way out, including on Ctrl-C or an
    # unhandled exception from message processing.
    consumer.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment