Topic-based worker with Redis example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
"github.com/go-redis/redis" | |
"log" | |
"math/rand" | |
"os" | |
"os/signal" | |
"syscall" | |
"time" | |
) | |
func main() { | |
rc := redis.NewClient(&redis.Options{ | |
Addr: "192.168.2.22:6379", | |
}) | |
res, err := rc.Ping().Result() | |
if err != nil { | |
log.Fatal(err) | |
} | |
defer rc.Close() | |
log.Println("Ping success", res) | |
redisClient := RedisClient{ | |
rc, | |
} | |
topic1 := "work_1" | |
go func() { | |
var j int | |
for { | |
var payload []string | |
for i := 0; i < 10; i++ { | |
payload = append(payload, fmt.Sprintf("Batch %v Job %v", j, i)) | |
} | |
j++ | |
rc.LPush(topic1, payload) | |
time.Sleep(5 * time.Second) | |
} | |
}() | |
taskConsumer := NewConsumer(redisClient.TaskProvider, 10) | |
go taskConsumer.StartConsuming(topic1, func(data []byte) bool { | |
log.Println("Consume:", string(data)) | |
time.Sleep(time.Second) | |
return rand.Int()%2 == 0 | |
}) | |
stopC := make(chan os.Signal) | |
signal.Notify(stopC, syscall.SIGINT) | |
sig := <-stopC | |
log.Println("Signal received", sig) | |
taskConsumer.GracefulShutdown() | |
} | |
type RedisClient struct { | |
*redis.Client | |
} | |
func (rc *RedisClient) TaskProvider(arrayKey string) ([]byte, func(), error) { | |
for { | |
b, err := rc.LPop(arrayKey).Bytes() | |
if err != nil || len(b) == 0 { | |
time.Sleep(time.Second) | |
continue | |
} | |
failureFunc := func() { | |
rc.LPush(arrayKey, b) | |
} | |
return b, failureFunc, nil | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"context" | |
"log" | |
"sync" | |
) | |
// Callback signatures that connect a task source to the code processing it.
type (
	// Provider fetches the next payload for topic. Besides the payload it
	// returns a callback that is invoked when the payload is not processed
	// successfully, and an error describing a failed fetch.
	Provider func(topic string) (data []byte, failed func(), err error)

	// Consumer processes one payload and reports whether it succeeded.
	Consumer func(data []byte) bool
)
type TaskConsumer struct { | |
prv Provider | |
wg *sync.WaitGroup | |
ctx context.Context | |
cancelFunc context.CancelFunc | |
sem chan int | |
} | |
func NewConsumer(provider Provider, workerLimit int) *TaskConsumer { | |
ctx, done := context.WithCancel(context.Background()) | |
c := &TaskConsumer{ | |
prv: provider, | |
wg: &sync.WaitGroup{}, | |
ctx: ctx, | |
cancelFunc: done, | |
sem: make(chan int, workerLimit), | |
} | |
return c | |
} | |
func (c *TaskConsumer) StartConsuming(topic string, consumer Consumer) { | |
doneFunc := func() { | |
<-c.sem | |
c.wg.Done() | |
} | |
for { | |
data, failed, err := c.prv(topic) | |
if err != nil { | |
log.Println(err) | |
continue | |
} | |
select { | |
case c.sem <- 1: | |
c.wg.Add(1) | |
go doJob(doneFunc, consumer, failed, data) | |
case <-c.ctx.Done(): | |
log.Printf("Consuming for topic %v canceled\n", topic) | |
failed() | |
return | |
} | |
} | |
} | |
func (c *TaskConsumer) GracefulShutdown() { | |
log.Println("Gracefully shut down start") | |
c.cancelFunc() | |
c.wg.Wait() | |
log.Println("Gracefully shut down end") | |
} | |
func doJob(done func(), consumer Consumer, failed func(), data []byte) { | |
defer done() | |
success := consumer(data) | |
if !success { | |
log.Println("Job failed", string(data)) | |
failed() | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment