Skip to content

Instantly share code, notes, and snippets.

@bruth
Last active November 1, 2017 18:18
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save bruth/6c050b1b2c327ef1da8c788222847f5a to your computer and use it in GitHub Desktop.
Save bruth/6c050b1b2c327ef1da8c788222847f5a to your computer and use it in GitHub Desktop.
package main
import (
"flag"
"fmt"
"log"
"math/rand"
"os"
"os/signal"
"sync"
"sync/atomic"
"syscall"
"time"
"github.com/golang/protobuf/proto"
"github.com/nats-io/nats"
)
func main() {
var (
addr string
topic string
shards int
pubs int
instances int64
)
flag.StringVar(&addr, "addr", "nats://localhost:4222", "NATS address.")
flag.StringVar(&topic, "topic", "test.shard", "Sharded topic.")
flag.IntVar(&shards, "shards", 1, "Number of shards.")
flag.IntVar(&pubs, "pubs", 1, "Number of publishers.")
flag.Int64Var(&instances, "instances", 64, "Aggregate instances.")
flag.Parse()
conn, err := nats.Connect(addr)
if err != nil {
log.Fatal(err)
}
defer conn.Close()
sub, err := ShardedSubscribe(conn, topic, func(msg *nats.Msg) string {
var m Request
proto.Unmarshal(msg.Data, &m)
return fmt.Sprintf("%s.%d", msg.Subject, m.Instance%int64(shards))
})
if err != nil {
log.Fatal(err)
}
defer sub.Unsubscribe()
for i := 0; i < shards; i++ {
go func(i int) {
state := make(map[int64]int32)
conn.Subscribe(fmt.Sprintf("%s.%d", topic, i), func(msg *nats.Msg) {
// Random latency.
time.Sleep(time.Duration(rand.Intn(10)) * time.Millisecond)
var m Request
proto.Unmarshal(msg.Data, &m)
rep := &Reply{
Instance: m.Instance,
}
v := state[m.Instance]
if m.Version == state[m.Instance] {
v++
state[m.Instance] = v
/*
if m.Retries > 0 {
log.Printf("%d:%d, %d -> %d [fixed]", i, m.Instance, m.Version, v)
}
*/
} else {
rep.Conflict = true
//log.Printf("%d:%d, %d != %d [retry %d]", i, m.Instance, m.Version, v, m.Retries)
}
rep.Version = v
b, _ := proto.Marshal(rep)
conn.Publish(msg.Reply, b)
})
}(i)
}
sig := make(chan os.Signal)
signal.Notify(sig, os.Interrupt, syscall.SIGTERM)
wg := &sync.WaitGroup{}
wg.Add(pubs)
Sum := func(a ...int32) int32 {
var s int32
for _, x := range a {
s = s + x
}
return s
}
t0 := time.Now()
var t1 time.Time
reqs := make([]int32, pubs)
// Current versions
state := make([]int32, int(instances))
var retries int32
for i := 0; i < pubs; i++ {
go func(i int) {
defer wg.Done()
var r Reply
var req *Request
for {
select {
case <-sig:
return
default:
//time.Sleep(100 * time.Millisecond)
if req == nil {
inst := rand.Int63n(instances)
req = &Request{
Instance: inst,
Version: state[inst],
}
} else {
req.Retries++
req.Version = state[req.Instance]
}
b, _ := proto.Marshal(req)
reply, err := conn.Request(topic, b, 2*time.Second)
if err != nil {
log.Fatalf("publisher: %s", err)
}
proto.Unmarshal(reply.Data, &r)
state[r.Instance] = r.Version
if r.Conflict {
retries = atomic.AddInt32(&retries, 1)
} else {
reqs[i]++
req = nil
}
}
}
}(i)
}
select {
case <-sig:
case <-time.After(20 * time.Second):
}
close(sig)
t1 = time.Now()
wg.Wait()
d := t1.Sub(t0)
log.Print(d)
t := Sum(reqs...)
log.Print(t)
log.Printf("%f m/s", float64(t)/float64(d/time.Second))
log.Printf("retries: %d", retries)
log.Printf("%f r/s", float64(retries)/float64(d/time.Second))
}
type Sharder func(msg *nats.Msg) string
func ShardedSubscribe(conn *nats.Conn, subject string, sharder Sharder) (*nats.Subscription, error) {
var (
err error
sub *nats.Subscription
)
sub, err = conn.Subscribe(subject, func(msg *nats.Msg) {
next := sharder(msg)
if next == subject {
log.Print("sharded subject cannot be the parent subject")
log.Print("closing sharded subscription")
if err := sub.Unsubscribe(); err != nil {
log.Print(err)
}
return
}
msg.Subject = next
if err := conn.PublishMsg(msg); err != nil {
log.Print("sharder:", err)
}
})
return sub, err
}
// Code generated by protoc-gen-go.
// source: msg.proto
// DO NOT EDIT!
/*
Package main is a generated protocol buffer package.
It is generated from these files:
msg.proto
It has these top-level messages:
Reply
Request
*/
package main
import proto "github.com/golang/protobuf/proto"
import fmt "fmt"
import math "math"
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
type Reply struct {
Instance int64 `protobuf:"varint,1,opt,name=instance" json:"instance,omitempty"`
Version int32 `protobuf:"varint,2,opt,name=version" json:"version,omitempty"`
Conflict bool `protobuf:"varint,3,opt,name=conflict" json:"conflict,omitempty"`
}
func (m *Reply) Reset() { *m = Reply{} }
func (m *Reply) String() string { return proto.CompactTextString(m) }
func (*Reply) ProtoMessage() {}
func (*Reply) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }
func (m *Reply) GetInstance() int64 {
if m != nil {
return m.Instance
}
return 0
}
func (m *Reply) GetVersion() int32 {
if m != nil {
return m.Version
}
return 0
}
func (m *Reply) GetConflict() bool {
if m != nil {
return m.Conflict
}
return false
}
type Request struct {
Instance int64 `protobuf:"varint,1,opt,name=instance" json:"instance,omitempty"`
Version int32 `protobuf:"varint,2,opt,name=version" json:"version,omitempty"`
Retries int32 `protobuf:"varint,3,opt,name=retries" json:"retries,omitempty"`
}
func (m *Request) Reset() { *m = Request{} }
func (m *Request) String() string { return proto.CompactTextString(m) }
func (*Request) ProtoMessage() {}
func (*Request) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} }
func (m *Request) GetInstance() int64 {
if m != nil {
return m.Instance
}
return 0
}
func (m *Request) GetVersion() int32 {
if m != nil {
return m.Version
}
return 0
}
func (m *Request) GetRetries() int32 {
if m != nil {
return m.Retries
}
return 0
}
func init() {
proto.RegisterType((*Reply)(nil), "main.Reply")
proto.RegisterType((*Request)(nil), "main.Request")
}
func init() { proto.RegisterFile("msg.proto", fileDescriptor0) }
var fileDescriptor0 = []byte{
// 144 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0xcc, 0x2d, 0x4e, 0xd7,
0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0xc9, 0x4d, 0xcc, 0xcc, 0x53, 0x8a, 0xe4, 0x62, 0x0d,
0x4a, 0x2d, 0xc8, 0xa9, 0x14, 0x92, 0xe2, 0xe2, 0xc8, 0xcc, 0x2b, 0x2e, 0x49, 0xcc, 0x4b, 0x4e,
0x95, 0x60, 0x54, 0x60, 0xd4, 0x60, 0x0e, 0x82, 0xf3, 0x85, 0x24, 0xb8, 0xd8, 0xcb, 0x52, 0x8b,
0x8a, 0x33, 0xf3, 0xf3, 0x24, 0x98, 0x14, 0x18, 0x35, 0x58, 0x83, 0x60, 0x5c, 0x90, 0xae, 0xe4,
0xfc, 0xbc, 0xb4, 0x9c, 0xcc, 0xe4, 0x12, 0x09, 0x66, 0x05, 0x46, 0x0d, 0x8e, 0x20, 0x38, 0x5f,
0x29, 0x92, 0x8b, 0x3d, 0x28, 0xb5, 0xb0, 0x34, 0xb5, 0xb8, 0x84, 0x4c, 0xc3, 0x25, 0xb8, 0xd8,
0x8b, 0x52, 0x4b, 0x8a, 0x32, 0x53, 0x8b, 0xc1, 0x66, 0xb3, 0x06, 0xc1, 0xb8, 0x49, 0x6c, 0x60,
0x2f, 0x18, 0x03, 0x02, 0x00, 0x00, 0xff, 0xff, 0xab, 0xbc, 0xa8, 0xb6, 0xcf, 0x00, 0x00, 0x00,
}
syntax = "proto3";
package main;
message Reply {
int64 instance = 1;
int32 version = 2;
bool conflict = 3;
}
message Request {
int64 instance = 1;
int32 version = 2;
int32 retries = 3;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment