Skip to content

Instantly share code, notes, and snippets.

@mertyildiran
Last active September 3, 2021 23:44
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mertyildiran/07617f2070433da677fb20c96ff29e67 to your computer and use it in GitHub Desktop.
Save mertyildiran/07617f2070433da677fb20c96ff29e67 to your computer and use it in GitHub Desktop.
Kafka Go demo code
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/segmentio/kafka-go"
)
func main() {
// to consume messages
topic := "my-topic"
partition := 0
conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)
if err != nil {
log.Fatal("failed to dial leader:", err)
}
conn.SetReadDeadline(time.Now().Add(10 * time.Second))
batch := conn.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max
b := make([]byte, 10e3) // 10KB max per message
for {
_, err := batch.Read(b)
if err != nil {
break
}
fmt.Println(string(b))
}
if err := batch.Close(); err != nil {
log.Fatal("failed to close batch:", err)
}
if err := conn.Close(); err != nil {
log.Fatal("failed to close connection:", err)
}
}
package main
import (
"net"
"strconv"
"github.com/segmentio/kafka-go"
)
func main() {
// to create topics when auto.create.topics.enable='false'
topic := "my-topic"
conn, err := kafka.Dial("tcp", "localhost:9092")
if err != nil {
panic(err.Error())
}
defer conn.Close()
controller, err := conn.Controller()
if err != nil {
panic(err.Error())
}
var controllerConn *kafka.Conn
controllerConn, err = kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
if err != nil {
panic(err.Error())
}
defer controllerConn.Close()
topicConfigs := []kafka.TopicConfig{
kafka.TopicConfig{
Topic: topic,
NumPartitions: 1,
ReplicationFactor: 1,
},
}
err = controllerConn.CreateTopics(topicConfigs...)
if err != nil {
panic(err.Error())
}
}
package main
import (
"fmt"
"github.com/segmentio/kafka-go"
)
func main() {
conn, err := kafka.Dial("tcp", "localhost:9092")
if err != nil {
panic(err.Error())
}
defer conn.Close()
partitions, err := conn.ReadPartitions()
if err != nil {
panic(err.Error())
}
m := map[string]struct{}{}
for _, p := range partitions {
m[p.Topic] = struct{}{}
}
for k := range m {
fmt.Println(k)
}
}
package main
import (
"context"
"log"
"net"
"strconv"
"time"
"github.com/segmentio/kafka-go"
)
func main() {
// to produce messages
topic := "my-topic"
partition := 0
conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)
if err != nil {
log.Fatal("failed to dial leader:", err)
}
controller, err := conn.Controller()
if err != nil {
panic(err.Error())
}
var controllerConn *kafka.Conn
controllerConn, err = kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
if err != nil {
panic(err.Error())
}
defer controllerConn.Close()
topicConfigs := []kafka.TopicConfig{
kafka.TopicConfig{
Topic: topic,
NumPartitions: 1,
ReplicationFactor: 1,
},
}
err = controllerConn.CreateTopics(topicConfigs...)
conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
_, err = conn.WriteMessages(
kafka.Message{Key: []byte("key-one"), Value: []byte("one!"), Headers: []kafka.Header{{Key: "hdr1", Value: []byte("val1")}}},
kafka.Message{Key: []byte("key-two"), Value: []byte("two!"), Headers: []kafka.Header{{Key: "hdr2", Value: []byte("val2")}}},
kafka.Message{Key: []byte("key-three"), Value: []byte("three!"), Headers: []kafka.Header{{Key: "hdr3", Value: []byte("val3")}}},
)
if err != nil {
log.Fatal("failed to write messages:", err)
}
if err := conn.Close(); err != nil {
log.Fatal("failed to close writer:", err)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment