Created
March 19, 2017 11:26
-
-
Save c4pt0r/767557745dad2b3defbe7ad84d111986 to your computer and use it in GitHub Desktop.
simple distributed lock using TiKV
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"encoding/json" | |
"errors" | |
"flag" | |
"fmt" | |
"math/rand" | |
"time" | |
"github.com/ngaut/log" | |
"github.com/pingcap/tidb/kv" | |
"github.com/pingcap/tidb/store/tikv" | |
"github.com/pingcap/tidb/terror" | |
"github.com/twinj/uuid" | |
"net/http" | |
_ "net/http/pprof" | |
) | |
// leaseInterval is the period of the keepalive ticker that refreshes the
// lock record. TryLock treats a foreign lock as expired once its last update
// is at least four of these intervals old.
const leaseInterval = 3 * time.Second

var (
	// pdAddr is the value of the -pd flag; main interpolates it into the
	// tikv:// URL used to open the store.
	pdAddr = flag.String("pd", "localhost:2379", "")

	// ErrAlreadyLocked is returned by TryLock when another holder owns a
	// lock record that has not yet expired.
	ErrAlreadyLocked = errors.New("already locked")
)
// LockMeta is the record stored under the lock key, encoded as JSON.
type LockMeta struct {
	// ID identifies the holder; it is serialized under the "uuid" key.
	ID string `json:"uuid"`

	// LastUpdated is the holder's local wall-clock time at the most recent
	// write of the record.
	LastUpdated time.Time `json:"last_updated"`
}
type DLock struct { | |
name string | |
s kv.Storage | |
unlockSignal chan struct{} | |
LockMeta | |
} | |
// init seeds math/rand, which Lock uses to randomize its retry backoff.
//
// The seed has nanosecond resolution: with whole seconds, contenders started
// within the same second would draw identical backoff sequences and keep
// retrying in lock-step.
func init() {
	rand.Seed(time.Now().UnixNano())
}
func NewLock(s kv.Storage, name string) *DLock { | |
return &DLock{ | |
name: name, | |
s: s, | |
unlockSignal: make(chan struct{}), | |
LockMeta: LockMeta{ | |
ID: uuid.NewV4().String(), | |
}, | |
} | |
} | |
func (l *DLock) TryLock() error { | |
err := kv.RunInNewTxn(l.s, false, func(txn kv.Transaction) error { | |
lockKey := fmt.Sprintf("%s-LOCK", l.name) | |
val, err := txn.Get([]byte(lockKey)) | |
if err != nil && !kv.IsErrNotFound(err) { | |
return err | |
} else if kv.IsErrNotFound(err) { | |
// if no such lock key, try to lock | |
l.writeLock(txn) | |
return nil | |
} | |
// if there's a lock already, check if it's us | |
var meta LockMeta | |
json.Unmarshal(val, &meta) | |
// if another one has the lock, and not timeout yet | |
if meta.ID != l.LockMeta.ID && | |
time.Since(meta.LastUpdated) < 4*leaseInterval { | |
return ErrAlreadyLocked | |
} else if meta.ID != l.LockMeta.ID { | |
l.writeLock(txn) | |
} | |
return nil | |
}) | |
if err == nil { | |
go l.keepalive() | |
} | |
return err | |
} | |
func (l *DLock) Lock() error { | |
try := 0 | |
for { | |
if err := l.TryLock(); err == nil { | |
return nil | |
} else if terror.ErrorEqual(err, ErrAlreadyLocked) || kv.IsRetryableError(err) { | |
// backoff, take a rest, at least 100ms | |
sleepInterval := 100 + rand.Intn(500) | |
time.Sleep(time.Duration(sleepInterval) * time.Millisecond) | |
try++ | |
if try > 10 { | |
log.Warn("retry to fetch lock...", l.name) | |
} | |
} else { | |
log.Error(err) | |
return err | |
} | |
} | |
} | |
func (l *DLock) Unlock() { | |
l.unlockSignal <- struct{}{} | |
} | |
func (l *DLock) getLockKey() []byte { | |
lockKey := fmt.Sprintf("%s-LOCK", l.name) | |
return []byte(lockKey) | |
} | |
func (l *DLock) writeLock(txn kv.Transaction) { | |
l.LockMeta.LastUpdated = time.Now() | |
meta, _ := json.Marshal(l.LockMeta) | |
txn.Set(l.getLockKey(), meta) | |
} | |
func (l *DLock) getLockMeta(txn kv.Transaction) (*LockMeta, error) { | |
val, err := txn.Get(l.getLockKey()) | |
if err != nil { | |
return nil, err | |
} | |
var meta LockMeta | |
json.Unmarshal(val, &meta) | |
return &meta, nil | |
} | |
func (l *DLock) keepalive() error { | |
ticker := time.NewTicker(leaseInterval) | |
for { | |
select { | |
case <-ticker.C: | |
{ | |
// update lease | |
err := kv.RunInNewTxn(l.s, false, func(txn kv.Transaction) error { | |
meta, err := l.getLockMeta(txn) | |
if err != nil { | |
return err | |
} | |
if meta.ID == l.ID { | |
l.writeLock(txn) | |
} else { | |
log.Fatal(l.ID, meta, "someone else took the lock") | |
} | |
return nil | |
}) | |
if err != nil { | |
return err | |
} | |
} | |
case <-l.unlockSignal: | |
{ | |
// unlock, delete lock key | |
ticker.Stop() | |
err := kv.RunInNewTxn(l.s, false, func(txn kv.Transaction) error { | |
meta, err := l.getLockMeta(txn) | |
if err != nil { | |
return err | |
} | |
// we can only unlock our lock | |
if meta.ID == l.LockMeta.ID { | |
txn.Delete(l.getLockKey()) | |
} | |
return nil | |
}) | |
return err | |
} | |
} | |
} | |
} | |
func main() { | |
flag.Parse() | |
driver := tikv.Driver{} | |
store, err := driver.Open(fmt.Sprintf("tikv://%s?cluster=1", *pdAddr)) | |
if err != nil { | |
log.Fatal(err) | |
} | |
go func() { | |
log.Info(http.ListenAndServe("localhost:6060", nil)) | |
}() | |
for i := 0; i < 5; i++ { | |
go func() { | |
l := NewLock(store, "lock") | |
for { | |
if err := l.Lock(); err == nil { | |
log.Info(l.ID, "got the lock") | |
time.Sleep(15 * time.Second) | |
log.Info(l.ID, "is going to unlock!") | |
l.Unlock() | |
} | |
} | |
}() | |
} | |
var ch chan int | |
ch <- 1 | |
} |
github/juicefs/pkg/meta/tkv_lock.go
Why does tkv_lock not need to update its lease?
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Please note that this lock is not correct, as it relies on local time (for `LastUpdated`). I think it's impossible to achieve a correct distributed lock without leases and fencing tokens?