// Source: GitHub gist techzilla/9dbab0c5e4ca93a588987636422eb1f7
// Author: @techzilla. Created January 30, 2024 19:41.

package main
import (
	"encoding/json"
	"fmt"
	"log"
	"time"

	"github.com/Shopify/sarama"
)
// Kafka connection settings used by main.
const (
	// kafkaBrokers is the single broker address handed to the producer.
	kafkaBrokers = "localhost:9092"
	// kafkaTopic is the topic the sample record is published to.
	kafkaTopic = "syslog_topic"
)

// Field values of the sample syslog-style record.
const (
	logMessage  = "Example log message"
	logFacility = "local0"
	logSeverity = "info"
	// logTimestamp is written in the shape of Go's reference time
	// (RFC 3339 with optional milliseconds).
	logTimestamp = "2006-01-02T15:04:05.999Z07:00"
)
// syslogRecord is the JSON document published to Kafka for one log entry.
// Field order matches the keys of the original hand-built payload.
type syslogRecord struct {
	Message   string `json:"message"`
	Facility  string `json:"facility"`
	Severity  string `json:"severity"`
	Timestamp string `json:"timestamp"`
}

// main publishes one sample syslog-style record to Kafka and exits with a
// logged error if any step fails.
func main() {
	// All work lives in run so that its deferred cleanup executes before
	// log.Fatal terminates the process.
	if err := run(); err != nil {
		log.Fatal(err)
	}
}

// run creates a synchronous Kafka producer, sends a single JSON-encoded
// record to kafkaTopic, and reports where it landed. It returns the first
// error encountered; a failure to close the producer is returned only when
// nothing else failed, otherwise it is logged.
func run() (err error) {
	// Initialize Kafka producer configuration
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForLocal       // Only wait for the leader to acknowledge
	config.Producer.Compression = sarama.CompressionSnappy   // Use Snappy compression
	config.Producer.Flush.Frequency = 500 * time.Millisecond // Flush batches every 500ms
	// A SyncProducer refuses to start unless successes are reported back;
	// without this NewSyncProducer returns a configuration error.
	config.Producer.Return.Successes = true

	// Create Kafka producer
	producer, err := sarama.NewSyncProducer([]string{kafkaBrokers}, config)
	if err != nil {
		return fmt.Errorf("Error creating Kafka producer: %w", err)
	}
	defer func() {
		cerr := producer.Close()
		if cerr == nil {
			return
		}
		if err != nil {
			// Keep the primary failure as the return value.
			log.Printf("Error closing Kafka producer: %v", cerr)
			return
		}
		err = fmt.Errorf("Error closing Kafka producer: %w", cerr)
	}()

	// Construct syslog-like JSON message. encoding/json escapes quotes and
	// control characters that a plain %s substitution would pass through.
	payload, err := json.Marshal(syslogRecord{
		Message:  logMessage,
		Facility: logFacility,
		Severity: logSeverity,
		// logTimestamp is a time layout, so render the current time with it
		// rather than sending the layout text itself.
		Timestamp: time.Now().Format(logTimestamp),
	})
	if err != nil {
		return fmt.Errorf("Failed to encode syslog message: %w", err)
	}

	// Send the syslog JSON message to Kafka
	message := &sarama.ProducerMessage{
		Topic: kafkaTopic,
		Value: sarama.ByteEncoder(payload),
	}
	partition, offset, err := producer.SendMessage(message)
	if err != nil {
		return fmt.Errorf("Failed to send message to Kafka: %w", err)
	}
	fmt.Printf("Message sent to partition %d at offset %d\n", partition, offset)
	return nil
}
// (GitHub page footer: sign in on GitHub to comment on this gist.)