Skip to content

Instantly share code, notes, and snippets.

@soheilhy
Last active August 29, 2015 14:25
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save soheilhy/825c538350367af107ae to your computer and use it in GitHub Desktop.
Save soheilhy/825c538350367af107ae to your computer and use it in GitHub Desktop.
DeQHandler.Rcv
// DeQHandler handles Deque messages. It carries no fields of its own: all
// the state its Rcv method reads and writes lives in the dictionaries obtained
// from the receive context.
type DeQHandler struct{}
// Rcv processes a Deque message in three steps:
//
//	(a) it removes the next task from the queue's TaskRing stored in the
//	    active dictionary and writes the updated ring back,
//	(b) it records the task, stamped with the current time, in the queue's
//	    entry of the dequed dictionary, and
//	(c) it replies to the Deque message with the task.
//
// Both dictionaries are keyed by the queue name taken from the message.
//
// It returns ErrEmptyQueue when the ring yields no task, and otherwise the
// first error reported by a dictionary Put or by the reply.
func (h DeQHandler) Rcv(msg beehive.Msg, ctx beehive.RcvContext) error {
	deq := msg.Data().(Deque)
	key := string(deq.Queue)

	// (a) we deque the task from active tasks.
	adict := ctx.Dict(active)
	// A failed lookup leaves atasks at its zero value; the Deque call below
	// then decides whether there is anything to hand out.
	var atasks TaskRing
	if val, err := adict.Get(key); err == nil {
		atasks = val.(TaskRing)
	}

	task, ok := atasks.Deque()
	if !ok {
		return ErrEmptyQueue
	}

	if err := adict.Put(key, atasks); err != nil {
		return err
	}

	// (b) add the task to the dequed dictionary.
	ddict := ctx.Dict(dequed)
	var dtasks map[uint64]dqTask
	if val, err := ddict.Get(key); err == nil {
		dtasks = val.(map[uint64]dqTask)
	} else {
		// No entry could be read for this queue: start a fresh map, since
		// writing to a nil map would panic.
		dtasks = make(map[uint64]dqTask)
	}

	dtasks[task.ID] = dqTask{
		Task:      task,
		DequeTime: time.Now(),
	}

	if err := ddict.Put(key, dtasks); err != nil {
		return err
	}

	// (c) send the task as a reply to the Deque message. The reply's error is
	// returned rather than dropped so that a failed delivery is not reported
	// as a successful deque.
	// NOTE(review): presumably a non-nil error here makes beehive discard the
	// dictionary writes above — confirm against the framework's semantics.
	return ctx.ReplyTo(msg, task)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment