Created
July 19, 2015 04:50
-
-
Save soheilhy/50595c9d3bb197e42e69 to your computer and use it in GitHub Desktop.
AckHandler
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// AckHandler is the handler for Ack messages. It is an empty struct and
// therefore carries no state of its own.
type AckHandler struct{}
func (h AckHandler) Rcv(msg beehive.Msg, ctx beehive.RcvContext) error { | |
ack := msg.Data().(Ack) | |
ddict := ctx.Dict(dequed) | |
key := string(ack.Queue) | |
if val, err := ddict.Get(key); err == nil { | |
dtasks := val.(map[uint64]Task) | |
if _, ok := dtasks[ack.ID]; ok { | |
delete(dtasks, ack.ID) | |
return ddict.Put(key, dtasks) | |
} | |
} | |
// The task might have been moved from dequed to active, because of a timeout. | |
// So, we need to search the active dictionary as well. | |
adict := ctx.Dict(active) | |
var atasks TaskRing | |
if val, err := adict.Get(key); err == nil { | |
atasks = val.(TaskRing) | |
} | |
if ok := atasks.Remove(ack.ID); !ok { | |
return ErrNoSuchTask | |
} | |
return adict.Put(key, atasks) | |
} | |
func (h AckHandler) Map(msg beehive.Msg, | |
ctx beehive.MapContext) beehive.MappedCells { | |
// Send the Ack message to the bee that owns the Queue's entry in | |
// the active and the dequed dictionaries. | |
q := string(msg.Data().(Ack).Queue) | |
return beehive.MappedCells{{active, q}, {dequed, q}} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment