Created
February 17, 2022 05:55
-
-
Save Reasno/f4f50f0f9cfa0a03189d5b91f4dcd114 to your computer and use it in GitHub Desktop.
Renewable distributed lock implementation with Redis (untested)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package core_locks | |
import ( | |
"container/heap" | |
"context" | |
"errors" | |
"fmt" | |
"time" | |
"github.com/go-redis/redis/v8" | |
) | |
// unlockLuaScript deletes the lock key only when it still holds the caller's
// value, so a holder can never release a lock that now belongs to someone else.
//
// KEYS[1] is the lock key and ARGV[1] is the value the caller stored when it
// acquired the lock. The script returns the result of DEL when the values
// match and 0 otherwise.
const unlockLuaScript = `
if redis.call("get", KEYS[1]) == ARGV[1] then
    return redis.call("del", KEYS[1])
else
    return 0
end
`
// renewLuaScript extends the expiry of the lock key only when it still holds
// the caller's value.
//
// KEYS[1] is the lock key, ARGV[1] is the value the caller stored when it
// acquired the lock, and ARGV[2] is the new time-to-live in milliseconds
// (it is passed to PEXPIRE). The script returns the result of PEXPIRE when
// the values match and 0 otherwise.
const renewLuaScript = `
if redis.call("get", KEYS[1]) == ARGV[1] then
    return redis.call("pexpire", KEYS[1], ARGV[2])
else
    return 0
end
`
// lockRenewalElement is one scheduled renewal of a lock.
type lockRenewalElement struct {
	lockKey     string        // Redis key of the lock
	lockValue   interface{}   // value stored under lockKey when the lock was taken
	ttl         time.Duration // lease length applied on every renewal
	nextRenewal time.Time     // instant at which the lock is due to be renewed
}

// lockRenewalHeap is a min-heap of renewals ordered by nextRenewal. Its
// methods have the signatures required by container/heap's heap.Interface;
// Push and Pop are meant to be called through that package, not directly.
type lockRenewalHeap []*lockRenewalElement

// Len reports the number of scheduled renewals.
func (l *lockRenewalHeap) Len() int {
	return len(*l)
}

// Less reports whether element i is due strictly before element j.
func (l *lockRenewalHeap) Less(i, j int) bool {
	return (*l)[i].nextRenewal.Before((*l)[j].nextRenewal)
}

// Swap exchanges the elements at positions i and j.
func (l *lockRenewalHeap) Swap(i, j int) {
	(*l)[i], (*l)[j] = (*l)[j], (*l)[i]
}

// Push appends x, which must be a *lockRenewalElement, to the slice.
func (l *lockRenewalHeap) Push(x interface{}) {
	*l = append(*l, x.(*lockRenewalElement))
}

// Pop removes and returns the last element of the slice.
func (l *lockRenewalHeap) Pop() interface{} {
	old := *l
	n := len(old)
	x := old[n-1]
	// Clear the vacated slot so the backing array does not keep the element alive.
	old[n-1] = nil
	*l = old[:n-1]
	return x
}

// Peak returns the element with the earliest nextRenewal without removing it,
// or nil when the heap is empty. In a heap maintained by container/heap the
// minimum is the root at index 0, not the last slice element.
func (l *lockRenewalHeap) Peak() *lockRenewalElement {
	if len(*l) == 0 {
		return nil
	}
	return (*l)[0]
}
type LockManager struct { | |
ch chan func() | |
client redis.UniversalClient | |
prefix string | |
locks map[string]int64 | |
renewalHeap lockRenewalHeap | |
leaseTTL time.Duration | |
} | |
func NewLockManager(redisClient redis.UniversalClient) *LockManager { | |
// TODO: make ttl, prefix configurable | |
return &LockManager{ | |
ch: make(chan func()), | |
client: redisClient, | |
prefix: "lock:", | |
locks: make(map[string]int64), | |
leaseTTL: time.Second, | |
} | |
} | |
func (l *LockManager) Lock(ctx context.Context, key string) (unlock func() error, err error) { | |
resultCh := make(chan error) | |
lockKey := l.prefix + key | |
lockValue := time.Now().UnixNano() | |
l.ch <- func() { | |
if _, ok := l.locks[lockKey]; ok { | |
resultCh <- errors.New("lock already held") | |
} | |
err = l.client.SetNX(ctx, lockKey, lockValue, l.leaseTTL).Err() | |
if err != nil { | |
resultCh <- err | |
} | |
l.locks[lockKey] = lockValue | |
heap.Push(&l.renewalHeap, &lockRenewalElement{ | |
lockKey: lockKey, | |
lockValue: lockValue, | |
ttl: l.leaseTTL, | |
nextRenewal: time.Now().Add(l.leaseTTL / 2), | |
}) | |
resultCh <- nil | |
} | |
err = <-resultCh | |
if err != nil { | |
return nil, err | |
} | |
return func() error { | |
return l.unlock(ctx, lockKey, lockValue) | |
}, nil | |
} | |
func (l *LockManager) unlock(ctx context.Context, key string, value interface{}) error { | |
resultCh := make(chan error) | |
l.ch <- func() { | |
if _, ok := l.locks[key]; !ok { | |
resultCh <- errors.New("lock not held") | |
} | |
err := l.client.Eval(ctx, unlockLuaScript, []string{key}, value).Err() | |
if err != nil { | |
resultCh <- fmt.Errorf("failed to release lock: %w", err) | |
} | |
delete(l.locks, key) | |
resultCh <- nil | |
} | |
return <-resultCh | |
} | |
// Run LockManager in a dedicated goroutine. | |
func (l *LockManager) Run(ctx context.Context) error { | |
timerCh := make(<-chan time.Time) | |
for { | |
select { | |
case <-ctx.Done(): | |
return ctx.Err() | |
case f := <-l.ch: | |
f() | |
timerCh = time.After(0) | |
case now := <-timerCh: | |
for { | |
nextRenewal := l.renewalHeap.Peak() | |
if nextRenewal == nil { | |
break | |
} | |
if nextRenewal.nextRenewal.After(now) { | |
timerCh = time.After(nextRenewal.nextRenewal.Sub(now)) | |
break | |
} | |
// check if lock still exist | |
lockKey := nextRenewal.lockKey | |
if lockValue, ok := l.locks[lockKey]; !ok || lockValue != nextRenewal.lockValue { | |
heap.Pop(&l.renewalHeap) | |
continue | |
} | |
// renew lock | |
result, err := l.client.Eval(ctx, renewLuaScript, []string{lockKey}, nextRenewal.lockValue, nextRenewal.ttl.Milliseconds()).Int() | |
if err != nil { | |
return fmt.Errorf("failed to renew lock %s: %w", lockKey, err) | |
} | |
if result == 0 { | |
heap.Pop(&l.renewalHeap) | |
continue | |
} | |
nextRenewal.nextRenewal = nextRenewal.nextRenewal.Add(nextRenewal.ttl / 2) | |
heap.Fix(&l.renewalHeap, l.renewalHeap.Len()-1) | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment