Skip to content

Instantly share code, notes, and snippets.

@ycyr
Created February 16, 2023 11:23
Show Gist options
  • Save ycyr/592a953fad25d76568de6a355d395f7f to your computer and use it in GitHub Desktop.
Save ycyr/592a953fad25d76568de6a355d395f7f to your computer and use it in GitHub Desktop.
package main
import (
	"bytes"
	"compress/gzip"
	"encoding/base64"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"strings"
	"sync"

	"github.com/Shopify/sarama"
)
// Kafka (MSK) connection settings used by main.
var (
	// mskTopic is the Kafka topic every forwarded log message is published to.
	mskTopic = "my-msk-topic"

	// mskBrokerList holds the bootstrap broker addresses handed to the
	// Sarama producer. Replace with your own broker list.
	mskBrokerList = []string{
		"my-msk-broker-1:9092",
		"my-msk-broker-2:9092",
	}
)
// main creates a Sarama async producer for mskBrokerList and starts a Lambda
// handler that forwards the log messages of each incoming CloudWatch Logs
// subscription event to the Kafka topic mskTopic.
func main() {
	// Both Return flags are enabled so that every message written to Input()
	// produces exactly one result on Successes() or Errors(); the handler
	// relies on that to know when a batch has been fully acknowledged.
	config := sarama.NewConfig()
	config.Producer.Return.Successes = true
	config.Producer.Return.Errors = true
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Compression = sarama.CompressionGZIP

	producer, err := sarama.NewAsyncProducer(mskBrokerList, config)
	if err != nil {
		log.Fatal(err)
	}
	// NOTE(review): lambda.Start presumably never returns in normal
	// operation, so this Close is only a safety net — confirm.
	defer func() {
		if err := producer.Close(); err != nil {
			log.Fatal(err)
		}
	}()

	// handler decodes one event and does not return until every message in
	// it has been acknowledged or rejected by the brokers, so a delivery
	// failure is reported as an invocation error instead of being lost.
	handler := func(event json.RawMessage) error {
		messages, err := decodeLogMessages(event)
		if err != nil {
			return err
		}
		return publishMessages(producer, messages)
	}

	// NOTE(review): the identifier "lambda" is not imported anywhere in this
	// file; it presumably refers to github.com/aws/aws-lambda-go/lambda,
	// which must be added to the import block for the program to build.
	lambda.Start(handler)
}

// decodeLogMessages extracts the whitespace-trimmed log messages carried by a
// CloudWatch Logs subscription event.
//
// Per the AWS documentation the Lambda event has the shape
// {"awslogs": {"data": "<base64>"}} and the decoded bytes are gzip-compressed
// JSON. For backward compatibility a top-level "data" field and an
// uncompressed payload are accepted as well.
//
// It returns an error when the envelope or payload is not valid JSON, when the
// data is missing or not valid base64, or when gzip decompression fails.
func decodeLogMessages(event json.RawMessage) ([]string, error) {
	var envelope struct {
		Data    string `json:"data"`
		AWSLogs struct {
			Data string `json:"data"`
		} `json:"awslogs"`
	}
	if err := json.Unmarshal(event, &envelope); err != nil {
		return nil, fmt.Errorf("parsing event envelope: %w", err)
	}

	encoded := envelope.AWSLogs.Data
	if encoded == "" {
		encoded = envelope.Data
	}
	if encoded == "" {
		return nil, fmt.Errorf("event carries no log data")
	}

	payload, err := base64.StdEncoding.DecodeString(encoded)
	if err != nil {
		return nil, fmt.Errorf("decoding base64 log data: %w", err)
	}

	// 0x1f 0x8b is the gzip magic number; only inflate when it is present so
	// that plain-JSON payloads keep working.
	if len(payload) >= 2 && payload[0] == 0x1f && payload[1] == 0x8b {
		zr, err := gzip.NewReader(bytes.NewReader(payload))
		if err != nil {
			return nil, fmt.Errorf("opening gzip log data: %w", err)
		}
		inflated, err := io.ReadAll(zr)
		if err != nil {
			return nil, fmt.Errorf("inflating gzip log data: %w", err)
		}
		if err := zr.Close(); err != nil {
			return nil, fmt.Errorf("closing gzip log data: %w", err)
		}
		payload = inflated
	}

	var logs struct {
		LogEvents []struct {
			Message string `json:"message"`
		} `json:"logEvents"`
	}
	if err := json.Unmarshal(payload, &logs); err != nil {
		return nil, fmt.Errorf("parsing log payload: %w", err)
	}

	messages := make([]string, 0, len(logs.LogEvents))
	for i := range logs.LogEvents {
		messages = append(messages, strings.TrimSpace(logs.LogEvents[i].Message))
	}
	return messages, nil
}

// publishMessages sends every message to mskTopic through producer and blocks
// until the producer has reported a result for each of them.
//
// The producer must have both Producer.Return.Successes and
// Producer.Return.Errors enabled. The function assumes it is the only user of
// the producer while it runs (NOTE(review): this relies on the handler being
// invoked one event at a time — confirm for the deployment).
//
// It returns nil when all messages were acknowledged, otherwise an error that
// wraps the first delivery failure and reports how many messages failed.
func publishMessages(producer sarama.AsyncProducer, messages []string) error {
	if len(messages) == 0 {
		return nil
	}

	// Enqueue from a separate goroutine: Input() can block once the
	// producer's buffers are full, and they only drain while results are
	// being read below.
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for _, m := range messages {
			producer.Input() <- &sarama.ProducerMessage{
				Topic: mskTopic,
				Value: sarama.StringEncoder(m),
			}
		}
	}()

	// Collect exactly one result per message.
	var firstErr error
	failed := 0
	for range messages {
		select {
		case <-producer.Successes():
		case perr := <-producer.Errors():
			failed++
			if firstErr == nil {
				firstErr = perr
			}
		}
	}
	// All results are in, so all sends have completed; Wait only makes the
	// sender goroutine's lifetime explicit.
	wg.Wait()

	if firstErr != nil {
		return fmt.Errorf("publishing to %q: %d of %d messages failed: %w",
			mskTopic, failed, len(messages), firstErr)
	}
	return nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment