Skip to content

Instantly share code, notes, and snippets.

@isRuslan
Last active February 6, 2017 20:20
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save isRuslan/ea2a188130da041edd88fab47f501288 to your computer and use it in GitHub Desktop.
Save isRuslan/ea2a188130da041edd88fab47f501288 to your computer and use it in GitHub Desktop.
Destributed redis lock in golang

Генератор/читататель

Описание

Задача написать приложение, работающее с redis, умеющее как генерировать сообщения, так и обрабатывать. Параллельно может быть запущено сколько угодно приложений.

Обмен любой информацией между приложениями осуществлять только через redis.

Все запущенные копии приложения кроме генератора, являются обработчиками сообщений и в любой момент готовы получить сообщение из redis.

Все сообщения должны быть обработаны, причём только один раз, только одним из обработчиков.

Генератором должно быть только одно из запущенных приложений. Т.е. каждое приложение может стать генератором. Но в любой момент времени может работать только один генератор.

Если текущий генератор завершить принудительно (обработчик завершения запрещен, например, выключили из розетки), то одно из приложений должно заменить завершённое (упавшее) и стать генератором. Для определения кто генератор нельзя использовать средства ОС, считается что все приложения запущенны на разных серверах и общаются только через redis. Сообщения генерируются раз в 500 мс.

Для генерации сообщения использовать любую функцию с рандомным текстовым ответом.

Приложение, получая сообщение, с вероятностью 5% определяет, что сообщение содержит ошибку.

Если сообщение содержит ошибку, то сообщение следует поместить в redis для дальнейшего изучения.

Если приложение запустить с параметром 'getErrors', то оно заберет из redis все сообщения с ошибкой, выведет их на экран и завершится, при этом из базы сообщения удаляются.

Проверить что приложение может обработать 1000000 сообщений (интервал 500мс можно убрать для теста)

Запуск

go run main.go # запуск приложения (генератор/читатель)
go run main.go --getErrors # получение ошибок

Алгоритм

На каждом запуске приложение пытается выставить лок на взятие позиции мастера (SETNX masterKey): если получилось, то играем в мастера, если нет то в читателя. Мастер запускает 2 интервальных канала в которых генерирет сообщения и продлевает свой лок на expireTime / 2. Читатель так же запускает 2 интервальных канала в которых читает сообщения и пытается получить свой лок на позицию мастера с итнтервалом expireTime.

Для подключения к редису используется пул соеденений, который позволят обрабатывает ситуация потери соеденений и его переподключения, при удержении 1 соединения к редису и его разрыве переподключения не происходит.

Если произошла задержка сети, или соедение к базе отвалилось, то возможна ситуация когда текущий мастер потерял свою позицию (произошел EXPIRE + какой-то читатель успел занять позицию) и тогда в системе будет > 1 мастера. Для этого случая переде отправкой сообщения происходит проверка того, что именно текущий мастер дрежит лок на позицию.

package main
import (
"flag"
"github.com/garyburd/redigo/redis"
"log"
"math/rand"
"os"
"time"
)
const (
masterKey = "master_key"
masterExpire = 2000
messageQueue = "messages"
errorQueue = "errors"
)
type role string
const (
SLAVE role = "slave"
MASTER role = "master"
)
var getErrors = flag.Bool("getErrors", false, "show error list")
func main() {
app := Application{}
app.init()
}
type Application struct {
pool *redis.Pool
id int
role role
}
func (app *Application) init() {
app.pool = newRedisPool(":6379")
app.id = os.Getpid()
if flag.Parse(); *getErrors {
app.getErrors()
return
}
app.election()
}
func (app Application) election() {
conn := app.pool.Get()
defer conn.Close()
current, err := conn.Do("SET", masterKey, app.id, "NX", "PX", masterExpire)
if err != nil {
log.Printf("TRY ERROR %v", err)
return
}
log.Printf("ELECTION id=%v success=%v ", app.id, current)
if current == "OK" {
app.beMaster()
} else {
app.beSlave()
}
}
func (app Application) getErrors() {
conn := app.pool.Get()
defer conn.Close()
conn.Send("MULTI")
conn.Send("LRANGE", errorQueue, 0, -1)
conn.Send("DEL", errorQueue)
reply, err := redis.Values(conn.Do("EXEC"))
if err != nil {
log.Printf("GET ERRORS %v", err)
return
}
messages, err := redis.Strings(reply[0], nil)
if err != nil {
log.Printf("STRINGS ERROR %v", err)
return
}
if len(messages) == 0 {
log.Print("EMPTY ERRORS")
return
}
for i := 0; i < len(messages); i++ {
log.Printf("ERROR MESSAGE %v", messages[i])
}
}
func (app *Application) beMaster() {
log.Print("start master")
if app.role == MASTER {
return
}
app.role = MASTER
genChan := time.NewTicker(time.Millisecond * 200).C
expireChan := time.NewTicker(time.Millisecond * masterExpire / 2).C
for {
select {
case <-genChan:
app.genMessage()
case <-expireChan:
app.expireMaster()
}
}
}
func (app Application) genMessage() {
conn := app.pool.Get()
defer conn.Close()
currentId, err := redis.Int(conn.Do("GET", masterKey))
if err != nil {
log.Printf("CHECK_MASTER ERROR %v", err)
app.election()
return
}
if currentId != app.id {
log.Printf("INVALID MASTER current=%d id=%d", currentId, app.id)
app.beSlave()
return
}
message := random()
if _, err := conn.Do("RPUSH", messageQueue, message); err != nil {
log.Printf("PUSH ERROR %v", err)
return
}
log.Printf("SENT MESSAGE %v", message)
}
func (app Application) expireMaster() {
conn := app.pool.Get()
defer conn.Close()
if _, err := conn.Do("PEXPIRE", masterKey, masterExpire); err != nil {
log.Printf("EXPIRE ERROR %v", err)
}
}
func (app *Application) beSlave() {
if app.role == SLAVE {
return
}
log.Print("start slave")
app.role = SLAVE
readChan := time.NewTicker(time.Millisecond * 500).C
electionChan := time.NewTicker(time.Millisecond * masterExpire).C
for {
select {
case <-readChan:
app.readMessage()
case <-electionChan:
app.election()
}
}
}
func (app Application) readMessage() {
conn := app.pool.Get()
defer conn.Close()
reply, err := redis.Values(conn.Do("BLPOP", messageQueue, masterExpire/2))
if err != nil {
log.Printf("BLPOP SEND ERROR %v", err)
return
}
msg, _ := redis.String(reply[1], nil)
log.Printf("GOT MESSAGE %v", msg)
if rand.Intn(100) < 5 {
log.Printf("PUSH ERROR %v", msg)
if _, err := conn.Do("RPUSH", errorQueue, msg); err != nil {
log.Printf("ERROR PUSH %v", err)
return
}
}
}
func newRedisPool(addr string) *redis.Pool {
return &redis.Pool{
MaxIdle: 5,
IdleTimeout: 240 * time.Second,
Dial: func() (redis.Conn, error) {
return redis.Dial("tcp", addr)
},
Wait: true,
TestOnBorrow: func(c redis.Conn, t time.Time) error {
_, err := c.Do("PING")
return err
},
}
}
func random() string {
b := make([]byte, 8)
for i := range b {
b[i] = byte(rand.Intn(26) + 61)
}
return string(b)
}
@isRuslan
Copy link
Author

isRuslan commented Feb 6, 2017

Замечания:

  • readMessage() происходит каждые 500мс, если скорость генерации сообщений будет чаще чем
    1 сообщение в 500мс * кол-во хостов слушателей, то всё, приехали. Очередь в редисе будет расти.
    высокую нагрузку не выдержит такая схема.
  • genMessage() на каждое отправленное сообщение происходит дополнительный conn.Do("GET", masterKey))
    что является большим оверхедом.
  • election() error и init() error - не понятно зачем возвращаемое значение, не используется нигде
  • return бывает в конце функций где он не нужен
  • func createPool(addr string) *redis.Pool { return &redis.Pool{....
    в идеоматичном go это то бы выглядило так: func newRedisPool(addr string) *redis.Pool {
  • мало используются типы, например master или slave могло бы быть типом.
  • не испольуется goroutine. не задействованы все cpu, а если запустить несколько приложений по количеству cpu,
    то будет увеличинна нагрузка на redis в cpu_N раз.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment