Skip to content

Instantly share code, notes, and snippets.

@panzerdev
Last active March 16, 2019 18:06
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save panzerdev/cf8278b346770814088c006643f9bfd1 to your computer and use it in GitHub Desktop.
Save panzerdev/cf8278b346770814088c006643f9bfd1 to your computer and use it in GitHub Desktop.
Topic-based worker with Redis example
package main
import (
"fmt"
"github.com/go-redis/redis"
"log"
"math/rand"
"os"
"os/signal"
"syscall"
"time"
)
// main wires the demo together: it connects to Redis, starts a goroutine that
// pushes a batch of ten jobs onto the "work_1" list every five seconds, starts
// a TaskConsumer with a limit of ten concurrent jobs, and shuts the consumer
// down gracefully when SIGINT arrives.
func main() {
	rc := redis.NewClient(&redis.Options{
		Addr: "192.168.2.22:6379",
	})
	res, err := rc.Ping().Result()
	if err != nil {
		log.Fatal(err)
	}
	defer rc.Close()
	log.Println("Ping success", res)

	redisClient := RedisClient{
		rc,
	}
	topic1 := "work_1"

	// Demo producer: runs until the process exits.
	go func() {
		var j int
		for {
			payload := make([]string, 0, 10)
			for i := 0; i < 10; i++ {
				payload = append(payload, fmt.Sprintf("Batch %v Job %v", j, i))
			}
			j++
			// NOTE(review): payload is handed over as a single []string
			// argument; this presumably relies on the client expanding it into
			// individual list values — confirm for the vendored go-redis version.
			if err := rc.LPush(topic1, payload).Err(); err != nil {
				// Previously discarded; a failed push means the batch was not queued.
				log.Println("LPush failed:", err)
			}
			time.Sleep(5 * time.Second)
		}
	}()

	taskConsumer := NewConsumer(redisClient.TaskProvider, 10)
	go taskConsumer.StartConsuming(topic1, func(data []byte) bool {
		log.Println("Consume:", string(data))
		time.Sleep(time.Second)
		// Report a random outcome so the failure path is exercised too.
		return rand.Int()%2 == 0
	})

	// signal.Notify never blocks when delivering, so the channel needs a
	// buffer of one or a signal arriving before the receive would be dropped.
	stopC := make(chan os.Signal, 1)
	signal.Notify(stopC, syscall.SIGINT)
	sig := <-stopC
	log.Println("Signal received", sig)
	taskConsumer.GracefulShutdown()
}
// RedisClient embeds a go-redis client so that list-based task helpers can be
// attached to it as methods.
type RedisClient struct {
	*redis.Client
}

// TaskProvider blocks until a non-empty element can be popped from the head of
// the list stored at arrayKey, polling once per second while nothing is
// available.
//
// It returns the popped bytes together with a callback that pushes the same
// bytes back onto the head of the list; callers invoke it when the job could
// not be completed. The returned error is always nil: failures are logged and
// retried instead of being handed to the caller.
func (rc *RedisClient) TaskProvider(arrayKey string) ([]byte, func(), error) {
	for {
		b, err := rc.LPop(arrayKey).Bytes()
		// redis.Nil only signals "nothing to pop"; anything else is a real
		// failure that used to be swallowed without a trace.
		if err != nil && err != redis.Nil {
			log.Printf("LPop on %q failed: %v", arrayKey, err)
		}
		if err != nil || len(b) == 0 {
			time.Sleep(time.Second)
			continue
		}
		failureFunc := func() {
			// If the re-queue fails the job is gone, so at least record it.
			if err := rc.LPush(arrayKey, b).Err(); err != nil {
				log.Printf("re-queueing job on %q failed: %v", arrayKey, err)
			}
		}
		return b, failureFunc, nil
	}
}
package main
import (
"context"
"log"
"sync"
)
type (
	// Provider fetches the next payload for topic. Alongside the payload it
	// returns a callback that the consumer invokes when the payload could not
	// be processed, and an error when no payload could be obtained.
	Provider func(topic string) ([]byte, func(), error)

	// Consumer processes a single payload and reports whether it succeeded.
	Consumer func([]byte) bool
)
// TaskConsumer pulls payloads from a Provider and runs each one through a
// Consumer on its own goroutine, keeping at most cap(sem) jobs in flight.
type TaskConsumer struct {
	prv        Provider           // source of payloads
	wg         *sync.WaitGroup    // counts jobs that have been started and not yet finished
	ctx        context.Context    // cancelled by GracefulShutdown
	cancelFunc context.CancelFunc // cancels ctx
	sem        chan int           // holds one token per running job
}

// NewConsumer returns a TaskConsumer that reads from provider and runs at most
// workerLimit jobs concurrently. A workerLimit below one is raised to one: a
// zero-capacity semaphore could never admit a job and a negative capacity
// would panic in make.
func NewConsumer(provider Provider, workerLimit int) *TaskConsumer {
	if workerLimit < 1 {
		workerLimit = 1
	}
	ctx, done := context.WithCancel(context.Background())
	return &TaskConsumer{
		prv:        provider,
		wg:         &sync.WaitGroup{},
		ctx:        ctx,
		cancelFunc: done,
		sem:        make(chan int, workerLimit),
	}
}

// StartConsuming fetches payloads for topic from the provider and hands each
// one to consumer on a new goroutine. It blocks until the TaskConsumer is
// shut down, so callers normally run it on its own goroutine.
//
// Cancellation is checked before every provider call and again once a payload
// has been obtained. A payload that was fetched but not started is handed back
// through its failure callback.
func (c *TaskConsumer) StartConsuming(topic string, consumer Consumer) {
	doneFunc := func() {
		<-c.sem
		c.wg.Done()
	}
	for {
		// Without this check a provider that keeps returning errors would keep
		// this loop spinning forever after shutdown.
		if c.ctx.Err() != nil {
			log.Printf("Consuming for topic %v canceled\n", topic)
			return
		}

		data, failed, err := c.prv(topic)
		if err != nil {
			log.Println(err)
			continue
		}
		if failed == nil {
			// Tolerate providers that have nothing to undo.
			failed = func() {}
		}

		// select picks at random among ready cases, so give cancellation
		// priority explicitly; otherwise a job could still be started after
		// shutdown whenever a semaphore slot happens to be free.
		if c.ctx.Err() != nil {
			log.Printf("Consuming for topic %v canceled\n", topic)
			failed()
			return
		}

		select {
		case c.sem <- 1:
			c.wg.Add(1)
			go doJob(doneFunc, consumer, failed, data)
		case <-c.ctx.Done():
			log.Printf("Consuming for topic %v canceled\n", topic)
			failed()
			return
		}
	}
}

// GracefulShutdown cancels the consumer's context and then waits for every job
// that has already been started to finish. It does not wait for the
// StartConsuming loop itself to return.
func (c *TaskConsumer) GracefulShutdown() {
	log.Println("Gracefully shut down start")
	c.cancelFunc()
	c.wg.Wait()
	log.Println("Gracefully shut down end")
}
// doJob runs consumer on data. When the consumer reports failure the payload
// is logged and failed is invoked. done is deferred, so it runs last in
// either case.
func doJob(done func(), consumer Consumer, failed func(), data []byte) {
	defer done()

	if ok := consumer(data); ok {
		return
	}

	log.Println("Job failed", string(data))
	failed()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment