Skip to content

Instantly share code, notes, and snippets.

Created January 30, 2024 20:00
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save techzilla/6d2543c38bfb66ac09b84d0cb5194cd9 to your computer and use it in GitHub Desktop.
Save techzilla/6d2543c38bfb66ac09b84d0cb5194cd9 to your computer and use it in GitHub Desktop.
package main
import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"runtime"
	"sync"
	"time"

	"github.com/segmentio/kafka-go"
)
// Connection settings shared by every producer in this program.
const (
	// kafkaBrokers is the broker address handed to the Kafka writer.
	kafkaBrokers = "localhost:9092"
	// kafkaTopic is the topic every generated message is written to.
	kafkaTopic = "syslog_topic"
)
// SyslogMessage represents the structure of the syslog-like JSON message.
// Every field is a string and is encoded under the lowercase key named in
// its json tag.
type SyslogMessage struct {
	// Message is the free-form log text.
	Message string `json:"message"`
	// Facility is the syslog facility name (the producer in this file
	// sends "local0").
	Facility string `json:"facility"`
	// Severity is the syslog severity name (the producer in this file
	// sends "info").
	Severity string `json:"severity"`
	// Timestamp is the creation time as text; the producer in this file
	// formats it with time.RFC3339.
	Timestamp string `json:"timestamp"`
}
func main() {
numCPU := runtime.NumCPU()
// Create a wait group to wait for all goroutines to finish
var wg sync.WaitGroup
// Launch a goroutine for each CPU core
for i := 0; i < numCPU; i++ {
go func(cpuID int) {
defer wg.Done()
// Wait for all goroutines to finish
func runProducer(cpuID int) {
// Initialize Kafka writer configuration
config := kafka.WriterConfig{
Brokers: []string{kafkaBrokers},
Topic: kafkaTopic,
Balancer: &kafka.LeastBytes{},
// Create Kafka writer
writer := kafka.NewWriter(config)
// Print the CPU ID for each producer
fmt.Printf("Producer for CPU %d started\n", cpuID)
// Infinite loop sending unending stream of messages to Kafka
for {
// Construct syslog-like JSON message
syslogMessage := SyslogMessage{
Message: fmt.Sprintf("Log message from CPU %d", cpuID),
Facility: "local0",
Severity: "info",
Timestamp: time.Now().Format(time.RFC3339),
// Convert syslogMessage to JSON string
syslogJSON, err := json.Marshal(syslogMessage)
if err != nil {
log.Printf("Error marshaling syslog message: %v\n", err)
// Create Kafka message
message := kafka.Message{
Key: nil,
Value: syslogJSON,
// Send the Kafka message
err = writer.WriteMessages(context.Background(), message)
if err != nil {
log.Printf("Failed to send message to Kafka: %v\n", err)
// Print a message for each sent message (optional)
// fmt.Printf("Message sent from CPU %d\n", cpuID)
// Introduce a small delay to control the rate of messages
time.Sleep(100 * time.Millisecond)
// Note: The producer will keep running indefinitely. In a real-world scenario, you may want to implement a way
// to gracefully shut down the producer when needed (e.g., through signals or external triggers).
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment