Skip to content

Instantly share code, notes, and snippets.

@ripienaar
Last active February 26, 2020 19:48
Show Gist options
  • Save ripienaar/353c777e2abbf1ac0032d55345a0b3eb to your computer and use it in GitHub Desktop.
Save ripienaar/353c777e2abbf1ac0032d55345a0b3eb to your computer and use it in GitHub Desktop.
package main
import (
"context"
"fmt"
"hash/crc32"
"log"
"github.com/nats-io/nats.go"
"github.com/nats-io/jetstream/internal/jsch"
)
// User is the payload published to the stream; its UserID doubles as the
// partition key.
type User struct {
	// UserID identifies the user and is hashed to select a partition.
	UserID string
}

// Partition calculates the numbered partition this user belongs to, given the
// total number of partitions. The result is the IEEE CRC-32 of UserID modulo
// partitions, so the same UserID always maps to the same partition for a given
// partition count.
//
// A partitions value of 0 is treated as "no partitioning" and yields 0 rather
// than triggering an integer divide-by-zero panic.
func (u *User) Partition(partitions uint32) uint32 {
	if partitions == 0 {
		return 0
	}

	return crc32.ChecksumIEEE([]byte(u.UserID)) % partitions
}
// panicIfErr aborts the program by panicking with err when err is non-nil;
// a nil err is a no-op.
func panicIfErr(err error) {
	if err == nil {
		return
	}

	panic(err)
}
// worker handles a single partition: it subscribes to a fresh inbox and then
// creates an ephemeral consumer on s, filtered to subject 103.<partition>,
// that delivers all available messages to that inbox. Every decoded User is
// counted and logged. Any setup error panics. The function never returns.
func worker(ec *nats.EncodedConn, s *jsch.Stream, partition int) {
	subj := fmt.Sprintf("103.%d", partition)
	ib := nats.NewInbox()

	handled := 0

	// Subscribe before creating the consumer so the inbox is already being
	// listened on when deliveries start. A failed subscription would otherwise
	// leave the worker silently receiving nothing, so the error is checked.
	_, err := ec.Subscribe(ib, func(u *User) {
		handled++
		log.Printf("[worker:%d handled:%d] %v\n", partition, handled, u)
	})
	panicIfErr(err)

	_, err = s.NewConsumer(jsch.FilterStreamBySubject(subj), jsch.DeliverySubject(ib), jsch.DeliverAllAvailable())
	panicIfErr(err)

	// The background context is never cancelled, so this blocks forever and
	// keeps the worker goroutine alive.
	<-context.Background().Done()
}
// main demonstrates key-based partitioning over a stream: it connects to a
// local server, loads or creates stream "103" on subjects 103.*, purges it,
// starts one worker per partition, and publishes users U1..U10 five times
// each to 103.<partition>, where the partition is derived from the user ID.
// Any error panics. The program then blocks until interrupted.
func main() {
	panicIfErr(jsch.Connect("localhost:4222"))

	ec, err := nats.NewEncodedConn(jsch.Connection(), nats.JSON_ENCODER)
	panicIfErr(err)

	// stream listening on subjects 103.*
	str, err := jsch.LoadOrNewStream("103", jsch.Subjects("103.*"))
	panicIfErr(err)

	// make sure it's empty for the demo; a failed purge would leave stale
	// messages that skew the per-worker counts, so the error is not ignored
	panicIfErr(str.Purge())

	// expect 2 partitions
	partitions := uint32(2)

	// start one worker per partition
	for p := 0; p < int(partitions); p++ {
		go worker(ec, str, p)
	}

	// publishes messages to 103.<partition> based on the partition key on the user
	for c := 0; c < 5; c++ {
		for i := 1; i <= 10; i++ {
			user := User{fmt.Sprintf("U%d", i)}
			err = ec.Publish(fmt.Sprintf("103.%d", user.Partition(partitions)), user)
			panicIfErr(err)
		}
	}

	// The background context is never cancelled, so this blocks forever and
	// keeps the workers running until the process is interrupted.
	<-context.Background().Done()
}

This user is always handled by the same worker:

2020/02/26 11:47:12 [worker:1 handled:5] &{U10}
2020/02/26 11:47:12 [worker:1 handled:10] &{U10}
2020/02/26 11:47:12 [worker:1 handled:15] &{U10}
2020/02/26 11:47:12 [worker:1 handled:20] &{U10}
2020/02/26 11:47:12 [worker:1 handled:25] &{U10}

This worker handles many users:

2020/02/26 11:47:12 [worker:1 handled:1] &{U4}
2020/02/26 11:47:12 [worker:1 handled:2] &{U5}
2020/02/26 11:47:12 [worker:1 handled:3] &{U6}
2020/02/26 11:47:12 [worker:1 handled:4] &{U7}
2020/02/26 11:47:12 [worker:1 handled:5] &{U10}
2020/02/26 11:47:12 [worker:1 handled:6] &{U4}
2020/02/26 11:47:12 [worker:1 handled:7] &{U5}
2020/02/26 11:47:12 [worker:1 handled:8] &{U6}
2020/02/26 11:47:12 [worker:1 handled:9] &{U7}
2020/02/26 11:47:12 [worker:1 handled:10] &{U10}
2020/02/26 11:47:12 [worker:1 handled:11] &{U4}
2020/02/26 11:47:12 [worker:1 handled:12] &{U5}
2020/02/26 11:47:12 [worker:1 handled:13] &{U6}
2020/02/26 11:47:12 [worker:1 handled:14] &{U7}
2020/02/26 11:47:12 [worker:1 handled:15] &{U10}
2020/02/26 11:47:12 [worker:1 handled:16] &{U4}
2020/02/26 11:47:12 [worker:1 handled:17] &{U5}
2020/02/26 11:47:12 [worker:1 handled:18] &{U6}
2020/02/26 11:47:12 [worker:1 handled:19] &{U7}
2020/02/26 11:47:12 [worker:1 handled:20] &{U10}
2020/02/26 11:47:12 [worker:1 handled:21] &{U4}
2020/02/26 11:47:12 [worker:1 handled:22] &{U5}
2020/02/26 11:47:12 [worker:1 handled:23] &{U6}
2020/02/26 11:47:12 [worker:1 handled:24] &{U7}
2020/02/26 11:47:12 [worker:1 handled:25] &{U10}
2020/02/26 11:47:12 [worker:1 handled:1] &{U4}
2020/02/26 11:47:12 [worker:1 handled:2] &{U5}
2020/02/26 11:47:12 [worker:1 handled:3] &{U6}
2020/02/26 11:47:12 [worker:0 handled:1] &{U1}
2020/02/26 11:47:12 [worker:1 handled:4] &{U7}
2020/02/26 11:47:12 [worker:1 handled:5] &{U10}
2020/02/26 11:47:12 [worker:0 handled:2] &{U2}
2020/02/26 11:47:12 [worker:0 handled:3] &{U3}
2020/02/26 11:47:12 [worker:1 handled:6] &{U4}
2020/02/26 11:47:12 [worker:1 handled:7] &{U5}
2020/02/26 11:47:12 [worker:1 handled:8] &{U6}
2020/02/26 11:47:12 [worker:1 handled:9] &{U7}
2020/02/26 11:47:12 [worker:1 handled:10] &{U10}
2020/02/26 11:47:12 [worker:0 handled:4] &{U8}
2020/02/26 11:47:12 [worker:0 handled:5] &{U9}
2020/02/26 11:47:12 [worker:1 handled:11] &{U4}
2020/02/26 11:47:12 [worker:1 handled:12] &{U5}
2020/02/26 11:47:12 [worker:0 handled:6] &{U1}
2020/02/26 11:47:12 [worker:0 handled:7] &{U2}
2020/02/26 11:47:12 [worker:1 handled:13] &{U6}
2020/02/26 11:47:12 [worker:0 handled:8] &{U3}
2020/02/26 11:47:12 [worker:0 handled:9] &{U8}
2020/02/26 11:47:12 [worker:1 handled:14] &{U7}
2020/02/26 11:47:12 [worker:0 handled:10] &{U9}
2020/02/26 11:47:12 [worker:1 handled:15] &{U10}
2020/02/26 11:47:12 [worker:0 handled:11] &{U1}
2020/02/26 11:47:12 [worker:1 handled:16] &{U4}
2020/02/26 11:47:12 [worker:0 handled:12] &{U2}
2020/02/26 11:47:12 [worker:1 handled:17] &{U5}
2020/02/26 11:47:12 [worker:0 handled:13] &{U3}
2020/02/26 11:47:12 [worker:1 handled:18] &{U6}
2020/02/26 11:47:12 [worker:0 handled:14] &{U8}
2020/02/26 11:47:12 [worker:1 handled:19] &{U7}
2020/02/26 11:47:12 [worker:0 handled:15] &{U9}
2020/02/26 11:47:12 [worker:1 handled:20] &{U10}
2020/02/26 11:47:12 [worker:0 handled:16] &{U1}
2020/02/26 11:47:12 [worker:1 handled:21] &{U4}
2020/02/26 11:47:12 [worker:0 handled:17] &{U2}
2020/02/26 11:47:12 [worker:1 handled:22] &{U5}
2020/02/26 11:47:12 [worker:0 handled:18] &{U3}
2020/02/26 11:47:12 [worker:1 handled:23] &{U6}
2020/02/26 11:47:12 [worker:0 handled:19] &{U8}
2020/02/26 11:47:12 [worker:1 handled:24] &{U7}
2020/02/26 11:47:12 [worker:0 handled:20] &{U9}
2020/02/26 11:47:12 [worker:1 handled:25] &{U10}
2020/02/26 11:47:12 [worker:0 handled:21] &{U1}
2020/02/26 11:47:12 [worker:0 handled:22] &{U2}
2020/02/26 11:47:12 [worker:0 handled:23] &{U3}
2020/02/26 11:47:12 [worker:0 handled:24] &{U8}
2020/02/26 11:47:12 [worker:0 handled:25] &{U9}
signal: interrupt
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment