@ycyr
Created February 16, 2023 11:14
import base64
import gzip
import json

from confluent_kafka import Producer

msk_topic = 'my-msk-topic'
msk_broker_list = 'my-msk-broker-1:9092,my-msk-broker-2:9092'  # Replace with your own broker list

# Create a Kafka producer with the specified broker list
producer_conf = {
    'bootstrap.servers': msk_broker_list,
    'client.id': 'my-msk-producer',
    'acks': 'all',
    'delivery.timeout.ms': 10000,
    'compression.type': 'gzip'
}
producer = Producer(producer_conf)

# Define a callback function to handle delivery reports for messages
def delivery_report(err, msg):
    if err is not None:
        print(f'Message delivery failed: {err}')
    else:
        print(f'Message delivered to topic {msg.topic()} partition {msg.partition()}')

def lambda_handler(event, context):
    # CloudWatch Logs delivers subscription data base64-encoded and gzip-compressed,
    # so it must be decompressed after decoding before it can be parsed as JSON
    log_data = gzip.decompress(base64.b64decode(event['awslogs']['data']))
    logs = json.loads(log_data)

    # Extract the log events and publish them to the Kafka topic
    for log_event in logs['logEvents']:
        message = log_event['message']
        producer.produce(msk_topic, key=None, value=message, callback=delivery_report)
        producer.poll(0)  # Serve delivery reports for previously produced messages

    # Flush any remaining messages in the producer queue
    producer.flush()
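
To exercise the handler locally, you can synthesize an event in the shape CloudWatch Logs delivers to Lambda: a JSON payload that is gzip-compressed and then base64-encoded under awslogs.data. The sketch below is meant to be appended to the same module (reusing its imports); the log group, log stream, and messages are hypothetical, and it assumes the brokers configured above are reachable.

# Build a CloudWatch Logs-style subscription event and invoke the handler.
# The log group, log stream, and messages below are made up for illustration.
sample_payload = {
    'messageType': 'DATA_MESSAGE',
    'logGroup': '/aws/lambda/example-app',        # hypothetical log group
    'logStream': '2023/02/16/[$LATEST]abcd1234',  # hypothetical log stream
    'logEvents': [
        {'id': '1', 'timestamp': 1676545800000, 'message': 'hello from CloudWatch'},
        {'id': '2', 'timestamp': 1676545801000, 'message': 'second test event'},
    ],
}

test_event = {
    'awslogs': {
        'data': base64.b64encode(
            gzip.compress(json.dumps(sample_payload).encode('utf-8'))
        ).decode('utf-8')
    }
}

lambda_handler(test_event, None)  # context is unused by this handler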