@mertyildiran
Last active December 10, 2021 00:54
Mizu Development Workflow

In this tutorial we'll explain two development workflows for Mizu:

  • Standard development workflow, in which you build a Docker image, publish it to Docker Hub and pull it into Kubernetes. This workflow is only necessary if you're developing a Kubernetes-specific feature.
  • Local machine development workflow, in which you run Mizu Agent on your machine with sudo privileges. It is much faster than the standard workflow and can be used whenever you're not developing a Kubernetes-specific feature.

Standard Development Workflow

Whenever you change the agent or tap packages, or a protocol extension, you first need to build a Docker image to test the change:

$ docker build . -t mertyildiran/mizuagent:latest

Then you need to push that image into Docker Hub:

$ docker push mertyildiran/mizuagent:latest

Set agent-image to mertyildiran/mizuagent:latest in the Mizu config YAML (located at ~/.mizu/config.yaml):

$ cat ~/.mizu/config.yaml
agent-image: mertyildiran/mizuagent:latest
telemetry: false

This makes Mizu use your custom Mizu Agent Docker image for both the API server and the tapper Daemon Set.
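Before running the CLI it can be worth confirming that the override is actually in the file. The following is a minimal sketch, assuming the config stays a flat list of `key: value` lines as in the listing above; `read_flat_config` is a hypothetical helper and not part of Mizu, and it deliberately avoids a YAML dependency.

```python
def read_flat_config(text):
    """Parse flat `key: value` lines (no nesting) into a dict of strings."""
    config = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        # Split on the first colon only, so image tags such as ":latest" survive
        key, _, value = line.partition(":")
        config[key.strip()] = value.strip()
    return config


# Sample content copied from the config shown above
sample = "agent-image: mertyildiran/mizuagent:latest\ntelemetry: false\n"
config = read_flat_config(sample)
print(config["agent-image"])
```

To check the real file, pass the contents of ~/.mizu/config.yaml instead of `sample`. Values are returned as plain strings, so `telemetry` comes back as the text `false` rather than a boolean.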

You also need to build the CLI program:

$ make cli

Start minikube:

$ minikube start

Deploy hello-node:

$ kubectl create deployment hello-node --image=k8s.gcr.io/echoserver:1.4
$ kubectl expose deployment hello-node --type=LoadBalancer --port=8080

Then look up the pod name:

$ kubectl get pod -A
NAMESPACE              NAME                                         READY   STATUS    RESTARTS   AGE
default                hello-node-7567d9fdc9-f9n7r                  1/1     Running   3          6d14h
kube-system            coredns-558bd4d5db-qkz86                     1/1     Running   15         31d
kube-system            etcd-minikube                                1/1     Running   15         31d
kube-system            kube-apiserver-minikube                      1/1     Running   15         31d
kube-system            kube-controller-manager-minikube             1/1     Running   15         31d
kube-system            kube-proxy-62xdc                             1/1     Running   15         31d
kube-system            kube-scheduler-minikube                      1/1     Running   15         31d
kube-system            storage-provisioner                          1/1     Running   28         31d
kubernetes-dashboard   dashboard-metrics-scraper-7976b667d4-vsmmv   1/1     Running   15         31d
kubernetes-dashboard   kubernetes-dashboard-6fcdf4f6d-9z5xj         1/1     Running   24         31d

Then you tap into the pod with Mizu:

$ ./cli/bin/mizu__ tap hello-node-7567d9fdc9-f9n7r

Then wait for the image to be pulled and for the Mizu pods to reach the Running status:

$ kubectl get pod -A
NAMESPACE              NAME                                         READY   STATUS    RESTARTS   AGE
default                hello-node-7567d9fdc9-f9n7r                  1/1     Running   3          6d14h
kube-system            coredns-558bd4d5db-qkz86                     1/1     Running   15         31d
kube-system            etcd-minikube                                1/1     Running   15         31d
kube-system            kube-apiserver-minikube                      1/1     Running   15         31d
kube-system            kube-controller-manager-minikube             1/1     Running   15         31d
kube-system            kube-proxy-62xdc                             1/1     Running   15         31d
kube-system            kube-scheduler-minikube                      1/1     Running   15         31d
kube-system            storage-provisioner                          1/1     Running   28         31d
kubernetes-dashboard   dashboard-metrics-scraper-7976b667d4-vsmmv   1/1     Running   15         31d
kubernetes-dashboard   kubernetes-dashboard-6fcdf4f6d-9z5xj         1/1     Running   24         31d
mizu                   mizu-api-server                              1/1     Running   0          15s
mizu                   mizu-tapper-daemon-set-z7w2m                 1/1     Running   0          15s
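Rather than re-reading the table by eye, the check can be scripted. This is a sketch only: it assumes the column layout shown above (NAMESPACE, NAME, READY, STATUS, ...) and a namespace called `mizu`; `pods_running` is a hypothetical helper name.

```python
def pods_running(kubectl_output, namespace="mizu"):
    """Return True if the namespace has at least one pod and all are Running."""
    rows = [line.split() for line in kubectl_output.splitlines()[1:] if line.strip()]
    # Column 0 is NAMESPACE and column 3 is STATUS in `kubectl get pod -A` output
    statuses = [row[3] for row in rows if row[0] == namespace]
    return bool(statuses) and all(status == "Running" for status in statuses)


# Shortened sample of the output shown above
sample = """\
NAMESPACE     NAME                           READY   STATUS    RESTARTS   AGE
default       hello-node-7567d9fdc9-f9n7r    1/1     Running   3          6d14h
mizu          mizu-api-server                1/1     Running   0          15s
mizu          mizu-tapper-daemon-set-z7w2m   1/1     Running   0          15s
"""
print(pods_running(sample))
```

The function returns False while the namespace has no pods yet, which is the state right after starting the tap and before the image has been pulled.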

Look at the Daemon Set logs to see whether the program is running as expected:

$ kubectl logs mizu-tapper-daemon-set-z7w2m -n mizu
2021/08/23 10:43:31 Loading extension: amqp.so
2021/08/23 10:43:31 Initializing AMQP extension...
2021/08/23 10:43:31 Extension Properties: &{Protocol:{Name:amqp LongName:Advanced Message Queuing Protocol 0-9-1 Abbreviation:AMQP Version:0-9-1 BackgroundColor:#ff6600 ForegroundColor:#ffffff FontSize:12 ReferenceLink:https://www.rabbitmq.com/amqp-0-9-1-reference.html Ports:[5671 5672]} Path:/app/extensions/amqp.so Plug:0xc0002deb40 Dissector:0x7fd399bd0070}
2021/08/23 10:43:31 Loading extension: http.so
2021/08/23 10:43:31 Initializing HTTP extension.
2021/08/23 10:43:31 Extension Properties: &{Protocol:{Name:http LongName:Hypertext Transfer Protocol -- HTTP/1.1 Abbreviation:HTTP Version:1.1 BackgroundColor:#205cf5 ForegroundColor:#ffffff FontSize:12 ReferenceLink:https://datatracker.ietf.org/doc/html/rfc2616 Ports:[80 8080 50051]} Path:/app/extensions/http.so Plug:0xc0002de9f0 Dissector:0x7fd399851860}
2021/08/23 10:43:31 Loading extension: kafka.so
2021/08/23 10:43:31 Initializing Kafka extension...
2021/08/23 10:43:31 Extension Properties: &{Protocol:{Name:kafka LongName:Apache Kafka Protocol Abbreviation:KAFKA Version:12 BackgroundColor:#000000 ForegroundColor:#ffffff FontSize:11 ReferenceLink:https://kafka.apache.org/protocol Ports:[9092]} Path:/app/extensions/kafka.so Plug:0xc0002df0e0 Dissector:0x7fd399185940}
2021/08/23 10:43:31 All extension ports: [5671 5672 80 8080 50051 9092]
2021-08-23T10:43:31Z INFO     : Filtering for the following authorities: [172.17.0.5]
2021-08-23T10:43:31Z INFO     : Received empty/no APP_PORTS env var! only listening to ports: [5671 5672 80 8080 50051 9092]
2021/08/23 10:43:31 passive_tapper.go:254: App Ports: []
Failed connecting to websocket server: dial tcp 10.98.144.224:80: connect: connection refused, (dial tcp 10.98.144.224:80: connect: connection refused,dial tcp 10.98.144.224:80: connect: connection refused)
2021-08-23T10:43:31Z INFO     : Starting to read packets
2021-08-23T10:43:31Z INFO     : Assembler options: maxBufferedPagesTotal=5000, maxBufferedPagesPerConnection=5000

Notice HTTP, AMQP and Kafka extensions are loaded.
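The same check can be done mechanically on the log text. The sketch below assumes the `Loading extension: <name>.so` line format seen in the log above; `loaded_extensions` is a hypothetical helper.

```python
import re


def loaded_extensions(log_text):
    """Return the extension names from `Loading extension: <name>.so` lines."""
    return re.findall(r"Loading extension: (\S+)\.so", log_text)


# Lines copied from the tapper log above
sample_log = """\
2021/08/23 10:43:31 Loading extension: amqp.so
2021/08/23 10:43:31 Initializing AMQP extension...
2021/08/23 10:43:31 Loading extension: http.so
2021/08/23 10:43:31 Initializing HTTP extension.
2021/08/23 10:43:31 Loading extension: kafka.so
2021/08/23 10:43:31 Initializing Kafka extension...
"""
extensions = loaded_extensions(sample_log)
print(extensions)

# Report any extension that was expected but did not show up in the log
missing = {"amqp", "http", "kafka"} - set(extensions)
print("missing:", sorted(missing))
```

Feeding it the output of the `kubectl logs` command above gives a quick answer when a newly written protocol extension fails to load.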

Visit http://localhost:8899/mizu

Run minikube service hello-node to generate some traffic.

You should now see the traffic in your browser.

Look at the Daemon Set logs again to check whether anything went wrong:

$ kubectl logs mizu-tapper-daemon-set-z7w2m -n mizu
2021/08/23 10:43:31 Loading extension: amqp.so
2021/08/23 10:43:31 Initializing AMQP extension...
2021/08/23 10:43:31 Extension Properties: &{Protocol:{Name:amqp LongName:Advanced Message Queuing Protocol 0-9-1 Abbreviation:AMQP Version:0-9-1 BackgroundColor:#ff6600 ForegroundColor:#ffffff FontSize:12 ReferenceLink:https://www.rabbitmq.com/amqp-0-9-1-reference.html Ports:[5671 5672]} Path:/app/extensions/amqp.so Plug:0xc0002deb40 Dissector:0x7fd399bd0070}
2021/08/23 10:43:31 Loading extension: http.so
2021/08/23 10:43:31 Initializing HTTP extension.
2021/08/23 10:43:31 Extension Properties: &{Protocol:{Name:http LongName:Hypertext Transfer Protocol -- HTTP/1.1 Abbreviation:HTTP Version:1.1 BackgroundColor:#205cf5 ForegroundColor:#ffffff FontSize:12 ReferenceLink:https://datatracker.ietf.org/doc/html/rfc2616 Ports:[80 8080 50051]} Path:/app/extensions/http.so Plug:0xc0002de9f0 Dissector:0x7fd399851860}
2021/08/23 10:43:31 Loading extension: kafka.so
2021/08/23 10:43:31 Initializing Kafka extension...
2021/08/23 10:43:31 Extension Properties: &{Protocol:{Name:kafka LongName:Apache Kafka Protocol Abbreviation:KAFKA Version:12 BackgroundColor:#000000 ForegroundColor:#ffffff FontSize:11 ReferenceLink:https://kafka.apache.org/protocol Ports:[9092]} Path:/app/extensions/kafka.so Plug:0xc0002df0e0 Dissector:0x7fd399185940}
2021/08/23 10:43:31 All extension ports: [5671 5672 80 8080 50051 9092]
2021-08-23T10:43:31Z INFO     : Filtering for the following authorities: [172.17.0.5]
2021-08-23T10:43:31Z INFO     : Received empty/no APP_PORTS env var! only listening to ports: [5671 5672 80 8080 50051 9092]
2021/08/23 10:43:31 passive_tapper.go:254: App Ports: []
Failed connecting to websocket server: dial tcp 10.98.144.224:80: connect: connection refused, (dial tcp 10.98.144.224:80: connect: connection refused,dial tcp 10.98.144.224:80: connect: connection refused)
2021-08-23T10:43:31Z INFO     : Starting to read packets
2021-08-23T10:43:31Z INFO     : Assembler options: maxBufferedPagesTotal=5000, maxBufferedPagesPerConnection=5000
2021/08/23 10:44:31 passive_tapper.go:362: 1m0.001427106s (errors: 452, errTypes:2) - Errors Summary: map[FSM-rejection:134 OptionChecker-rejection:318]
2021/08/23 10:44:31 passive_tapper.go:372: mem: 25554360, goroutines: 13, unmatched messages:
2021/08/23 10:44:31 passive_tapper.go:380: cleaner - flushed connections: 0, closed connections: 0, deleted messages: 0
2021/08/23 10:44:31 passive_tapper.go:388: app stats - {"processedBytes":5817724,"packetsCount":10216,"tcpPacketsCount":10164,"reassembledTcpPayloadsCount":0,"tlsConnectionsCount":0,"matchedPairs":0}

Everything is OK!
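The periodic `app stats` line is JSON, so the counters can be pulled out of the log instead of being read by eye. This is a sketch assuming the `app stats - {...}` format shown above; `last_app_stats` is a hypothetical helper, and which counters matter for a given change is left to the reader.

```python
import json

MARKER = "app stats - "


def last_app_stats(log_text):
    """Return the most recent `app stats` JSON object in the log, or None."""
    stats = None
    for line in log_text.splitlines():
        _, found, payload = line.partition(MARKER)
        if found:
            stats = json.loads(payload)
    return stats


# Two lines copied from the tapper log above
sample_log = (
    '2021/08/23 10:44:31 passive_tapper.go:380: cleaner - flushed connections: 0, '
    'closed connections: 0, deleted messages: 0\n'
    '2021/08/23 10:44:31 passive_tapper.go:388: app stats - {"processedBytes":5817724,'
    '"packetsCount":10216,"tcpPacketsCount":10164,"reassembledTcpPayloadsCount":0,'
    '"tlsConnectionsCount":0,"matchedPairs":0}\n'
)
stats = last_app_stats(sample_log)
print(stats["packetsCount"], stats["tcpPacketsCount"], stats["matchedPairs"])
```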

Press CTRL+C to close the Mizu CLI and wait for it to remove the resources:

$ ./cli/bin/mizu__ tap hello-node-7567d9fdc9-f9n7r
Mizu will store up to 200MB of traffic, old traffic will be cleared once the limit is reached.
Tapping pods in namespaces "default"
+hello-node-7567d9fdc9-f9n7r
Mizu is available at http://localhost:8899/mizu
^C
Removing mizu resources
Update available! 0.0.0 -> 0.11.0 (https://github.com/up9inc/mizu/releases/tag/0.11.0)

Conclusion

Testing even the tiniest change took several minutes, so this method should be used only if you're developing something Kubernetes-specific.

It also requires you to develop the Kubernetes deployment configuration for the feature that you want to test, which at least doubles the development effort.

Local Machine Development Workflow

This method involves doing everything on your local machine without requiring any Kubernetes deployment.

Save this script to a file named dev.sh:

#!/bin/bash

rm -f entries.db
rm -rf pprof/* && make clean && make agent
sudo setcap cap_net_raw,cap_net_admin=eip ./agent/build/mizuagent
./agent/build/mizuagent --api-server & \
PID1=$! && \
sleep 0.5 && \
GOGC=12800 NODE_NAME=dev TAPPED_ADDRESSES_PER_HOST='{"dev": ["localhost"]}' \
    ./agent/build/mizuagent \
        -i any \
        --tap \
        --api-server-address ws://localhost:8899/wsTapper \
        --nodefrag & \
PID2=$! && \
read -r -d '' _ </dev/tty
kill -9 $PID1 && \
kill -9 $PID2

Make it executable with chmod +x dev.sh and run it with ./dev.sh.

It will ask for your sudo password in order to grant the agent binary the capabilities it needs to sniff the network.

This script builds Mizu Agent and runs two instances of it, configured for your local machine:

  • API server
  • Tapper

It also lets you stop them with CTRL+C.
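The TAPPED_ADDRESSES_PER_HOST value in the script is a JSON object, and hand-quoting JSON inside a shell command is easy to get wrong. The sketch below generates the value instead. It assumes the format is a mapping from a node name to a list of addresses, as in the script, and that the key is meant to match NODE_NAME (both are `dev` there); `tapped_addresses_env` is a hypothetical helper.

```python
import json


def tapped_addresses_env(node_name, addresses):
    """Build a TAPPED_ADDRESSES_PER_HOST value for a single node."""
    return json.dumps({node_name: list(addresses)})


value = tapped_addresses_env("dev", ["localhost"])
print(value)  # {"dev": ["localhost"]}
```

The printed string can be pasted between the single quotes in dev.sh when you want to tap more than one address.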

Run the React app in another terminal:

$ cd ui/ && cp .env.example .env && npm install && npm start

That's all. Mizu is now listening on your local machine.

Testing HTTP, gRPC, AMQP, Kafka protocols together

In another terminal, run the following to make RabbitMQ and Kafka available on your network:

$ docker run -d -it --rm --name rabbitmq --net=host rabbitmq:latest && sleep 10
$ docker run -d -it --rm --name kafka --net=host up9inc/mockintosh:self-contained-kafka && sleep 2

Start a gRPC server example:

$ cd ~/Downloads/ && git clone -b v1.38.0 https://github.com/grpc/grpc --depth 1
$ cd ~/Downloads/grpc/examples/python/route_guide && python route_guide_server.py
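The fixed sleep calls above only guess at when the services are ready. A small reachability check, sketched here, can confirm it before any traffic is generated. Ports 5672 and 9092 are the ones used by the example programs in this Gist; 50051 is assumed to be the port the route_guide example listens on (it also appears among the HTTP extension ports in the tapper log). `port_open` is a hypothetical helper.

```python
import socket

# Assumed service ports; 50051 for the gRPC example is an assumption
SERVICES = {"RabbitMQ": 5672, "Kafka": 9092, "gRPC route_guide": 50051}


def port_open(host, port, timeout=1.0):
    """Return True if a TCP connection to host:port can be established."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    for name, port in SERVICES.items():
        state = "up" if port_open("localhost", port) else "DOWN"
        print("%-18s %5d  %s" % (name, port, state))
```

A successful TCP connection only shows that something is listening on the port, not that the broker has finished initializing, so a short delay may still be needed.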

Save the .go and .py files that are provided by this Gist into a directory.

In the same directory, save this script to a file named run.sh:

#!/bin/bash

go run produce.go &
go run consume.go &
python3 example.py &
python3 ~/Downloads/grpc/examples/python/route_guide/route_guide_client.py &
go run create_topics.go &
go run list_topics.go &

These Go and Python programs generate traffic for the HTTP, gRPC, AMQP and Kafka protocols.

Make it executable with chmod +x run.sh and run it with ./run.sh.

You should now see the traffic in your browser.

Conclusion

Testing a TCP packet sniffer/analyzer on your local machine is a much more robust development workflow than building a Docker image and deploying it to a Kubernetes cluster.

The time for the trial and error cycle is reduced from several minutes to a few seconds.

This method cannot be used for the features that are Kubernetes specific.

consume.go

package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/segmentio/kafka-go"
)

func main() {
	// to consume messages
	topic := "my-topic"
	partition := 0

	conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)
	if err != nil {
		log.Fatal("failed to dial leader:", err)
	}

	conn.SetReadDeadline(time.Now().Add(10 * time.Second))
	batch := conn.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max

	b := make([]byte, 10e3) // 10KB max per message
	for {
		n, err := batch.Read(b)
		if err != nil {
			break
		}
		// Print only the bytes that were read, not the whole 10KB buffer
		fmt.Println(string(b[:n]))
	}

	if err := batch.Close(); err != nil {
		log.Fatal("failed to close batch:", err)
	}

	if err := conn.Close(); err != nil {
		log.Fatal("failed to close connection:", err)
	}
}

create_topics.go

package main

import (
	"net"
	"strconv"

	"github.com/segmentio/kafka-go"
)

func main() {
	// to create topics when auto.create.topics.enable='false'
	topic := "my-topic"

	conn, err := kafka.Dial("tcp", "localhost:9092")
	if err != nil {
		panic(err.Error())
	}
	defer conn.Close()

	controller, err := conn.Controller()
	if err != nil {
		panic(err.Error())
	}

	var controllerConn *kafka.Conn
	controllerConn, err = kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
	if err != nil {
		panic(err.Error())
	}
	defer controllerConn.Close()

	topicConfigs := []kafka.TopicConfig{
		{
			Topic:             topic,
			NumPartitions:     1,
			ReplicationFactor: 1,
		},
	}

	err = controllerConn.CreateTopics(topicConfigs...)
	if err != nil {
		panic(err.Error())
	}
}

example.py

import time
import os

from faker import Faker
from pika import BlockingConnection, ConnectionParameters
from pika.spec import BasicProperties
from pika.exceptions import ChannelClosedByBroker, StreamLostError, AMQPConnectionError

fake = Faker()

EXCHANGE = 'topic_logs'
EXCHANGE_TYPE = 'topic'

# HOST = '192.168.49.2'
# PORT = '31583'
HOST = 'localhost'
PORT = '5672'

connection = BlockingConnection(
    ConnectionParameters(host=HOST, port=PORT)
)
channel = connection.channel()

queue = 'test1'
channel.queue_declare(queue=queue)
exchange = '%s_%s' % (EXCHANGE, queue)
channel.exchange_declare(exchange=exchange, exchange_type=EXCHANGE_TYPE)

amqp_properties = {
    'content_type': 'text/html',
    'content_encoding': 'utf-8'
}

channel.queue_bind(
    exchange=exchange,
    queue=queue,
    routing_key='#'
)


def _decoder(value):
    try:
        return value.decode()
    except (AttributeError, UnicodeDecodeError):
        return value


def callback(ch, method, properties: BasicProperties, body: bytes):
    print(
        None if not method.routing_key else method.routing_key,
        _decoder(body),
        properties.headers,
        {x: getattr(properties, x) for x in properties.__dict__ if x != 'headers'}
    )


channel.basic_consume(queue=queue, on_message_callback=callback, auto_ack=True)

# for i in range(20):
for i in range(2):
    # Raw strings keep the shell escapes (\?, \=, \&) without invalid-escape warnings
    os.system(r"curl -X GET http://neverssl.com\?%s\=%s\&%s\=%s -v" % (fake.word(), fake.word(), fake.word(), fake.word()))

    channel.basic_publish(
        exchange=exchange,
        routing_key='key%d' % i,
        body='value%d' % i,
        properties=BasicProperties(
            headers={
                'hdr%d' % i: 'hdr_val%d' % i
            },
            **amqp_properties
        )
    )

    os.system(r"curl -X GET http://neverssl.com\?%s\=%s\&%s\=%s -v" % (fake.word(), fake.word(), fake.word(), fake.word()))

    time.sleep(5)

connection.close()

go.mod

module github.com/up9inc/mizu/kafka

go 1.16

require (
	github.com/segmentio/kafka-go v0.4.17
	golang.org/x/net v0.0.0-20210614182718-04defd469f4e // indirect
)

go.sum

github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw=
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
github.com/frankban/quicktest v1.11.3 h1:8sXhOn0uLys67V8EsXLc6eszDs8VXWxL3iRvebPhedY=
github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM0I9ntUbOk+k=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/go-cmp v0.5.4 h1:L8R9j+yAqZuZjsqh/z+F1NCffTKKLShY6zXTItVIZ8M=
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/klauspost/compress v1.9.8 h1:VMAMUUOh+gaxKTMk+zqbjsSjsIcUcL/LF4o63i82QyA=
github.com/klauspost/compress v1.9.8/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI=
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/pierrec/lz4 v2.6.0+incompatible h1:Ix9yFKn1nSPBLFl/yZknTp8TU5G4Ps0JDmguYK6iH1A=
github.com/pierrec/lz4 v2.6.0+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/segmentio/kafka-go v0.4.17 h1:IyqRstL9KUTDb3kyGPOOa5VffokKWSEzN6geJ92dSDY=
github.com/segmentio/kafka-go v0.4.17/go.mod h1:19+Eg7KwrNKy/PFhiIthEPkO8k+ac7/ZYXwYM9Df10w=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0=
github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190506204251-e1dfcc566284 h1:rlLehGeYg6jfoyz/eDqDU1iRXLKfR42nnNh57ytKEWo=
golang.org/x/crypto v0.0.0-20190506204251-e1dfcc566284/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20210614182718-04defd469f4e h1:XpT3nA5TvE525Ne3hInMh6+GETgn27Zfm9dxsThnX2Q=
golang.org/x/net v0.0.0-20210614182718-04defd469f4e/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

list_topics.go

package main

import (
	"fmt"

	"github.com/segmentio/kafka-go"
)

func main() {
	conn, err := kafka.Dial("tcp", "localhost:9092")
	if err != nil {
		panic(err.Error())
	}
	defer conn.Close()

	partitions, err := conn.ReadPartitions()
	if err != nil {
		panic(err.Error())
	}

	// Collect the distinct topic names across all partitions
	m := map[string]struct{}{}
	for _, p := range partitions {
		m[p.Topic] = struct{}{}
	}
	for k := range m {
		fmt.Println(k)
	}
}

produce.go

package main

import (
	"context"
	"log"
	"net"
	"strconv"
	"time"

	"github.com/segmentio/kafka-go"
)

func main() {
	// to produce messages
	topic := "my-topic"
	partition := 0

	conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)
	if err != nil {
		log.Fatal("failed to dial leader:", err)
	}

	controller, err := conn.Controller()
	if err != nil {
		panic(err.Error())
	}

	var controllerConn *kafka.Conn
	controllerConn, err = kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
	if err != nil {
		panic(err.Error())
	}
	defer controllerConn.Close()

	topicConfigs := []kafka.TopicConfig{
		{
			Topic:             topic,
			NumPartitions:     1,
			ReplicationFactor: 1,
		},
	}

	// The original discarded this error; log it but keep going as before
	if err = controllerConn.CreateTopics(topicConfigs...); err != nil {
		log.Println("failed to create topic (continuing):", err)
	}

	conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
	_, err = conn.WriteMessages(
		kafka.Message{Key: []byte("key-one"), Value: []byte("one!"), Headers: []kafka.Header{{Key: "hdr1", Value: []byte("val1")}}},
		kafka.Message{Key: []byte("key-two"), Value: []byte("two!"), Headers: []kafka.Header{{Key: "hdr2", Value: []byte("val2")}}},
		kafka.Message{Key: []byte("key-three"), Value: []byte("three!"), Headers: []kafka.Header{{Key: "hdr3", Value: []byte("val3")}}},
	)
	if err != nil {
		log.Fatal("failed to write messages:", err)
	}

	if err := conn.Close(); err != nil {
		log.Fatal("failed to close writer:", err)
	}
}