Skip to content

Instantly share code, notes, and snippets.

@kovetskiy
Created July 18, 2018 12:05
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kovetskiy/3948faf8c9d83e10a91fa06056674e3c to your computer and use it in GitHub Desktop.
Save kovetskiy/3948faf8c9d83e10a91fa06056674e3c to your computer and use it in GitHub Desktop.
sudo systemctl stop kafka; sudo rm -rf /var/lib/kafka/my-topic-0; sudo systemctl start kafka; AMOUNT=20000 THREADS=5 CYCLES=10 go run main.go
2018/07/18 15:02:47 dial tcp [::1]:9092: connect: connection refused
2018/07/18 15:02:47 dial tcp [::1]:9092: connect: connection refused
2018/07/18 15:02:47 dial tcp [::1]:9092: connect: connection refused
2018/07/18 15:02:47 dial tcp [::1]:9092: connect: connection refused
2018/07/18 15:02:47 dial tcp [::1]:9092: connect: connection refused
2018/07/18 15:02:47 dial tcp [::1]:9092: connect: connection refused
2018/07/18 15:02:47 dial tcp [::1]:9092: connect: connection refused
2018/07/18 15:02:47 dial tcp [::1]:9092: connect: connection refused
2018/07/18 15:02:47 dial tcp [::1]:9092: connect: connection refused
2018/07/18 15:02:48 dial tcp [::1]:9092: connect: connection refused
starting producing messages, messages: 20000 ; threads: 5
PRODUCE (cycle: 1): 20000 messages: 6.248497373s (3200.77 m/s)
PRODUCE (cycle: 2): 20000 messages: 2.599712804s (7693.16 m/s)
PRODUCE (cycle: 3): 20000 messages: 1.655028401s (12084.38 m/s)
PRODUCE (cycle: 4): 20000 messages: 1.65300486s (12099.18 m/s)
PRODUCE (cycle: 5): 20000 messages: 1.667484248s (11994.12 m/s)
PRODUCE (cycle: 6): 20000 messages: 1.687667123s (11850.68 m/s)
PRODUCE (cycle: 7): 20000 messages: 1.733858764s (11534.96 m/s)
PRODUCE (cycle: 8): 20000 messages: 1.705781828s (11724.83 m/s)
PRODUCE (cycle: 9): 20000 messages: 1.691839654s (11821.45 m/s)
PRODUCE (cycle: 10): 20000 messages: 1.7095852s (11698.74 m/s)
package main
import (
"bytes"
"context"
"encoding/binary"
"fmt"
"log"
"os"
"strconv"
"sync"
"time"
"github.com/kovetskiy/goa/uuid"
"github.com/segmentio/kafka-go"
)
type Packet struct {
ID int64 `json:"id"`
AccountDebit uuid.UUID `json:"account_debit" binding:"required"`
AccountCredit uuid.UUID `json:"account_credit" binding:"required"`
Status int64 `json:"status,omitempty"`
Side int64 `json:"side"`
Kind int64 `json:"kind"`
Market [10]byte `json:"market"`
Amount int64 `json:"amount"`
Price int64 `json:"price"`
CreatedAt int64 `json:"created_at"`
UpdatedAt int64 `json:"updated_at,omitempty"`
}
func main() {
topic := "my-topic"
partition := 0
var err error
var conn *kafka.Conn
for {
conn, err = kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)
if err != nil {
log.Println(err)
time.Sleep(time.Millisecond * 100)
continue
}
break
}
buffer := bytes.NewBuffer(nil)
err = binary.Write(buffer, binary.BigEndian, &Packet{})
if err != nil {
panic(err)
}
amount := mustGetEnvInt("AMOUNT")
threads := mustGetEnvInt("THREADS")
cycles := mustGetEnvInt("CYCLES")
fmt.Println(
"starting producing messages, messages:",
amount,
"; threads:",
threads,
)
for retry := 0; retry < cycles; retry++ {
produceStarted := time.Now()
wg := &sync.WaitGroup{}
for t := 0; t < threads; t++ {
wg.Add(1)
go func() {
for i := 0; i < amount/threads; i++ {
_, err = conn.WriteMessages(
kafka.Message{Value: buffer.Bytes()},
)
if err != nil {
panic(err)
}
}
wg.Done()
}()
}
wg.Wait()
produceFinished := time.Now()
duration := produceFinished.Sub(produceStarted)
fmt.Printf(
"PRODUCE (cycle: %d): %d messages: %s (%.2f m/s)\n",
retry+1, amount, duration,
float64(amount)/float64(duration.Seconds()),
)
}
conn.Close()
}
func mustGetEnvInt(key string) int {
value := os.Getenv(key)
if value == "" {
panic("no env value " + key + " specified")
}
number, err := strconv.Atoi(value)
if err != nil {
panic(err)
}
return number
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment