Topic-based worker with Redis example

A producer goroutine pushes batches of jobs onto a Redis list with LPush, while a bounded pool of workers pops them with LPop, processes each one, and re-queues any job whose handler reports failure. The consumer supports a configurable worker limit and graceful shutdown.
package main

import (
	"fmt"
	"log"
	"math/rand"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/go-redis/redis"
)
func main() {
	rc := redis.NewClient(&redis.Options{
		Addr: "192.168.2.22:6379",
	})
	res, err := rc.Ping().Result()
	if err != nil {
		log.Fatal(err)
	}
	defer rc.Close()
	log.Println("Ping success", res)

	redisClient := RedisClient{rc}

	// Producer: push a batch of ten jobs onto the list every five seconds.
	topic1 := "work_1"
	go func() {
		var j int
		for {
			var payload []string
			for i := 0; i < 10; i++ {
				payload = append(payload, fmt.Sprintf("Batch %v Job %v", j, i))
			}
			j++
			rc.LPush(topic1, payload)
			time.Sleep(5 * time.Second)
		}
	}()

	// Consumer: up to 10 concurrent workers; the handler simulates work and
	// fails roughly half the time so the re-queue path gets exercised.
	taskConsumer := NewConsumer(redisClient.TaskProvider, 10)
	go taskConsumer.StartConsuming(topic1, func(data []byte) bool {
		log.Println("Consume:", string(data))
		time.Sleep(time.Second)
		return rand.Int()%2 == 0
	})

	// signal.Notify requires a buffered channel so the signal is not dropped.
	stopC := make(chan os.Signal, 1)
	signal.Notify(stopC, syscall.SIGINT)
	sig := <-stopC
	log.Println("Signal received", sig)
	taskConsumer.GracefulShutdown()
}
// RedisClient embeds *redis.Client so TaskProvider can call its commands directly.
type RedisClient struct {
	*redis.Client
}

// TaskProvider blocks until a job is available on the given list, returning
// the job payload and a callback that re-queues the job on failure.
func (rc *RedisClient) TaskProvider(arrayKey string) ([]byte, func(), error) {
	for {
		b, err := rc.LPop(arrayKey).Bytes()
		if err != nil || len(b) == 0 {
			// redis.Nil just means the list is empty; any other error is
			// logged before retrying so it is not silently swallowed.
			if err != nil && err != redis.Nil {
				log.Println("LPop failed:", err)
			}
			time.Sleep(time.Second)
			continue
		}
		failureFunc := func() {
			rc.LPush(arrayKey, b)
		}
		return b, failureFunc, nil
	}
}
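
TaskProvider polls LPop once per second while the list is empty. If lower latency is wanted, the provider could block on Redis instead. Below is a minimal sketch using BLPop, assuming the same go-redis v6 client imported above; BlockingTaskProvider is an illustrative name and is not part of the original gist.

// BlockingTaskProvider is a hypothetical variant of TaskProvider that waits
// on BLPOP instead of polling LPOP with a fixed sleep.
func (rc *RedisClient) BlockingTaskProvider(arrayKey string) ([]byte, func(), error) {
	for {
		// BLPop blocks for up to the timeout and returns []string{key, value}.
		res, err := rc.BLPop(5*time.Second, arrayKey).Result()
		if err != nil {
			// redis.Nil means the call timed out with nothing to pop;
			// other errors are logged before retrying, as in TaskProvider.
			if err != redis.Nil {
				log.Println("BLPop failed:", err)
				time.Sleep(time.Second)
			}
			continue
		}
		b := []byte(res[1])
		failureFunc := func() {
			rc.LPush(arrayKey, b)
		}
		return b, failureFunc, nil
	}
}

Either provider satisfies the Provider signature expected by the generic consumer, which lives in a second file of the same package, shown next.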
package main

import (
	"context"
	"log"
	"sync"
)
// Provider fetches the next job for a topic, returning the payload and a
// callback to invoke if processing fails.
type Provider func(topic string) ([]byte, func(), error)

// Consumer processes one job and reports whether it succeeded.
type Consumer func([]byte) bool

type TaskConsumer struct {
	prv        Provider
	wg         *sync.WaitGroup
	ctx        context.Context
	cancelFunc context.CancelFunc
	sem        chan int // bounds the number of jobs processed concurrently
}

// NewConsumer wires a provider to a worker pool of at most workerLimit goroutines.
func NewConsumer(provider Provider, workerLimit int) *TaskConsumer {
	ctx, done := context.WithCancel(context.Background())
	c := &TaskConsumer{
		prv:        provider,
		wg:         &sync.WaitGroup{},
		ctx:        ctx,
		cancelFunc: done,
		sem:        make(chan int, workerLimit),
	}
	return c
}
// StartConsuming pulls jobs from the provider until shutdown, dispatching
// each job to its own goroutine while respecting the semaphore-based worker limit.
func (c *TaskConsumer) StartConsuming(topic string, consumer Consumer) {
	doneFunc := func() {
		<-c.sem
		c.wg.Done()
	}
	for {
		data, failed, err := c.prv(topic)
		if err != nil {
			log.Println(err)
			continue
		}
		select {
		case c.sem <- 1:
			c.wg.Add(1)
			go doJob(doneFunc, consumer, failed, data)
		case <-c.ctx.Done():
			// Shutdown was requested after a job was already popped,
			// so hand it back to the provider before returning.
			log.Printf("Consuming for topic %v canceled\n", topic)
			failed()
			return
		}
	}
}
// GracefulShutdown stops dispatching new jobs and waits for in-flight jobs to finish.
func (c *TaskConsumer) GracefulShutdown() {
	log.Println("Gracefully shut down start")
	c.cancelFunc()
	c.wg.Wait()
	log.Println("Gracefully shut down end")
}

func doJob(done func(), consumer Consumer, failed func(), data []byte) {
	defer done()
	success := consumer(data)
	if !success {
		log.Println("Job failed", string(data))
		failed()
	}
}
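
The consumer is not tied to Redis; anything matching the Provider signature works. Below is a minimal sketch of driving TaskConsumer from an in-memory channel, assuming it is added to the same package and that "time" is imported; channelProvider and exampleInMemory are illustrative names, not part of the original gist.

func exampleInMemory() {
	jobs := make(chan []byte, 3)
	jobs <- []byte("a")
	jobs <- []byte("b")
	jobs <- []byte("c")

	// channelProvider satisfies Provider: it blocks until a job is available
	// and re-queues the job through the failure callback.
	channelProvider := func(topic string) ([]byte, func(), error) {
		data := <-jobs
		return data, func() { jobs <- data }, nil
	}

	c := NewConsumer(channelProvider, 2)
	go c.StartConsuming("in-memory", func(data []byte) bool {
		log.Println("processed", string(data))
		return true
	})

	time.Sleep(2 * time.Second)
	c.GracefulShutdown()
}

Note that after the channel drains, StartConsuming stays blocked inside the provider rather than observing cancellation; the Redis provider above behaves the same way, since the context is only checked between jobs.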