Skip to content

Instantly share code, notes, and snippets.

@soheilhy
soheilhy / queue.go
Created July 19, 2015 16:25
TimeoutHandler.Map
// Map decides which cells a timeout message is delivered to.
//
// It ignores both msg and ctx and always answers with an empty MappedCells
// literal, which is how this handler requests a broadcast of the timeout
// message to all local bees.
func (h TimeoutHandler) Map(msg beehive.Msg, ctx beehive.MapContext) beehive.MappedCells {
	// NOTE(review): keep this an empty literal rather than a nil value —
	// beehive presumably treats nil differently from empty; confirm against
	// the beehive Handler documentation.
	everyLocalBee := beehive.MappedCells{}
	return everyLocalBee
}
@soheilhy
soheilhy / args_example_custom_type.go
Last active August 29, 2015 14:25
custom typed args
// Typed argument definitions backing the ServerOpt constructors below.
var (
	// port is an int argument created from args.Flag with the name
	// "example.typed.port", the value 1234 and the text "the port"
	// (presumably flag name, default and usage — confirm against the args
	// package).
	port = args.NewInt(args.Flag("example.typed.port", 1234, "the port"))

	// roundTripper is an untyped argument whose default is
	// http.DefaultTransport.
	roundTripper = args.New(args.Default(http.DefaultTransport))

	// timeout is a duration argument declared without any options.
	timeout = args.NewDuration()
)
// ServerOpt is a server option. It is a defined type whose underlying type is
// args.V, so values produced by the args package can be converted to it.
type ServerOpt args.V
// Port returns a ServerOpt carrying p, built by applying the port argument to
// p and converting the result to ServerOpt.
func Port(p int) ServerOpt {
	opt := port(p)
	return ServerOpt(opt)
}
func RoundTripper(r http.RoundTrippper) ServerOpt { return ServerOpt(roundTripper(r)) }
// Timeout returns a ServerOpt carrying d, built by applying the timeout
// argument to d and converting the result to ServerOpt.
func Timeout(d time.Duration) ServerOpt {
	opt := timeout(d)
	return ServerOpt(opt)
}
@soheilhy
soheilhy / queue.go
Created July 19, 2015 15:58
TimeoutHandler
// TimeoutHandler handles Timeout messages.
type TimeoutHandler struct {
// ExpDur is the duration after which a dequeued and unacknowledged task
// is returned to the active dictionary.
ExpDur time.Duration
}
func (h TimeoutHandler) Rcv(msg beehive.Msg, ctx beehive.RcvContext) error {
tout := time.Time(msg.Data().(Timeout))
ddict := ctx.Dict(dequed)
@soheilhy
soheilhy / queue.go
Created July 19, 2015 15:54
Timeout
// Timeout represents a timeout message. It is a defined type whose underlying
// type is time.Time, so it does not carry time.Time's methods; convert with
// time.Time(t) to use them.
type Timeout time.Time
@soheilhy
soheilhy / queue.go
Created July 19, 2015 04:50
AckHandler
// AckHandler handles Ack messages. It is an empty struct and carries no state
// of its own.
type AckHandler struct{}
func (h AckHandler) Rcv(msg beehive.Msg, ctx beehive.RcvContext) error {
ack := msg.Data().(Ack)
ddict := ctx.Dict(dequed)
key := string(ack.Queue)
if val, err := ddict.Get(key); err == nil {
@soheilhy
soheilhy / queue.go
Created July 19, 2015 04:31
DeQHandler.Map
// Map routes a Deque message to the cells keyed by its queue name.
//
// The message data must be a Deque; any other type makes the type assertion
// panic. The returned cells are the queue's entries in both the active and the
// dequed dictionaries, so the message is sent to the bee that owns them.
func (h DeQHandler) Map(msg beehive.Msg, ctx beehive.MapContext) beehive.MappedCells {
	deq := msg.Data().(Deque)
	name := string(deq.Queue)
	return beehive.MappedCells{
		{active, name},
		{dequed, name},
	}
}
@soheilhy
soheilhy / queue.go
Last active August 29, 2015 14:25
DeQHandler.Rcv
// DeQHandler handles Deque messages. It is an empty struct and carries no
// state of its own.
type DeQHandler struct{}
func (h DeQHandler) Rcv(msg beehive.Msg, ctx beehive.RcvContext) error {
deq := msg.Data().(Deque)
// (a) we deque the task from active tasks.
adict := ctx.Dict(active)
key := string(deq.Queue)
@soheilhy
soheilhy / args_example.go
Last active August 29, 2015 14:25
args example
package main
import (
"fmt"
"github.com/soheilhy/args"
)
// Arguments used by this example.
var (
	// ListenOn is an int argument whose default value is 8080.
	ListenOn = args.NewInt(args.Default(8080))

	// BufferSize is a uint64 argument whose default value is 1048576
	// (1024 * 1024).
	BufferSize = args.NewUint64(args.Default(uint64(1 << 20)))
)
@soheilhy
soheilhy / queue.go
Created July 16, 2015 17:00
EnQHandler.Map
// Map routes an Enque message to the cell keyed by its queue name.
//
// The message data must be an Enque; any other type makes the type assertion
// panic. The single returned cell is the queue's entry in the active
// dictionary, so the message is sent to the bee that owns it.
func (h EnQHandler) Map(msg beehive.Msg, ctx beehive.MapContext) beehive.MappedCells {
	enq := msg.Data().(Enque)
	name := string(enq.Queue)
	return beehive.MappedCells{
		{active, name},
	}
}
@soheilhy
soheilhy / queue.go
Created July 16, 2015 16:35
EnQHandler.Rcv
func (h EnQHandler) Rcv(msg beehive.Msg, ctx beehive.RcvContext) error {
enq := msg.Data().(Enque)
dict := ctx.Dict(active)
key := string(enq.Task.Queue)
var tasks TaskRing
if val, err := dict.Get(key); err == nil {
tasks = val.(TaskRing)
}