Skip to content

Instantly share code, notes, and snippets.

@soheilhy
Created July 19, 2015 04:50
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save soheilhy/50595c9d3bb197e42e69 to your computer and use it in GitHub Desktop.
Save soheilhy/50595c9d3bb197e42e69 to your computer and use it in GitHub Desktop.
AckHandler
// AckHandler handles Ack messages.
type AckHandler struct{}
func (h AckHandler) Rcv(msg beehive.Msg, ctx beehive.RcvContext) error {
ack := msg.Data().(Ack)
ddict := ctx.Dict(dequed)
key := string(ack.Queue)
if val, err := ddict.Get(key); err == nil {
dtasks := val.(map[uint64]Task)
if _, ok := dtasks[ack.ID]; ok {
delete(dtasks, ack.ID)
return ddict.Put(key, dtasks)
}
}
// The task might have been moved from dequed to active, because of a timeout.
// So, we need to search the active dictionary as well.
adict := ctx.Dict(active)
var atasks TaskRing
if val, err := adict.Get(key); err == nil {
atasks = val.(TaskRing)
}
if ok := atasks.Remove(ack.ID); !ok {
return ErrNoSuchTask
}
return adict.Put(key, atasks)
}
func (h AckHandler) Map(msg beehive.Msg,
ctx beehive.MapContext) beehive.MappedCells {
// Send the Ack message to the bee that owns the Queue's entry in
// the active and the dequed dictionaries.
q := string(msg.Data().(Ack).Queue)
return beehive.MappedCells{{active, q}, {dequed, q}}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment