Skip to content

Instantly share code, notes, and snippets.

@soheilhy
Created July 19, 2015 15:58
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save soheilhy/1cad935353c65e8ad25d to your computer and use it in GitHub Desktop.
Save soheilhy/1cad935353c65e8ad25d to your computer and use it in GitHub Desktop.
TimeoutHandler
// TimeoutHandler handles Timeout messages: its Rcv method asserts the
// message data to Timeout and uses it as the reference instant for expiry.
type TimeoutHandler struct {
// ExpDur is the duration after which a dequeued and unacknowledged task
// is returned to the active dictionary.
ExpDur time.Duration
}
// Rcv handles a Timeout message. Every task in the dequed dictionary whose
// DequeTime lies at least h.ExpDur before the timeout instant is deleted from
// its dequed entry and enqueued again on the task ring stored under the same
// key in the active dictionary.
//
// It returns the first error reported by a dictionary Get or Put, or nil when
// all expired tasks were moved. Lookup errors are propagated rather than
// discarded, so a failed Get no longer continues with a nil value and panics
// in the following type assertion.
func (h TimeoutHandler) Rcv(msg beehive.Msg, ctx beehive.RcvContext) error {
	tout := time.Time(msg.Data().(Timeout))

	// First pass: only collect the expired tasks per queue, so that the
	// dequed dictionary is not modified while it is being iterated.
	ddict := ctx.Dict(dequed)
	expired := make(map[string][]Task)
	ddict.ForEach(func(k string, v interface{}) {
		for _, t := range v.(map[uint64]dqTask) {
			if tout.Sub(t.DequeTime) >= h.ExpDur {
				expired[k] = append(expired[k], t.Task)
			}
		}
	})

	// Second pass: move the collected tasks back to their active queues.
	adict := ctx.Dict(active)
	for q, etasks := range expired {
		v, err := ddict.Get(q)
		if err != nil {
			return err
		}
		dtasks := v.(map[uint64]dqTask)

		if v, err = adict.Get(q); err != nil {
			return err
		}
		atasks := v.(TaskRing)

		for _, t := range etasks {
			delete(dtasks, t.ID)
			atasks.Enque(t)
		}

		// Write both updated values back under the same queue key.
		if err := ddict.Put(q, dtasks); err != nil {
			return err
		}
		if err := adict.Put(q, atasks); err != nil {
			return err
		}
	}
	return nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment