Skip to content

Instantly share code, notes, and snippets.

@soheilhy
soheilhy / queue.go
Last active August 29, 2015 14:23
Basic structures in taskq
// Queue is the name that identifies a queue. It is a plain string type, so
// converting to and from string is a direct type conversion.
type Queue string
// Task is a single unit of work stored in a queue. Its fields are encoded
// to JSON under the lower-case keys given in the struct tags.
type Task struct {
	// ID is the task's globally unique ID, assigned by taskq.
	ID uint64 `json:"id"`
	// Queue is the queue this task belongs to.
	Queue Queue `json:"queue"`
	// Body is the client's data carried by the task.
	Body []byte `json:"body"`
}
@soheilhy
soheilhy / queue.go
Last active August 29, 2015 14:24
Messages in taskq
// Enque is the message emitted to enqueue a task.
// Task is embedded, so its fields (ID, Queue, Body) are promoted onto Enque.
type Enque struct {
	Task // The task to be enqueued.
}
// Deque is the message emitted to dequeue a task from a queue.
// The queue's name is carried by the embedded Queue field.
type Deque struct {
	Queue // The queue to dequeue a task from.
}
@soheilhy
soheilhy / ring.go
Last active August 29, 2015 14:25
Task ring buffer
// TaskRing is an efficient, gob-compatible ring buffer for Tasks.
// TaskRing always wastes one element.
type TaskRing struct {
...
	// Stats holds the statistics of this ring buffer.
	Stats Stats
}
// Stats represents the statistics of the task ring buffer.
type Stats struct {
Deque uint64 // Deque represents the total number of dequeued tasks.
@soheilhy
soheilhy / queue.go
Last active August 29, 2015 14:25
Skeleton of EnQHandler
import (
"github.com/kandoo/beehive"
)
// EnQHandler handles Enque messages. It is an empty struct and holds no
// state of its own.
type EnQHandler struct{}

// Rcv is called with a received message and its receive context, and
// reports any failure as an error. The body is elided in this skeleton.
func (h EnQHandler) Rcv(msg beehive.Msg, ctx beehive.RcvContext) error {...}

// Map returns the cells (beehive.MappedCells) that the given message maps
// to. The body is elided in this skeleton.
func (h EnQHandler) Map(msg beehive.Msg, ctx beehive.MapContext) beehive.MappedCells {...}
@soheilhy
soheilhy / queue.go
Last active August 29, 2015 14:25
An incomplete, inefficient implementation of EnQHandler.Rcv
const (
	// Dictionary names.

	// active names the dictionary of active tasks; the handlers look up
	// entries in it using the queue's name as the key.
	active = "active"
	// dequed names the second dictionary that Deque messages are mapped to,
	// also keyed by queue name.
	// NOTE(review): presumably it holds tasks that have been dequeued — confirm.
	dequed = "dequed"
)
func (h EnQHandler) Rcv(msg beehive.Msg, ctx beehive.RcvContext) error {
// TODO: assign a unique ID.
enq := msg.Data().(Enque)
@soheilhy
soheilhy / queue.go
Created July 16, 2015 16:35
EnQHandler.Rcv
func (h EnQHandler) Rcv(msg beehive.Msg, ctx beehive.RcvContext) error {
enq := msg.Data().(Enque)
dict := ctx.Dict(active)
key := string(enq.Task.Queue)
var tasks TaskRing
if val, err := dict.Get(key); err == nil {
tasks = val.(TaskRing)
}
@soheilhy
soheilhy / queue.go
Created July 16, 2015 17:00
EnQHandler.Map
// Map routes an Enque message to the bee that owns the target queue's entry
// in the active dictionary. The returned cell is keyed by the queue's name.
//
// The message data must be an Enque value; any other type makes the type
// assertion panic.
func (h EnQHandler) Map(msg beehive.Msg, ctx beehive.MapContext) beehive.MappedCells {
	enq := msg.Data().(Enque)
	return beehive.MappedCells{{active, string(enq.Queue)}}
}
@soheilhy
soheilhy / args_example.go
Last active August 29, 2015 14:25
args example
package main
import (
"fmt"
"github.com/soheilhy/args"
)
// Arguments declared with the args package.
var (
	// ListenOn is an int argument whose default value is 8080.
	// NOTE(review): presumably the port to listen on — confirm against usage.
	ListenOn = args.NewInt(args.Default(8080))

	// BufferSize is a uint64 argument whose default value is 1024 * 1024.
	BufferSize = args.NewUint64(args.Default(uint64(1024 * 1024)))
)
@soheilhy
soheilhy / queue.go
Last active August 29, 2015 14:25
DeQHandler.Rcv
// DeQHandler handles Deque messages. It is an empty struct and holds no
// state of its own.
type DeQHandler struct{}
func (h DeQHandler) Rcv(msg beehive.Msg, ctx beehive.RcvContext) error {
deq := msg.Data().(Deque)
// (a) we deque the task from active tasks.
adict := ctx.Dict(active)
key := string(deq.Queue)
@soheilhy
soheilhy / queue.go
Created July 19, 2015 04:31
DeQHandler.Map
// Map routes a Deque message to the bee that owns the queue's entries in
// both the active and the dequed dictionaries. Both returned cells are keyed
// by the queue's name.
//
// The message data must be a Deque value; any other type makes the type
// assertion panic.
func (h DeQHandler) Map(msg beehive.Msg, ctx beehive.MapContext) beehive.MappedCells {
	deq := msg.Data().(Deque)
	name := string(deq.Queue)
	return beehive.MappedCells{
		{active, name},
		{dequed, name},
	}
}