Skip to content

Instantly share code, notes, and snippets.

@soheilhy
soheilhy / queue.go
Created July 19, 2015 04:50
AckHandler
// AckHandler handles Ack messages. It is an empty struct and carries no
// state of its own; all of its behavior lives in its methods.
type AckHandler struct{}
func (h AckHandler) Rcv(msg beehive.Msg, ctx beehive.RcvContext) error {
ack := msg.Data().(Ack)
ddict := ctx.Dict(dequed)
key := string(ack.Queue)
if val, err := ddict.Get(key); err == nil {
@soheilhy
soheilhy / queue.go
Created July 19, 2015 15:54
Timeout
// Timeout represents a timeout message. It is a distinct named type whose
// underlying type is time.Time, so a value must be converted explicitly
// (time.Time(t)) before time.Time's methods can be used on it.
type Timeout time.Time
@soheilhy
soheilhy / queue.go
Created July 19, 2015 15:58
TimeoutHandler
// TimeoutHandler handles Timeout messages.
type TimeoutHandler struct {
// ExpDur is the duration after which a dequed and unacknowledged task
// is returned to the active dictionary.
ExpDur time.Duration
}
func (h TimeoutHandler) Rcv(msg beehive.Msg, ctx beehive.RcvContext) error {
tout := time.Time(msg.Data().(Timeout))
ddict := ctx.Dict(dequed)
@soheilhy
soheilhy / args_example_custom_type.go
Last active August 29, 2015 14:25
custom typed args
// port is the int-typed argument constructor used by Port. It is created with
// args.Flag("example.typed.port", 1234, "the port"); presumably these are the
// flag name, default value, and usage text — TODO confirm against the args
// package.
var port = args.NewInt(args.Flag("example.typed.port", 1234, "the port"))
// roundTripper is the argument constructor used by RoundTripper. It is created
// with args.Default(http.DefaultTransport); presumably http.DefaultTransport is
// the value used when the option is not supplied — TODO confirm.
var roundTripper = args.New(args.Default(http.DefaultTransport))
// timeout is the duration-typed argument constructor used by Timeout. No
// options are passed when it is created.
var timeout = args.NewDuration()
// ServerOpt is the option type accepted by the server in this example. It is a
// distinct named type whose underlying type is args.V.
type ServerOpt args.V
// Port returns a ServerOpt carrying the port value p. The value is produced by
// the package-level port argument constructor and converted to ServerOpt.
func Port(p int) ServerOpt {
	v := port(p)
	return ServerOpt(v)
}
func RoundTripper(r http.RoundTrippper) ServerOpt { return ServerOpt(roundTripper(r)) }
// Timeout returns a ServerOpt carrying the duration d. The value is produced
// by the package-level timeout argument constructor and converted to
// ServerOpt.
func Timeout(d time.Duration) ServerOpt {
	v := timeout(d)
	return ServerOpt(v)
}
@soheilhy
soheilhy / queue.go
Created July 19, 2015 16:25
TimeoutHandler.Map
// Map maps a timeout message to cells. It ignores both msg and ctx and always
// returns an empty, non-nil MappedCells value.
func (h TimeoutHandler) Map(msg beehive.Msg, ctx beehive.MapContext) beehive.MappedCells {
	// An empty set of mapped cells broadcasts the timeout message to all
	// local bees.
	cells := beehive.MappedCells{}
	return cells
}
@soheilhy
soheilhy / queue.go
Last active August 29, 2015 14:25
Register taskq handlers (simple)
// Create the "taskq" application and register one handler per message type.
app := beehive.NewApp("taskq")
app.Handle(Enque{}, EnQHandler{})
app.Handle(Deque{}, DeQHandler{})
app.Handle(Ack{}, AckHandler{})
// Timeout messages are handled with a 60-second expiry (ExpDur).
app.Handle(Timeout{}, TimeoutHandler{
ExpDur: 60 * time.Second,
})
// Detached runs a local go-routine, and NewTimer
// creates a "detached" handler that calls the function
@soheilhy
soheilhy / queue.go
Last active August 29, 2015 14:25
Register taskq handler (full)
// RegisterTaskQ registers the taskq application and all its handler in the
// hive.
func RegisterTaskQ(h beehive.Hive) {
a := h.NewApp("taskq", beehive.Persistent(3))
a.Handle(Enque{}, EnQHandler{})
a.Handle(Deque{}, DeQHandler{})
a.Handle(Ack{}, AckHandler{})
a.Handle(Timeout{}, TimeoutHandler{
ExpDur: 60 * time.Second,
})
@soheilhy
soheilhy / main.go
Created July 20, 2015 03:26
main incomplete
// main creates a new hive, registers the taskq application on it, logs a
// startup message, and then starts the hive.
func main() {
	hive := beehive.NewHive()
	taskq.RegisterTaskQ(hive)

	glog.Info("taskq started")
	hive.Start()
}
@soheilhy
soheilhy / main.go
Last active August 29, 2015 14:25
Incomplete TaskQ/main()
func main() {
h := beehive.NewHive()
taskq.RegisterTaskQ(h)
go h.Start()
defer h.Stop()
q := taskq.Queue("MyQueue")
b := "copy f1 to f2"
enq := taskq.Enque{Task: taskq.Task{Queue: q, Body: []byte(b)}}
@soheilhy
soheilhy / queue.go
Last active August 29, 2015 14:25
EnQHTTPHandler
// httpHandler is the underlying struct from which HTTP endpoint handler types
// such as EnQHTTPHandler are defined. Its only field is the hive.
type httpHandler struct {
Hive beehive.Hive // Hive represents the hive our handler is registered on.
}
// EnQHTTPHandler provides the HTTP endpoint for enqueuing tasks. It is a
// distinct type defined from httpHandler, so it has the same Hive field but
// its own method set.
type EnQHTTPHandler httpHandler
func (h *EnQHTTPHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
q, ok := mux.Vars(r)["queue"]
if !ok {