Skip to content

Instantly share code, notes, and snippets.

@ycyr
Created February 16, 2023 11:07
Show Gist options
  • Save ycyr/d0adee7cb09b2c1caa2c022af8c9dee4 to your computer and use it in GitHub Desktop.
Save ycyr/d0adee7cb09b2c1caa2c022af8c9dee4 to your computer and use it in GitHub Desktop.
import base64
import gzip
import json

import boto3
from kafka import KafkaProducer
# Kafka topic that receives the forwarded log messages.
msk_topic = 'my-msk-topic'
# Bootstrap brokers used to reach the cluster -- replace with your own broker list.
msk_broker_list = [
    'my-msk-broker-1:9092',
    'my-msk-broker-2:9092',
]
# The producer is built once, at module import, against the brokers above.
producer = KafkaProducer(bootstrap_servers=msk_broker_list)
def lambda_handler(event, context):
    """Forward the log events of a CloudWatch Logs subscription to Kafka.

    Reads the payload at ``event['awslogs']['data']``, decodes it into a
    JSON document and publishes the ``message`` field of every entry in
    ``logEvents`` to ``msk_topic`` through the module-level ``producer``.

    Args:
        event: Invocation payload; must contain ``event['awslogs']['data']``
            holding the base64-encoded log batch.
        context: Lambda context object (unused).

    Raises:
        KeyError: If the event or the decoded document lacks an expected key.
        ValueError: If the decoded payload is not valid JSON.
    """
    # Get the raw log data from the CloudWatch subscription.
    payload = base64.b64decode(event['awslogs']['data'])

    # CloudWatch Logs gzip-compresses the payload before base64-encoding it.
    # Detect the gzip magic number so that an uncompressed JSON payload
    # (which can never start with these bytes) is still accepted as before.
    if payload[:2] == b'\x1f\x8b':
        payload = gzip.decompress(payload)

    logs = json.loads(payload)

    # Extract the log events and publish them to the Kafka topic.
    for log_event in logs['logEvents']:
        message = log_event['message']
        producer.send(msk_topic, message.encode('utf-8'))

    # Flush once after the loop: send() only queues the record, so block here
    # until everything queued has been handed to the brokers before returning.
    producer.flush()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment