Skip to content

Instantly share code, notes, and snippets.

@c4pt0r
Created March 19, 2017 11:26
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save c4pt0r/767557745dad2b3defbe7ad84d111986 to your computer and use it in GitHub Desktop.
Save c4pt0r/767557745dad2b3defbe7ad84d111986 to your computer and use it in GitHub Desktop.
simple distributed lock using TiKV
package main
import (
"encoding/json"
"errors"
"flag"
"fmt"
"math/rand"
"time"
"github.com/ngaut/log"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/terror"
"github.com/twinj/uuid"
"net/http"
_ "net/http/pprof"
)
// leaseInterval is the period at which a lock holder rewrites its lock record
// to refresh LastUpdated. TryLock treats a record older than four of these
// intervals as expired.
const leaseInterval = 3 * time.Second
var (
	// pdAddr is the value of the -pd command-line flag; main interpolates it
	// into the tikv:// URL passed to the storage driver.
	pdAddr = flag.String("pd", "localhost:2379", "")

	// ErrAlreadyLocked is returned by TryLock when a different lock instance
	// holds the lock and its record has not yet expired.
	ErrAlreadyLocked = errors.New("already locked")
)
// LockMeta is the record stored, JSON-encoded, under the lock key.
type LockMeta struct {
	// ID identifies the lock instance that wrote the record.
	ID string `json:"uuid"`
	// LastUpdated is the writer's local time at the moment the record was
	// last written.
	LastUpdated time.Time `json:"last_updated"`
}
// DLock is a named lock whose state lives in a kv.Storage. The embedded
// LockMeta is this instance's own record: its ID is compared against the
// stored record to decide ownership.
type DLock struct {
	// name is the lock name; the storage key is derived from it.
	name string
	// s is the storage in which transactions on the lock key are run.
	s kv.Storage
	// unlockSignal carries the release request from Unlock to the keepalive
	// goroutine.
	unlockSignal chan struct{}

	LockMeta
}
// init seeds math/rand's global source from the current Unix time (in
// seconds); Lock draws its backoff jitter from that source.
func init() {
	seed := time.Now().Unix()
	rand.Seed(seed)
}
// NewLock returns a DLock for the lock called name, backed by storage s.
// Each instance gets a freshly generated UUID as its LockMeta.ID and its own
// unbuffered unlock channel. Nothing is written to storage here.
func NewLock(s kv.Storage, name string) *DLock {
	lock := new(DLock)
	lock.name = name
	lock.s = s
	lock.unlockSignal = make(chan struct{})
	lock.LockMeta.ID = uuid.NewV4().String()
	return lock
}
// TryLock makes one attempt to acquire the lock and does not wait.
//
// Inside a single transaction it reads the lock key and then:
//   - writes this instance's record if the key does not exist;
//   - returns ErrAlreadyLocked if the record belongs to another instance and
//     its LastUpdated is less than 4*leaseInterval old, measured against the
//     local clock;
//   - overwrites the record if it belongs to another instance and is older
//     than that;
//   - leaves the record untouched if it already carries this instance's ID.
//
// A record that cannot be decoded is reported as an error rather than being
// treated as an expired lock, so an unknown record is never overwritten.
//
// Every successful call starts a keepalive goroutine, including the case where
// this instance already owned the record. Errors from the transaction are
// returned as-is.
func (l *DLock) TryLock() error {
	err := kv.RunInNewTxn(l.s, false, func(txn kv.Transaction) error {
		val, err := txn.Get(l.getLockKey())
		if kv.IsErrNotFound(err) {
			// No lock record exists yet: claim it.
			l.writeLock(txn)
			return nil
		}
		if err != nil {
			return err
		}

		var meta LockMeta
		if err := json.Unmarshal(val, &meta); err != nil {
			return fmt.Errorf("decoding lock record for %q: %v", l.name, err)
		}

		if meta.ID == l.LockMeta.ID {
			// The stored record is already ours.
			return nil
		}
		if time.Since(meta.LastUpdated) < 4*leaseInterval {
			// Another instance holds the lock and its lease is still fresh.
			return ErrAlreadyLocked
		}
		// The other holder stopped refreshing its lease: take the lock over.
		l.writeLock(txn)
		return nil
	})
	if err == nil {
		go l.keepalive()
	}
	return err
}
// Lock calls TryLock in a loop until it succeeds. When TryLock reports
// ErrAlreadyLocked or an error that kv.IsRetryableError accepts, Lock sleeps
// for a random 100-599ms and tries again; from the eleventh retry onwards it
// also logs a warning on each retry. Any other error is logged and returned.
func (l *DLock) Lock() error {
	attempts := 0
	for {
		err := l.TryLock()
		if err == nil {
			return nil
		}
		if !terror.ErrorEqual(err, ErrAlreadyLocked) && !kv.IsRetryableError(err) {
			log.Error(err)
			return err
		}

		// Back off for at least 100ms plus random jitter before retrying.
		backoff := time.Duration(100+rand.Intn(500)) * time.Millisecond
		time.Sleep(backoff)

		attempts++
		if attempts > 10 {
			log.Warn("retry to fetch lock...", l.name)
		}
	}
}
// Unlock asks the keepalive goroutine to release the lock by sending on
// unlockSignal. The channel is unbuffered, so the call blocks until a
// keepalive goroutine receives the signal; it does not wait for the lock key
// to be deleted.
func (l *DLock) Unlock() {
	var signal struct{}
	l.unlockSignal <- signal
}
// getLockKey returns the storage key for this lock: the lock name followed by
// the suffix "-LOCK".
func (l *DLock) getLockKey() []byte {
	return []byte(l.name + "-LOCK")
}
// writeLock stamps this instance's LockMeta with the current local time and
// stores its JSON encoding under the lock key within txn.
//
// NOTE: the errors from json.Marshal and txn.Set are discarded; the method has
// no way to report them to its callers.
func (l *DLock) writeLock(txn kv.Transaction) {
	now := time.Now()
	l.LockMeta.LastUpdated = now
	encoded, _ := json.Marshal(l.LockMeta)
	_ = txn.Set(l.getLockKey(), encoded)
}
// getLockMeta reads the lock key within txn and decodes the stored record.
//
// It returns the error from txn.Get unchanged (including the not-found case)
// and a descriptive error if the stored value is not a valid JSON LockMeta.
// The returned pointer is nil whenever the error is non-nil.
func (l *DLock) getLockMeta(txn kv.Transaction) (*LockMeta, error) {
	val, err := txn.Get(l.getLockKey())
	if err != nil {
		return nil, err
	}
	meta := new(LockMeta)
	if err := json.Unmarshal(val, meta); err != nil {
		// Do not hand back a zero-value record as if it were valid: callers
		// compare meta.ID to decide who owns the lock.
		return nil, fmt.Errorf("decoding lock record for %q: %v", l.name, err)
	}
	return meta, nil
}
// keepalive refreshes the lock's lease every leaseInterval until Unlock sends
// on unlockSignal, then deletes the lock key and returns the result of that
// final transaction.
//
// A failed lease refresh is logged and the loop keeps running. Unlock sends on
// an unbuffered channel that only this goroutine receives from, so returning
// early would leave a later Unlock call blocked forever.
func (l *DLock) keepalive() error {
	ticker := time.NewTicker(leaseInterval)
	// Release the ticker on every exit path.
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			if err := l.refreshLease(); err != nil {
				log.Error(err)
			}
		case <-l.unlockSignal:
			return l.releaseLock()
		}
	}
}

// refreshLease rewrites the lock record in a new transaction if it still
// carries this instance's ID; if another ID is found it reports that through
// log.Fatal.
func (l *DLock) refreshLease() error {
	return kv.RunInNewTxn(l.s, false, func(txn kv.Transaction) error {
		meta, err := l.getLockMeta(txn)
		if err != nil {
			return err
		}
		if meta.ID == l.ID {
			l.writeLock(txn)
		} else {
			log.Fatal(l.ID, meta, "someone else took the lock")
		}
		return nil
	})
}

// releaseLock deletes the lock key in a new transaction, but only when the
// stored record carries this instance's ID.
func (l *DLock) releaseLock() error {
	return kv.RunInNewTxn(l.s, false, func(txn kv.Transaction) error {
		meta, err := l.getLockMeta(txn)
		if err != nil {
			return err
		}
		// We may only delete our own lock.
		if meta.ID == l.LockMeta.ID {
			return txn.Delete(l.getLockKey())
		}
		return nil
	})
}
// main opens a TiKV store through the PD address given by -pd, serves the
// default HTTP mux (which has net/http/pprof registered) on localhost:6060,
// and starts five goroutines that contend for the same lock named "lock".
// Each goroutine holds the lock for 15 seconds, releases it and tries again.
// main itself never returns.
func main() {
	flag.Parse()

	var driver tikv.Driver
	dsn := fmt.Sprintf("tikv://%s?cluster=1", *pdAddr)
	store, err := driver.Open(dsn)
	if err != nil {
		log.Fatal(err)
	}

	go func() {
		log.Info(http.ListenAndServe("localhost:6060", nil))
	}()

	// worker repeatedly acquires, holds and releases the shared lock using
	// its own DLock instance.
	worker := func() {
		l := NewLock(store, "lock")
		for {
			if err := l.Lock(); err != nil {
				continue
			}
			log.Info(l.ID, "got the lock")
			time.Sleep(15 * time.Second)
			log.Info(l.ID, "is going to unlock!")
			l.Unlock()
		}
	}
	for n := 0; n < 5; n++ {
		go worker()
	}

	// Sending on a nil channel blocks forever, keeping the process alive.
	var forever chan int
	forever <- 1
}
@niamster
Copy link

Please note that this lock is not correct, as it relies on local time (for LastUpdated). I think it's impossible to achieve a correct distributed lock without leases and fencing tokens?

@itfanr
Copy link

itfanr commented Jan 17, 2023

github/juicefs/pkg/meta/tkv_lock.go

Why does tkv_lock not need to update the lease?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment