A Message on a queue is a combination of a Context and a CheckRequest.
type Context struct {
    Deadline time.Time
}

type Message struct {
    Context      Context
    CheckRequest CheckRequest
}
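As a rough illustration of how a consumer might use the Context carried with each Message, the sketch below adds a deadline check. The `Expired` helper, the `Name` field on `CheckRequest`, and the rule that a zero Deadline means "no deadline" are all assumptions made for this example; the document does not define them.

```go
package main

import (
	"fmt"
	"time"
)

// CheckRequest is a placeholder; its real fields are not given in this document.
type CheckRequest struct {
	Name string // hypothetical field, for illustration only
}

type Context struct {
	Deadline time.Time
}

type Message struct {
	Context      Context
	CheckRequest CheckRequest
}

// Expired reports whether the message's deadline has passed at time now.
// Assumption: a zero Deadline means the message never expires.
func (m Message) Expired(now time.Time) bool {
	return !m.Context.Deadline.IsZero() && now.After(m.Context.Deadline)
}

func main() {
	now := time.Now()
	msg := Message{
		Context:      Context{Deadline: now.Add(30 * time.Second)},
		CheckRequest: CheckRequest{Name: "example"},
	}
	fmt.Println(msg.Expired(now))                  // before the deadline
	fmt.Println(msg.Expired(now.Add(time.Minute))) // after the deadline
}
```

A subscriber could call such a check before doing any work, so that messages left on a queue past their deadline are dropped rather than processed.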
Queues require some additional bookkeeping to prevent stale subscriber queues from lying around indefinitely.
/queues
|
|--> /queues/:queue_id
|
|--> /members/:cluster_member_id
|
|--> /q
     |
     |--> /:queue_id
          |
          |--> /:cluster_member_id/:message_id
The queues and members directories aren't strictly necessary, and may be considered premature optimization, but they simplify maintenance tasks considerably.
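The layout above can be captured in a few key-building helpers. This is a minimal sketch: the function names are hypothetical, and it assumes that IDs never contain a `/`.

```go
package main

import (
	"fmt"
	"path"
)

// root is the top-level directory from the layout above.
const root = "/queues"

// QueueKey returns the registry entry for a queue: /queues/queues/:queue_id.
func QueueKey(queueID string) string {
	return path.Join(root, "queues", queueID)
}

// MemberKey returns the registry entry for a cluster member:
// /queues/members/:cluster_member_id.
func MemberKey(memberID string) string {
	return path.Join(root, "members", memberID)
}

// SubscriberPrefix returns the prefix holding one member's messages on one
// queue. The trailing slash keeps a prefix scan for "node-1" from also
// matching "node-10".
func SubscriberPrefix(queueID, memberID string) string {
	return path.Join(root, "q", queueID, memberID) + "/"
}

// MessageKey returns the key of a single message:
// /queues/q/:queue_id/:cluster_member_id/:message_id.
func MessageKey(queueID, memberID, messageID string) string {
	return path.Join(root, "q", queueID, memberID, messageID)
}

func main() {
	fmt.Println(QueueKey("checks"))                   // /queues/queues/checks
	fmt.Println(MemberKey("node-1"))                  // /queues/members/node-1
	fmt.Println(SubscriberPrefix("checks", "node-1")) // /queues/q/checks/node-1/
	fmt.Println(MessageKey("checks", "node-1", "42")) // /queues/q/checks/node-1/42
}
```

Keeping key construction in one place means the maintenance code and the publish/subscribe paths cannot drift apart on directory names.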
- Subscribe
- Publish
Subscribe(Queue Name) is a GetOrCreate operation on a queue. As it is the least used operation, it's also the easiest place to do storage maintenance.
// Pseudocode: the client calls are abbreviated and error handling is omitted.
func Subscribe(ctx context.Context, subscriberId, queueName string) (<-chan Message, error) {
    // This is safe to do and doesn't race between Subscribers. Delete operations report
    // the number of keys deleted and do not fail when a key is already gone.
    go func() {
        // xor() obviously isn't a function, but the result of XOR'ing these two sets is the
        // set of non-existent subscribers.
        toClean := xor(client.GetClusterMembers(), client.Get("/queues/members", clientv3.WithPrefix()))
        for _, subscriber := range toClean {
            for _, queue := range client.Get("/queues/queues", clientv3.WithPrefix()) {
                client.Delete("/queues/q/:queue/:subscriber", clientv3.WithPrefix())
            }
            // Delete the subscriber last so that if we crash, we don't accidentally
            // leave queues lying around.
            client.Delete("/queues/members/:subscriber")
        }
    }()
    return client.Watch(ctx, "/queues/q/:queueName/:subscriberId", clientv3.WithPrefix())
}
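The cleanup pass can be tried out against an in-memory stand-in for the key-value store. The sketch below is not the real client code: the `store` type and its helpers are hypothetical, and it assumes member entries live under `/queues/members/` as in the layout. It computes the stale set as "registered but not live" rather than a true XOR. For cleanup the two are equivalent, because a live member that was never registered has nothing to delete.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// store is a hypothetical in-memory stand-in for the key-value store.
type store map[string]string

// deletePrefix removes every key starting with prefix and returns how many
// keys were removed. Removing nothing is not an error.
func (s store) deletePrefix(prefix string) int {
	n := 0
	for k := range s {
		if strings.HasPrefix(k, prefix) {
			delete(s, k)
			n++
		}
	}
	return n
}

// children returns the sorted, distinct path segments directly under prefix.
func (s store) children(prefix string) []string {
	seen := make(map[string]struct{})
	for k := range s {
		if !strings.HasPrefix(k, prefix) {
			continue
		}
		name := strings.SplitN(strings.TrimPrefix(k, prefix), "/", 2)[0]
		if name != "" {
			seen[name] = struct{}{}
		}
	}
	names := make([]string, 0, len(seen))
	for n := range seen {
		names = append(names, n)
	}
	sort.Strings(names)
	return names
}

// cleanStale deletes the queues of every registered member that is not in
// live, then the member entry itself, and returns the members removed.
func cleanStale(s store, live []string) []string {
	alive := make(map[string]bool, len(live))
	for _, id := range live {
		alive[id] = true
	}
	var removed []string
	for _, member := range s.children("/queues/members/") {
		if alive[member] {
			continue
		}
		for _, queue := range s.children("/queues/queues/") {
			s.deletePrefix("/queues/q/" + queue + "/" + member + "/")
		}
		// Delete the member entry last: if we crash before this line, the
		// next pass still sees the member and finishes the job.
		delete(s, "/queues/members/"+member)
		removed = append(removed, member)
	}
	return removed
}

func main() {
	s := store{
		"/queues/queues/checks":     "",
		"/queues/members/node-1":    "",
		"/queues/members/node-2":    "",
		"/queues/q/checks/node-1/1": "m1",
		"/queues/q/checks/node-2/1": "m1",
		"/queues/q/checks/node-2/2": "m2",
	}
	fmt.Println(cleanStale(s, []string{"node-1"})) // [node-2]
	fmt.Println(len(s))                            // 3
}
```

Running the pass twice is harmless, which is why several subscribers can run it concurrently without coordinating.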
Publish sends a message to all current subscribers.
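Since each subscriber watches its own prefix under `/queues/q/:queue_id/`, one way to read "sends a message to all current subscribers" is as one write per subscriber. The sketch below shows that fan-out against a plain map. The `publish` function and its parameters are hypothetical, and it takes the subscriber list and message ID as inputs, because this document does not say how either is obtained.

```go
package main

import (
	"fmt"
	"sort"
)

// publish writes payload once under each subscriber's prefix for the given
// queue and returns the keys written, sorted. The map stands in for the
// key-value store; this is an illustration, not the real client code.
func publish(store map[string]string, queue string, subscribers []string, messageID, payload string) []string {
	keys := make([]string, 0, len(subscribers))
	for _, member := range subscribers {
		key := "/queues/q/" + queue + "/" + member + "/" + messageID
		store[key] = payload
		keys = append(keys, key)
	}
	sort.Strings(keys)
	return keys
}

func main() {
	store := map[string]string{}
	keys := publish(store, "checks", []string{"node-1", "node-2"}, "0001", `{"check":"example"}`)
	for _, k := range keys {
		fmt.Println(k)
	}
	// Prints:
	// /queues/q/checks/node-1/0001
	// /queues/q/checks/node-2/0001
}
```

Against a real store, the per-subscriber writes would likely be grouped into a single transaction so that either every subscriber sees the message or none does.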