Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save tiancaiamao/705e3ef7d4bcb3c62ced44478e69cd19 to your computer and use it in GitHub Desktop.
Save tiancaiamao/705e3ef7d4bcb3c62ced44478e69cd19 to your computer and use it in GitHub Desktop.
consistent_cache.go
package main
import (
"time"
)
func main() {
}
// The essential of the code piece is this interface.
// It's a read / write lock protocol:
// LockForRead() lock the data, so the client is safe to cache the data.
// RenewLease() can extend the lease of the read lock, the cliet will do that periodically.
//
// If the caller want to write data, it should lock the data first, thus there is a
// LockForWrite() method. Even the data is successful locked, it should wait until the
// read lock lease is gone. This write lock protect the data from been read locked.
// WriteAndUnlock(), as the name says, would write the data together with removing the write
// lock.
//
// CleanOrphanLock() is used to handle exceptions, such as an operation hold the write lock and crash.
type StateRemote interface {
LockForRead(tid uint64, lease uint64) bool
RenewLease(tid uint64, now uint64, lease uint64) bool
LockForWrite(tid uint64, lease uint64) uint64
WriteAndUnlock(tid uint64, now uint64, txn Transaction) bool
CleanOrphanLock(tid uint64, now uint64)
}
// Read first try to read from the cache, if the read lock lease is there.
// If the lease TTL is coming, it will try to renew the lease asynchronously.
// If the cache is not available, the operation read from the remote, and
// try to refill the cache asynchronously.
func Read(local Local, remote Remote, tid uint64) []byte {
startTS := uint64(time.Now().Unix())
lease := local.GetLease(tid)
if leaseValid(lease, startTS) {
ret := local.ReadFromCache()
if leaseShouldRenew(lease, startTS) {
ok := remote.RenewLease(tid, startTS, lease + 3*time.Second)
if ok {
} else {
}
}
return
}
local.notifyUpdate(tid, startTS)
return remote.ReadFromDB()
}
// Write put the write lock on the data, then wait the read lock lease to end.
// After that write is safe to update the data, together with cleaning the lock.
func Write(local Local, remote Remote, tid uint64, value []byte) {
lease := remote.LockForWrite()
sleep(lease)
WriteAndUnlock(txn)
}
type Local interface {
GetLease(tid uint64) uint64
ReadFromCache() []byte
notifyUpdate()
}
type notificationMSG struct {
tid uint64
ts uint64
}
func notifyUpdate(tid, startTS uint64) {
select {
case notifyUpdateCh <- notificationMSG{tid, now}:
default:
}
}
func updateCacheWorkerLoop() {
for msg := range notifyUpdateCh {
updateCache(msg.tid, msg.ts)
}
}
func updateCache(tid uint64, ts uint64) {
lease = ts + 3*time.Second
cacheMeta := LockForRead(tid, lease)
if cacheMeta == Nil {
CleanOrphanLock()
}
}
type Transaction interface {
}
type lockType int
const (
lockTypeNone = iota
lockTypeRead
lockTypeWrite
)
type mapStoreItem struct {
lease uint64
lock lockType
}
// mapStore implements the StateRemote interface.
var _ StateRemote = mapStore{}
type mapStore map[uint64]*mapStoreItem
func (ms mapStore) LockForRead(tid uint64, lease uint64) bool {
item := ms[tid]
if item.lock == lockTypeWrite {
return false
}
item.lock = lockTypeRead
if item.lease < lease {
item.lease = lease
}
return true
}
func (ms mapStore) RenewLease(tid uint64, now, lease uint64) bool {
item := ms[tid]
if item.lock == lockTypeRead && item.lease > now {
item.lease = lease
return true
}
return false
}
func (ms mapStore) LockForWrite(tid uint64, lease uint64) uint64 {
item := ms[tid]
var wait uint64
if item.lock == lockTypeRead {
wait = item.lease
}
item.lock = lockTypeWrite
item.lease = lease
return wait
}
func (ms mapStore) WriteAndUnlock(tid uint64, now uint64, txn Transaction) bool {
item := ms[tid]
if item.lock == lockTypeWrite && item.lease > now {
item.lock = lockTypeNone
return true
}
return false
}
func (ms mapStore) CleanOrphanLock(tid uint64, now uint64) {
item := ms[tid]
if now > item.lease {
item.lock = lockTypeNone
}
}
type sqlExec interface {
Exec(string, ...interface{})
}
// tableStore implements the RemoteStore interface.
// The data is stored in a table:
// create table meta (
// tid int primary key,
// lock_type enum('none', 'read', 'write'),
// lease timestamp
// );
type tableStore struct {
sqlExec
}
func (ts tableStore) LockForRead(tid uint64, lease uint64) {
// begin;
// update meta set tid = %?, lock = 'READ', lease = now() + XX where lock != 'WRITE';
// select * from t;
// commit;
ts.Exec("update meta set tid = %?, lock = 'READ', lease = %? where lock != 'WRITE'", tid)
}
func (ts tableStore) RenewLease(tid uint64, lease uint64) {
ts.Exec("update meta set lease = %? where tid = %? and lock = 'READ'", lease, tid)
}
func (ts tableStore) LockForWrite(tid uint64) {
ts.Exec("update meta set lock = 'WRITE', lease = '' where tid = TID")
}
func (ts tableStore) WriteAndUnlock() {
// Write data and cleanup the write lock should be in a transaction
// begin
// update user data...
// update meta set lock = 'NONE' and lease = now() + 'xxx';
// commit
}
func (ts tableStore) CleanOrphanLock() {
ts.Exec("update meta set lock = 'NONE' where tid = TID and lock = 'WRITE' and lease > TSO")
}
type Remote interface {
StateRemote
Read(tid uint64)
Write(tid uint64, value []byte)
}
func leaseValid(lease, startTS uint64) bool {
return lease > startTS
}
func leaseShouldRenew(lease, startTS uint64) bool {
return startTs + 3*time.Second > lease
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment