Skip to content

Instantly share code, notes, and snippets.

@panzerdev
Last active October 24, 2018 13:42
Show Gist options
  • Save panzerdev/34eefbb14f616c0948240638ceeee9ec to your computer and use it in GitHub Desktop.
Save panzerdev/34eefbb14f616c0948240638ceeee9ec to your computer and use it in GitHub Desktop.
Redis based worker
package main
import (
"context"
"github.com/go-redis/redis"
"sync"
"time"
)
type RedisConsumer struct {
rc redis.Cmdable
wg *sync.WaitGroup
ctx context.Context
cancelFunc context.CancelFunc
sem chan int
}
type PushJobber interface {
HandleMsg(data []byte) bool
}
func NewRedisConsumer(redisClient redis.Cmdable, workerLimit int) *RedisConsumer {
ctx, done := context.WithCancel(context.Background())
c := &RedisConsumer{
rc: redisClient,
wg: &sync.WaitGroup{},
ctx: ctx,
cancelFunc: done,
sem: make(chan int, workerLimit),
}
return c
}
func (c *RedisConsumer) StartConsuming(arrayKey string, consumer PushJobber) {
for {
data := c.nextItem(arrayKey)
select {
case c.sem <- 1:
c.wg.Add(1)
go doJob(c.wg, c.sem, consumer, data)
case <-c.ctx.Done():
c.rc.LPush(arrayKey, data)
return
}
}
}
func (c *RedisConsumer) GracefullShutdown() {
c.cancelFunc()
c.wg.Wait()
}
func (c *RedisConsumer) nextItem(arrayKey string) []byte {
for {
b, err := c.rc.LPop(arrayKey).Bytes()
if err != nil || len(b) == 0 {
time.Sleep(time.Second)
continue
}
return b
}
}
func doJob(wg *sync.WaitGroup, sem <-chan int, consumer PushJobber, data []byte) {
defer func() {
<-sem
wg.Done()
}()
consumer.HandleMsg(data)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment