@vmihailenco
Created January 10, 2020 12:00
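package main

import (
	"bytes"
	"io/ioutil"
	"testing"

	"github.com/klauspost/compress/zstd"
	"github.com/valyala/gozstd"
)

// BenchmarkZstd round-trips bin through github.com/valyala/gozstd,
// the cgo binding to the C zstd library.
func BenchmarkZstd(b *testing.B) {
	var buf bytes.Buffer
	enc := gozstd.NewWriter(&buf)
	dec := gozstd.NewReader(&buf)

	for i := 0; i < b.N; i++ {
		// Reuse the same buffer, encoder, and decoder on every iteration.
		buf.Reset()
		enc.Reset(&buf, nil, gozstd.DefaultCompressionLevel)

		if _, err := enc.Write(bin); err != nil {
			b.Fatal(err)
		}
		// Flush pushes the pending compressed data into buf.
		if err := enc.Flush(); err != nil {
			b.Fatal(err)
		}

		dec.Reset(&buf, nil)
		bb, err := ioutil.ReadAll(dec)
		if err != nil {
			b.Fatal(err)
		}
		if !bytes.Equal(bb, bin) {
			b.Fatal("round-trip mismatch: decompressed data differs from input")
		}
	}
}

// BenchmarkGoZstd round-trips bin through github.com/klauspost/compress/zstd,
// the pure Go zstd implementation.
func BenchmarkGoZstd(b *testing.B) {
	var buf bytes.Buffer
	enc, err := zstd.NewWriter(&buf)
	if err != nil {
		b.Fatal(err)
	}
	dec, err := zstd.NewReader(&buf)
	if err != nil {
		b.Fatal(err)
	}

	for i := 0; i < b.N; i++ {
		// Reuse the same buffer, encoder, and decoder on every iteration.
		buf.Reset()
		enc.Reset(&buf)

		if _, err := enc.Write(bin); err != nil {
			b.Fatal(err)
		}
		// Close finishes the frame; Reset above makes the encoder usable again.
		if err := enc.Close(); err != nil {
			b.Fatal(err)
		}

		if err := dec.Reset(&buf); err != nil {
			b.Fatal(err)
		}
		bb, err := ioutil.ReadAll(dec)
		if err != nil {
			b.Fatal(err)
		}
		if !bytes.Equal(bb, bin) {
			b.Fatal("round-trip mismatch: decompressed data differs from input")
		}
	}
}

// bin is the payload compressed and decompressed by both benchmarks.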
var bin = []byte(`Skip to content
Search or jump to…
Pull requests
Issues
Marketplace
Explore
@vmihailenco
vmihailenco
/
taskq
15
29320
Code Issues 6 Pull requests 0 Actions Projects 0 Wiki Security Insights Settings
Golang asynchronous task/job queue with Redis, SQS, IronMQ, and in-memory backends https://godoc.org/github.com/vmihaile…
go
golang
queue
message-queue
ironmq
sqs
redis
task-queue
taskqueue
ironmq-backend
339 commits
1 branch
0 packages
67 releases
3 contributors
BSD-2-Clause
@vmihailenco
vmihailenco Fix compat with recent msgpack
Latest commit
4e2a924
24 days ago
Type Name Latest commit message Commit time
azsqs Use Message.TaskName when checking message uniqueness 5 months ago
examples Use go-redis v7 5 months ago
internal Use Hash64 with seed 3 months ago
ironmq Use Message.TaskName when checking message uniqueness 5 months ago
memqueue Use Message.TaskName when checking message uniqueness 5 months ago
redisq Fix build 3 months ago
.golangci.yml Rework API 6 months ago
.travis.yml Move scheduling from Consumer back to memqueue 5 months ago
LICENSE Add license 3 years ago
Makefile Move scheduling from Consumer back to memqueue 5 months ago
README.md readme: cleanup 5 months ago
azsqs_test.go go mod v2 6 months ago
bench_test.go Require context when constructing a Message 6 months ago
consumer.go Fix idle fetcher and idle worker removal 3 months ago
consumer_test.go Use go-redis v7 5 months ago
doc.go Add examples and update readme 7 months ago
example_ratelimit_test.go Require context when constructing a Message 6 months ago
example_test.go Replace Task.OnceWithArgs with Task.WithArgs.OnceInPeriod 6 months ago
go.mod Use Hash64 with seed 3 months ago
go.sum Use Hash64 with seed 3 months ago
handler.go Use semantic version in import path 5 months ago
ironmq_test.go go mod v2 6 months ago
message.go Fix compat with recent msgpack 24 days ago
queue.go Use go-redis v7 5 months ago
redisq_test.go go mod v2 6 months ago
registry.go Replace TaskRegistry with Handler 5 months ago
task.go Use Message.TaskName when checking message uniqueness 5 months ago
taskq.go Use go-redis v7 5 months ago
README.md
Golang asynchronous task/job queue with Redis, SQS, IronMQ, and in-memory backends
Build Status GoDoc
Installation
taskq requires a Go version with Modules support and uses import versioning. So please make sure to initialize a Go module before installing taskq:
go get github.com/vmihailenco/taskq/v2
Features
Redis, SQS, IronMQ, and in-memory backends.
Automatically scaling number of goroutines used to fetch (fetcher) and process messages (worker).
Global rate limiting.
Global limit of workers.
Call once - deduplicating messages with same name.
Automatic retries with exponential backoffs.
Automatic pausing when all messages in queue fail.
Fallback handler for processing failed messages.
Message batching. It is used in SQS and IronMQ backends to add/delete messages in batches.
Automatic message compression using zstd.
Quickstart
I recommend that you split your app into two parts:
An API that accepts requests from customers and adds tasks to the queues.
A Worker that fetches tasks from the queues and processes them.
This way you can:
Isolate API and worker from each other;
Scale API and worker separately;
Have different configs for API and worker (like timeouts).
There is an api_worker example that demonstrates this approach using Redis as backend:
cd examples/api_worker
go run worker/main.go
go run api/main.go
You start by choosing backend to use - in our case Redis:
package api_worker
var QueueFactory = redisq.NewFactory()
Using that factory you create queue that contains task(s):
var MainQueue = QueueFactory.RegisterQueue(&taskq.QueueOptions{
Name: "api-worker",
Redis: Redis, // go-redis client
})
Using the queue you create task with handler that does some useful work:
var CountTask = taskq.RegisterTask(&taskq.TaskOptions{
Name: "counter",
Handler: func() error {
IncrLocalCounter()
return nil
},
})
Then in API you use the task to add messages/jobs to the queues:
ctx := context.Background()
for {
// call task handler without any args
err := api_worker.MainQueue.Add(api_worker.CountTask.WithArgs(ctx))
if err != nil {
log.Fatal(err)
}
}
And in worker you start processing the queue:
err := api_worker.MainQueue.Start(context.Background())
if err != nil {
log.Fatal(err)
}
API overview
t := myQueue.RegisterTask(&taskq.TaskOptions{
Name: "greeting",
Handler: func(name string) error {
fmt.Println("Hello", name)
return nil
},
})
// Say "Hello World".
err := myQueue.Add(t.WithArgs(context.Background(), "World"))
if err != nil {
panic(err)
}
// Say "Hello World" with 1 hour delay.
msg := t.WithArgs(ctx, "World")
msg.Delay = time.Hour
_ = myQueue.Add(msg)
// Say "Hello World" once.
for i := 0; i < 100; i++ {
msg := t.WithArgs(ctx, "World")
msg.Name = "hello-world" // unique
_ = myQueue.Add(msg)
}
// Say "Hello World" once with 1 hour delay.
for i := 0; i < 100; i++ {
msg := t.WithArgs(ctx, "World")
msg.Name = "hello-world"
msg.Delay = time.Hour
_ = myQueue.Add(msg)
}
// Say "Hello World" once in an hour.
for i := 0; i < 100; i++ {
msg := t.WithArgs(ctx, "World").OnceInPeriod(time.Hour)
_ = myQueue.Add(msg)
}
// Say "Hello World" for Europe region once in an hour.
for i := 0; i < 100; i++ {
msg := t.WithArgs(ctx, "World").OnceInPeriod(time.Hour, "World", "europe")
_ = myQueue.Add(msg)
}
Message deduplication
If a Message has a Name then this will be used as unique identifier and messages with the same name will be deduplicated (i.e. not processed again) within a 24 hour period (or possibly longer if not evicted from local cache after that period). Where Name is omitted then non deduplication occurs and each message will be processed. Task's WithMessage and WithArgs both produces messages with no Name so will not be deduplicated. OnceWithArgs sets a name based off a consistent hash of the arguments and a quantised period of time (i.e. 'this hour', 'today') passed to OnceWithArgs a period. This guarantees that the same function will not be called with the same arguments during period.
Handlers
A Handler and FallbackHandler are supplied to RegisterTask in the TaskOptions.
There are three permitted types of signature:
A zero-argument function
A function whose arguments are assignable in type from those which are passed in the message
A function which takes a single *Message argument
If a task is registered with a handler that takes a Go context.Context as its first argument then when that handler is invoked it will be passed the same Context that was passed to Consumer.Start(ctx). This can be used to transmit a signal to abort to all tasks being processed:
var AbortableTask = MainQueue.RegisterTask(&taskq.TaskOptions{
Name: "SomethingLongwinded",
Handler: func(ctx context.Context) error {
for range time.Tick(time.Second) {
select {
case <-ctx.Done():
return ctx.Err()
default:
fmt.Println("Wee!")
}
}
return nil
},
})
Custom message delay
If error returned by handler implements Delay() time.Duration interface then that delay is used to postpone message processing.
type RateLimitError string
func (e RateLimitError) Error() string {
return string(e)
}
func (RateLimitError) Delay() time.Duration {
return time.Hour
}
func handler() error {
return RateLimitError("calm down")
}
© 2020 GitHub, Inc.
Terms
Privacy
Security
Status
Help
Contact GitHub
Pricing
API
Training
Blog
About
`)