Skip to content

Instantly share code, notes, and snippets.

@soheilhy
soheilhy / ring.go
Last active August 29, 2015 14:25
Task ring buffer
// TaskRing is an efficient, gob-compatible ring buffer for Tasks.
// TaskRing always wastes one element.
type TaskRing struct {
...
Stats Stats
}
// Stats represents the statistics of the task ring buffer.
type Stats struct {
Deque uint64 // Deque represents the total number of dequeued tasks.
@soheilhy
soheilhy / queue.go
Last active August 29, 2015 14:25
Skeleton of EnQHandler
import (
"github.com/kandoo/beehive"
)
// EnQHandler handles Enque messages.
// It is an empty struct, so handler values carry no state of their own.
type EnQHandler struct{}
func (h EnQHandler) Rcv(msg beehive.Msg, ctx beehive.RcvContext) error {...}
func (h EnQHandler) Map(msg beehive.Msg, ctx beehive.MapContext) beehive.MappedCells {...}
@soheilhy
soheilhy / queue.go
Last active August 29, 2015 14:25
An incomplete, inefficient implementation of EnQHandler.Rcv
// Names of the dictionaries the queue handlers read and map to.
const (
	// active names the dictionary of active tasks, keyed by queue name.
	active = "active"

	// dequed names the dictionary of dequeued tasks, keyed by queue name.
	dequed = "dequed"
)
func (h EnQHandler) Rcv(msg beehive.Msg, ctx beehive.RcvContext) error {
// TODO: assign a unique ID.
enq := msg.Data().(Enque)
@soheilhy
soheilhy / queue.go
Created July 16, 2015 16:35
EnQHandler.Rcv
func (h EnQHandler) Rcv(msg beehive.Msg, ctx beehive.RcvContext) error {
enq := msg.Data().(Enque)
dict := ctx.Dict(active)
key := string(enq.Task.Queue)
var tasks TaskRing
if val, err := dict.Get(key); err == nil {
tasks = val.(TaskRing)
}
@soheilhy
soheilhy / queue.go
Created July 16, 2015 17:00
EnQHandler.Map
// Map routes an Enque message to the bee that owns the queue's entry in the
// active dictionary. The single mapped cell is (active, queue name).
func (h EnQHandler) Map(msg beehive.Msg, ctx beehive.MapContext) beehive.MappedCells {
	enq := msg.Data().(Enque)
	queue := string(enq.Queue)
	cells := beehive.MappedCells{
		{active, queue},
	}
	return cells
}
@soheilhy
soheilhy / args_example.go
Last active August 29, 2015 14:25
args example
package main
import (
"fmt"
"github.com/soheilhy/args"
)
var (
	// ListenOn is an int argument whose default value is 8080.
	ListenOn = args.NewInt(args.Default(8080))

	// BufferSize is a uint64 argument whose default value is 1024 * 1024.
	BufferSize = args.NewUint64(args.Default(uint64(1 << 20)))
)
@soheilhy
soheilhy / queue.go
Last active August 29, 2015 14:25
DeQHandler.Rcv
// DeQHandler handles Deque messages.
// It is an empty struct, so handler values carry no state of their own.
type DeQHandler struct{}
func (h DeQHandler) Rcv(msg beehive.Msg, ctx beehive.RcvContext) error {
deq := msg.Data().(Deque)
// (a) we deque the task from active tasks.
adict := ctx.Dict(active)
key := string(deq.Queue)
@soheilhy
soheilhy / queue.go
Created July 19, 2015 04:31
DeQHandler.Map
// Map routes a Deque message to the bee that owns the queue's entries in
// both the active and the dequed dictionaries, so a single bee sees both
// cells for a given queue.
func (h DeQHandler) Map(msg beehive.Msg, ctx beehive.MapContext) beehive.MappedCells {
	deq := msg.Data().(Deque)
	queue := string(deq.Queue)
	cells := beehive.MappedCells{
		{active, queue},
		{dequed, queue},
	}
	return cells
}
@soheilhy
soheilhy / queue.go
Created July 19, 2015 04:50
AckHandler
// AckHandler handles Ack messages.
// It is an empty struct, so handler values carry no state of their own.
type AckHandler struct{}
func (h AckHandler) Rcv(msg beehive.Msg, ctx beehive.RcvContext) error {
ack := msg.Data().(Ack)
ddict := ctx.Dict(dequed)
key := string(ack.Queue)
if val, err := ddict.Get(key); err == nil {
@soheilhy
soheilhy / queue.go
Created July 19, 2015 15:54
Timeout
// Timeout represents a timeout message.
// It is a defined type whose underlying type is time.Time, not an alias, so
// time.Time's methods are not in its method set; convert with time.Time(t)
// to use them.
type Timeout time.Time