Skip to content

Instantly share code, notes, and snippets.

@justinfx
Last active January 11, 2019 04:47
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save justinfx/b0adb36e694ec03365da19b6bbf33c20 to your computer and use it in GitHub Desktop.
Save justinfx/b0adb36e694ec03365da19b6bbf33c20 to your computer and use it in GitHub Desktop.
Repro of nats-io subscribe queue not evenly load balancing to idle workers

Build

go build worker.go
go build sender.go

Test

Start the gnatsd server and 3 worker processes, then run the test script to send 3 messages at once. (The script launches the sender binary from /tmp/nats_test/sender, so build or copy it there first.) All 3 messages are expected to be picked up by the 3 idle workers, and this may happen on the first run. Re-running the send test, however, results in one of the workers not picking up a message while another worker handles 2 messages consecutively.

shell 1

gnatsd -p 23456

shell 2,3,4

./worker

shell 5

python ./send.py

# output
started subprocess
started subprocess
started subprocess
[None, None, None]
[None, None, None]
[None, None, None]
[None, None, None]
[None, None, None]
2019/01/11 17:36:58 Received: 2019-01-11 17:36:58.929140694 +1300 NZDT m=+222.514945473
2019/01/11 17:36:58 Received: 2019-01-11 17:36:58.929872619 +1300 NZDT m=+221.086229133
[0, 0, None]
[0, 0, None]
[0, 0, None]
[0, 0, None]
[0, 0, None]
2019/01/11 17:37:03 Received: 2019-01-11 17:37:03.930106351 +1300 NZDT m=+226.086462907
[0, 0, 0]
package main
import (
"log"
"time"
"github.com/nats-io/go-nats"
)
// main sends a single "sleep" request on the "worker" subject and logs the
// payload of the reply, waiting at most 20 seconds for it.
func main() {
	conn, connErr := nats.Connect("nats://localhost:23456", nats.Name("runner"))
	if connErr != nil {
		log.Fatal(connErr)
	}
	defer conn.Close()

	reply, reqErr := conn.Request("worker", []byte("sleep"), 20*time.Second)
	if reqErr != nil {
		// Report the connection's last recorded error when there is one;
		// otherwise fall back to the error returned by Request itself.
		cause := reqErr
		if last := conn.LastError(); last != nil {
			cause = last
		}
		log.Fatalf("%v for request", cause)
	}

	log.Printf("Received: %s", reply.Data)
}
"""
Test script for sending 3 messages to 3 workers
"""
import subprocess
import time
def start():
    """Launch one sender process and return its handle without waiting for it.

    Returns:
        The subprocess.Popen object for the newly started sender.
    """
    proc = subprocess.Popen(["/tmp/nats_test/sender"])
    # Single-argument call form prints the same text on Python 2 and Python 3;
    # the bare `print "..."` statement is a syntax error on Python 3.
    print("started subprocess")
    return proc
# Start the three senders back to back so their requests are sent at
# (nearly) the same time.
procs = []
for i in range(3):
    procs.append(start())

# Poll once per second until every sender has exited. poll() returns None
# while a process is still running and its exit code once it has finished,
# so the printed list shows which senders have received their reply.
while True:
    result_list = [p.poll() for p in procs]
    # Single-argument call form works on both Python 2 and Python 3.
    print(result_list)
    running = None in result_list
    if not running:
        break
    time.sleep(1)
package main
import (
"log"
"runtime"
"time"
"github.com/nats-io/go-nats"
)
// main connects to the local NATS server, subscribes to the "worker" subject
// as a member of the "queue" queue group, and then serves requests until the
// process is killed. Each request is answered, after a 5 second sleep that
// simulates work, with the current time formatted as a string.
func main() {
	nc, err := nats.Connect("nats://localhost:23456", nats.Name("worker"))
	if err != nil {
		log.Fatal(err)
	}
	// Deliberately no `defer nc.Close()`: runtime.Goexit below runs deferred
	// calls, which would close the connection as soon as setup finished.

	// A failed subscription must stop the worker; otherwise it would sit
	// idle forever without ever receiving a message.
	_, err = nc.QueueSubscribe("worker", "queue", func(msg *nats.Msg) {
		log.Printf("received message. sleeping")
		time.Sleep(5 * time.Second)
		log.Printf("done sleeping")
		if err := nc.Publish(msg.Reply, []byte(time.Now().String())); err != nil {
			log.Fatal(err)
		}
	})
	if err != nil {
		log.Fatal(err)
	}

	// Flush so the subscription has been sent to the server before the
	// worker is considered ready, and surface any failure doing so.
	if err := nc.Flush(); err != nil {
		log.Fatal(err)
	}
	if err := nc.LastError(); err != nil {
		log.Fatal(err)
	}

	// End only the main goroutine: because main never returns, the program
	// keeps running the client's goroutines and the message handler.
	runtime.Goexit()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment