@grepory
Created August 3, 2018 18:29
Queues

Ad Hoc Check Requests via Fanout Queues

Data Model

Messages

A Message on a queue is a combination of a Context and a CheckRequest.

type Context struct {
  Deadline time.Time
}

type Message struct {
  Context Context
  CheckRequest CheckRequest
}
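
As a rough illustration of how a consumer might use the Deadline, the sketch below builds a Message and checks whether it has expired. The CheckRequest field and the Expired helper are assumptions made for the example; the real CheckRequest type is defined elsewhere and this note does not say how the deadline is enforced.

```go
package main

import (
	"fmt"
	"time"
)

// CheckRequest is a placeholder. The real type lives elsewhere and its
// fields are not described in this note.
type CheckRequest struct {
	Check string // hypothetical field, for illustration only
}

type Context struct {
	Deadline time.Time
}

type Message struct {
	Context      Context
	CheckRequest CheckRequest
}

// Expired reports whether the message's deadline has passed at time now.
// Hypothetical helper, not part of the design above.
func (m Message) Expired(now time.Time) bool {
	return now.After(m.Context.Deadline)
}

func main() {
	now := time.Now()
	msg := Message{
		Context:      Context{Deadline: now.Add(30 * time.Second)},
		CheckRequest: CheckRequest{Check: "check-cpu"},
	}
	fmt.Println(msg.Expired(now))                  // false
	fmt.Println(msg.Expired(now.Add(time.Minute))) // true
}
```

A subscriber could drop expired messages instead of executing them, though whether that is the intended use of Deadline is an assumption here.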

Storage

Queues require some additional bookkeeping to prevent stale subscriber queues from lying around indefinitely.

/queues
  |
  |--> /queues/:queue_id
  |
  |--> /members/:cluster_member_id
  |
  |--> /q
  |     |
  |     |--> /:queue_id
  |     |      |
  |     |      |--> /:cluster_member_id/:message_id

The queues and members directories aren't strictly necessary, and may be considered premature optimization, but they simplify maintenance tasks considerably.
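
One way to keep the layout in a single place is a handful of key-building helpers. This is only a sketch: the function names are made up for the example, and it assumes IDs never contain a "/".

```go
package main

import (
	"fmt"
	"path"
)

// root is the top of the layout shown above.
const root = "/queues"

// All helper names below are hypothetical.

// queueKey is the bookkeeping entry for a queue.
func queueKey(queueID string) string {
	return path.Join(root, "queues", queueID)
}

// memberKey is the bookkeeping entry for a cluster member.
func memberKey(memberID string) string {
	return path.Join(root, "members", memberID)
}

// subscriberPrefix is the directory holding one member's copy of a queue.
// The trailing slash keeps a prefix match on member "a" from also
// matching member "ab".
func subscriberPrefix(queueID, memberID string) string {
	return path.Join(root, "q", queueID, memberID) + "/"
}

// messageKey is the key of a single message in a member's copy of a queue.
func messageKey(queueID, memberID, messageID string) string {
	return subscriberPrefix(queueID, memberID) + messageID
}

func main() {
	fmt.Println(queueKey("adhoc"))                   // /queues/queues/adhoc
	fmt.Println(memberKey("member-1"))               // /queues/members/member-1
	fmt.Println(messageKey("adhoc", "member-1", "42")) // /queues/q/adhoc/member-1/42
}
```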

Operations

  • Subscribe
  • Publish

Subscribe

Subscribe(queueName) is a GetOrCreate operation on a queue. Because it is the least frequently used operation, it is also the easiest place to do storage maintenance.

func Subscribe(ctx context.Context, subscriberId, queueName string) (<-chan Message, error) {
  // This is safe to do and doesn't race between Subscribers. Deleting a key that is
  // already gone is not an error; the delete just reports zero keys deleted.
  go func() {
    // xor() obviously isn't a real function. The set we want is the registered
    // subscribers that are no longer cluster members.
    toClean := xor(client.GetClusterMembers(), client.Get("/queues/members", clientv3.WithPrefix()))

    for subscriber := range toClean {
      for queue := range client.Get("/queues/queues", clientv3.WithPrefix()) {
        client.Delete("/queues/q/:queue/:subscriber", clientv3.WithPrefix())
      }

      // Delete the subscriber last so that if we crash partway through, the next
      // cleanup pass still finds it and removes any queues left behind.
      client.Delete("/queues/members/:subscriber")
    }
  }()

  return client.Watch(ctx, "/queues/q/:queueName/:subscriberId", clientv3.WithPrefix())
}
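
The cleanup pass can be tried out without a real store. The sketch below models the key space as an in-memory map and assumes subscriber registrations live under /queues/members, as in the storage layout. The store type and the cleanup and children helpers are invented for the example and are not part of any client library.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// store is a toy in-memory stand-in for the key-value store.
type store map[string]string

// deletePrefix removes every key under prefix and returns how many were
// removed. Removing nothing is not an error, which is what makes running
// the cleanup concurrently or repeatedly safe.
func (s store) deletePrefix(prefix string) int {
	n := 0
	for k := range s {
		if strings.HasPrefix(k, prefix) {
			delete(s, k)
			n++
		}
	}
	return n
}

// children lists the names of the keys directly under dir, sorted.
func (s store) children(dir string) []string {
	var out []string
	for k := range s {
		if !strings.HasPrefix(k, dir+"/") {
			continue
		}
		rest := strings.TrimPrefix(k, dir+"/")
		if !strings.Contains(rest, "/") {
			out = append(out, rest)
		}
	}
	sort.Strings(out)
	return out
}

// cleanup removes the queues and registration of every subscriber that is
// not in live, and returns the subscribers it removed.
func cleanup(s store, live map[string]bool) []string {
	var removed []string
	for _, sub := range s.children("/queues/members") {
		if live[sub] {
			continue
		}
		for _, q := range s.children("/queues/queues") {
			s.deletePrefix("/queues/q/" + q + "/" + sub + "/")
		}
		// The registration goes last: a crash before this line leaves the
		// subscriber visible to the next cleanup pass.
		delete(s, "/queues/members/"+sub)
		removed = append(removed, sub)
	}
	return removed
}

func main() {
	s := store{
		"/queues/queues/adhoc": "",
		"/queues/members/a":    "",
		"/queues/members/b":    "",
		"/queues/q/adhoc/a/1":  "msg",
		"/queues/q/adhoc/b/1":  "msg",
		"/queues/q/adhoc/b/2":  "msg",
	}
	live := map[string]bool{"a": true}
	fmt.Println(cleanup(s, live), len(s)) // [b] 3
	fmt.Println(cleanup(s, live))         // []
}
```

Running cleanup a second time finds nothing to do, which is the property the background goroutine relies on.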

Publish

Publish sends a message to all current subscribers.
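
A minimal in-memory sketch of the fanout, assuming Publish writes one copy of the message per subscriber under /queues/q/:queue_id/:cluster_member_id/:message_id. How the subscriber list and the message ID are obtained is not specified in this note, so both are passed in by the caller here. The store type and publish function are invented for the example.

```go
package main

import "fmt"

// store is a toy in-memory stand-in for the key-value store.
type store map[string]string

// publish writes one copy of payload per subscriber and returns the keys it
// wrote. Hypothetical helper: subscribers and messageID are supplied by the
// caller because the design above does not say where they come from.
func publish(s store, queueID, messageID, payload string, subscribers []string) []string {
	keys := make([]string, 0, len(subscribers))
	for _, sub := range subscribers {
		key := "/queues/q/" + queueID + "/" + sub + "/" + messageID
		s[key] = payload
		keys = append(keys, key)
	}
	return keys
}

func main() {
	s := store{}
	keys := publish(s, "adhoc", "1", `{"check":"check-cpu"}`, []string{"a", "b"})
	fmt.Println(keys) // [/queues/q/adhoc/a/1 /queues/q/adhoc/b/1]
}
```

Each subscriber's watch on its own prefix would then see exactly one new key per published message.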
