Skip to content

Instantly share code, notes, and snippets.

@Reasno
Created February 17, 2022 05:55
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Reasno/f4f50f0f9cfa0a03189d5b91f4dcd114 to your computer and use it in GitHub Desktop.
Save Reasno/f4f50f0f9cfa0a03189d5b91f4dcd114 to your computer and use it in GitHub Desktop.
Renewable distributed lock implementation with Redis (untested)
package core_locks
import (
"container/heap"
"context"
"errors"
"fmt"
"time"
"github.com/go-redis/redis/v8"
)
// Lua scripts sent to Redis through EVAL. Both compare the value stored at
// KEYS[1] with the caller's token in ARGV[1] and return 0 without touching
// the key when they differ.
const (
	// unlockLuaScript deletes KEYS[1] when its value equals ARGV[1] and
	// returns the result of DEL.
	unlockLuaScript = `
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
`

	// renewLuaScript applies PEXPIRE with ARGV[2] (milliseconds) to KEYS[1]
	// when its value equals ARGV[1] and returns the result of PEXPIRE.
	renewLuaScript = `
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("pexpire", KEYS[1], ARGV[2])
else
return 0
end
`
)
// lockRenewalElement is one scheduled renewal of a held lock.
type lockRenewalElement struct {
	lockKey     string        // full Redis key of the lock
	lockValue   interface{}   // token stored under lockKey; passed to the Lua scripts as ARGV[1]
	ttl         time.Duration // lease length re-applied on every renewal
	nextRenewal time.Time     // instant at which the lease should next be extended
}

// lockRenewalHeap is a min-heap of renewals ordered by nextRenewal. It
// implements heap.Interface; mutate it only through the container/heap
// functions so the ordering invariant holds.
type lockRenewalHeap []*lockRenewalElement

// Len reports the number of scheduled renewals.
func (h *lockRenewalHeap) Len() int {
	return len(*h)
}

// Less orders elements so that the earliest nextRenewal sits at the root.
func (h *lockRenewalHeap) Less(i, j int) bool {
	return (*h)[i].nextRenewal.Before((*h)[j].nextRenewal)
}

// Swap exchanges the elements at positions i and j.
func (h *lockRenewalHeap) Swap(i, j int) {
	(*h)[i], (*h)[j] = (*h)[j], (*h)[i]
}

// Push appends x, which must be a *lockRenewalElement. Call heap.Push rather
// than this method directly.
func (h *lockRenewalHeap) Push(x interface{}) {
	*h = append(*h, x.(*lockRenewalElement))
}

// Pop removes and returns the last slice element. Call heap.Pop rather than
// this method directly; heap.Pop first moves the root to the end.
func (h *lockRenewalHeap) Pop() interface{} {
	old := *h
	n := len(old)
	x := old[n-1]
	old[n-1] = nil // drop the reference so the element can be collected
	*h = old[:n-1]
	return x
}

// Peak returns the element with the earliest nextRenewal without removing it,
// or nil when the heap is empty. The returned element is the heap root
// (index 0), i.e. the same element heap.Pop would remove next.
//
// The name is a misspelling of "Peek" that is kept so existing callers compile.
func (h *lockRenewalHeap) Peak() *lockRenewalElement {
	if len(*h) == 0 {
		return nil
	}
	return (*h)[0]
}
// LockManager hands out Redis-backed locks and keeps their leases renewed.
// Its bookkeeping (locks, renewalHeap) is only touched by closures sent over
// ch, which Run receives and executes one at a time.
type LockManager struct {
// ch carries closures that Run executes.
ch chan func()
// client is the Redis client used for SETNX and the Lua scripts.
client redis.UniversalClient
// prefix is prepended to the caller's key to form the Redis key.
prefix string
// locks maps a Redis key to the token value stored for it by this manager.
locks map[string]int64
// renewalHeap holds the pending lease renewals.
renewalHeap lockRenewalHeap
// leaseTTL is the expiry given to newly acquired locks.
leaseTTL time.Duration
}
// NewLockManager builds a LockManager that talks to Redis through
// redisClient. Keys are prefixed with "lock:" and leases last one second.
// The returned manager performs no work until its Run method is started.
func NewLockManager(redisClient redis.UniversalClient) *LockManager {
	// TODO: make ttl, prefix configurable
	manager := new(LockManager)
	manager.ch = make(chan func())
	manager.client = redisClient
	manager.prefix = "lock:"
	manager.locks = make(map[string]int64)
	manager.leaseTTL = time.Second
	return manager
}
// Lock tries to acquire the lock named key and, on success, schedules its
// lease for renewal at half the lease TTL.
//
// The work is executed by the goroutine running Run, so Run must be active;
// while waiting to hand the request over, Lock gives up with ctx.Err() if ctx
// is cancelled. It fails when this manager already holds the key, when Redis
// returns an error, or when the key is already set in Redis.
//
// On success the returned unlock function releases the lock. It reuses ctx,
// so it must be called before ctx is cancelled.
//
// NOTE(review): the lock token is time.Now().UnixNano(); two processes could
// in principle produce the same token — consider a random token.
func (l *LockManager) Lock(ctx context.Context, key string) (unlock func() error, err error) {
	lockKey := l.prefix + key
	lockValue := time.Now().UnixNano()

	// Buffered so the manager goroutine can always deliver the result
	// without blocking.
	resultCh := make(chan error, 1)
	task := func() {
		resultCh <- l.tryAcquire(ctx, lockKey, lockValue)
	}

	select {
	case l.ch <- task:
	case <-ctx.Done():
		return nil, ctx.Err()
	}

	// Once accepted, Run executes the task to completion, so a result always
	// arrives.
	if err = <-resultCh; err != nil {
		return nil, err
	}
	return func() error {
		return l.unlock(ctx, lockKey, lockValue)
	}, nil
}

// tryAcquire performs the acquisition on the manager goroutine: it sets
// lockKey with SETNX and, only if that succeeded, records the lock and
// schedules its first renewal.
func (l *LockManager) tryAcquire(ctx context.Context, lockKey string, lockValue int64) error {
	if _, held := l.locks[lockKey]; held {
		return errors.New("lock already held")
	}
	acquired, err := l.client.SetNX(ctx, lockKey, lockValue, l.leaseTTL).Result()
	if err != nil {
		return err
	}
	if !acquired {
		// SETNX did not set the key: another owner currently holds it.
		return errors.New("lock held by another owner")
	}
	l.locks[lockKey] = lockValue
	heap.Push(&l.renewalHeap, &lockRenewalElement{
		lockKey:     lockKey,
		lockValue:   lockValue,
		ttl:         l.leaseTTL,
		nextRenewal: time.Now().Add(l.leaseTTL / 2),
	})
	return nil
}
// unlock releases the lock stored under key, provided Redis still holds value
// for it. The work is executed by the goroutine running Run; while waiting to
// hand the request over, unlock gives up with ctx.Err() if ctx is cancelled.
//
// It returns an error when this manager has no record of the key or when the
// Redis call fails; in the latter case the lock stays recorded so the release
// can be retried.
func (l *LockManager) unlock(ctx context.Context, key string, value interface{}) error {
	// Buffered so the manager goroutine can always deliver the result
	// without blocking.
	resultCh := make(chan error, 1)
	task := func() {
		resultCh <- l.release(ctx, key, value)
	}

	select {
	case l.ch <- task:
	case <-ctx.Done():
		return ctx.Err()
	}
	return <-resultCh
}

// release performs the release on the manager goroutine: it runs the
// compare-and-delete script and then forgets the lock locally.
func (l *LockManager) release(ctx context.Context, key string, value interface{}) error {
	if _, held := l.locks[key]; !held {
		return errors.New("lock not held")
	}
	if err := l.client.Eval(ctx, unlockLuaScript, []string{key}, value).Err(); err != nil {
		return fmt.Errorf("failed to release lock: %w", err)
	}
	delete(l.locks, key)
	return nil
}
// Run executes the manager loop and must be started in a dedicated goroutine.
// It executes the closures submitted by Lock and unlock one at a time and
// renews the lease of every held lock when its renewal time arrives.
//
// Run returns ctx.Err() once ctx is cancelled, or a wrapped error as soon as
// a renewal call to Redis fails.
func (l *LockManager) Run(ctx context.Context) error {
	// A nil channel never fires, so the timer case stays idle until there is
	// something to schedule.
	var timerCh <-chan time.Time
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case f := <-l.ch:
			f()
			// The set of locks may have changed; re-evaluate the schedule
			// right away.
			timerCh = time.After(0)
		case now := <-timerCh:
			next, err := l.renewDue(ctx, now)
			if err != nil {
				return err
			}
			timerCh = next
		}
	}
}

// renewDue renews every lock whose renewal time is not after now and drops
// heap entries whose lock is gone. It returns a channel that fires when the
// next renewal is due, or a nil channel when nothing is scheduled.
func (l *LockManager) renewDue(ctx context.Context, now time.Time) (<-chan time.Time, error) {
	for l.renewalHeap.Len() > 0 {
		// Index 0 is the heap root: the earliest renewal, and the element
		// heap.Pop removes.
		next := l.renewalHeap[0]
		if next.nextRenewal.After(now) {
			return time.After(time.Until(next.nextRenewal)), nil
		}

		// Drop entries whose lock was released or replaced meanwhile.
		lockKey := next.lockKey
		if lockValue, ok := l.locks[lockKey]; !ok || lockValue != next.lockValue {
			heap.Pop(&l.renewalHeap)
			continue
		}

		result, err := l.client.Eval(ctx, renewLuaScript, []string{lockKey}, next.lockValue, next.ttl.Milliseconds()).Int()
		if err != nil {
			return nil, fmt.Errorf("failed to renew lock %s: %w", lockKey, err)
		}
		if result == 0 {
			// The script did not extend the lease, so stop renewing this entry.
			heap.Pop(&l.renewalHeap)
			continue
		}

		next.nextRenewal = next.nextRenewal.Add(next.ttl / 2)
		// The root's key changed; restore the heap ordering around it.
		heap.Fix(&l.renewalHeap, 0)
	}
	return nil, nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment