Created
March 9, 2021 04:28
-
-
Save tiancaiamao/705e3ef7d4bcb3c62ced44478e69cd19 to your computer and use it in GitHub Desktop.
consistent_cache.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"time" | |
) | |
// main is intentionally empty: this file is a design sketch and has nothing
// to run.
func main() {}
// The essential of the code piece is this interface. | |
// It's a read / write lock protocol: | |
// LockForRead() lock the data, so the client is safe to cache the data. | |
// RenewLease() can extend the lease of the read lock, the cliet will do that periodically. | |
// | |
// If the caller want to write data, it should lock the data first, thus there is a | |
// LockForWrite() method. Even the data is successful locked, it should wait until the | |
// read lock lease is gone. This write lock protect the data from been read locked. | |
// WriteAndUnlock(), as the name says, would write the data together with removing the write | |
// lock. | |
// | |
// CleanOrphanLock() is used to handle exceptions, such as an operation hold the write lock and crash. | |
type StateRemote interface { | |
LockForRead(tid uint64, lease uint64) bool | |
RenewLease(tid uint64, now uint64, lease uint64) bool | |
LockForWrite(tid uint64, lease uint64) uint64 | |
WriteAndUnlock(tid uint64, now uint64, txn Transaction) bool | |
CleanOrphanLock(tid uint64, now uint64) | |
} | |
// Read first try to read from the cache, if the read lock lease is there. | |
// If the lease TTL is coming, it will try to renew the lease asynchronously. | |
// If the cache is not available, the operation read from the remote, and | |
// try to refill the cache asynchronously. | |
func Read(local Local, remote Remote, tid uint64) []byte { | |
startTS := uint64(time.Now().Unix()) | |
lease := local.GetLease(tid) | |
if leaseValid(lease, startTS) { | |
ret := local.ReadFromCache() | |
if leaseShouldRenew(lease, startTS) { | |
ok := remote.RenewLease(tid, startTS, lease + 3*time.Second) | |
if ok { | |
} else { | |
} | |
} | |
return | |
} | |
local.notifyUpdate(tid, startTS) | |
return remote.ReadFromDB() | |
} | |
// Write put the write lock on the data, then wait the read lock lease to end. | |
// After that write is safe to update the data, together with cleaning the lock. | |
func Write(local Local, remote Remote, tid uint64, value []byte) { | |
lease := remote.LockForWrite() | |
sleep(lease) | |
WriteAndUnlock(txn) | |
} | |
// Local is the client-side view used by Read: the locally known lease, the
// cached data, and a hook for requesting a cache update.
type Local interface {
	// GetLease returns the locally known lease for tid.
	GetLease(tid uint64) uint64
	// ReadFromCache returns the cached data.
	ReadFromCache() []byte
	// notifyUpdate requests a cache update for tid as of startTS. The
	// parameter list matches how Read invokes it.
	notifyUpdate(tid, startTS uint64)
}
// notificationMSG is the message carried on the cache-update channel.
// Field order matters: the message is built with an unkeyed literal.
type notificationMSG struct {
	tid, ts uint64 // tid names the item; ts is the timestamp sent with it
}
func notifyUpdate(tid, startTS uint64) { | |
select { | |
case notifyUpdateCh <- notificationMSG{tid, now}: | |
default: | |
} | |
} | |
func updateCacheWorkerLoop() { | |
for msg := range notifyUpdateCh { | |
updateCache(msg.tid, msg.ts) | |
} | |
} | |
func updateCache(tid uint64, ts uint64) { | |
lease = ts + 3*time.Second | |
cacheMeta := LockForRead(tid, lease) | |
if cacheMeta == Nil { | |
CleanOrphanLock() | |
} | |
} | |
// Transaction is the payload handed to WriteAndUnlock. It declares no
// methods, so any value satisfies it.
type Transaction interface{}
// lockType is the kind of lock currently recorded for an item.
type lockType int

// The constants are declared with an explicit lockType so they cannot be
// mixed up with unrelated integers.
const (
	lockTypeNone  lockType = iota // no lock held
	lockTypeRead                  // read lock
	lockTypeWrite                 // write lock
)
type mapStoreItem struct { | |
lease uint64 | |
lock lockType | |
} | |
// mapStore implements the StateRemote interface. | |
var _ StateRemote = mapStore{} | |
type mapStore map[uint64]*mapStoreItem | |
func (ms mapStore) LockForRead(tid uint64, lease uint64) bool { | |
item := ms[tid] | |
if item.lock == lockTypeWrite { | |
return false | |
} | |
item.lock = lockTypeRead | |
if item.lease < lease { | |
item.lease = lease | |
} | |
return true | |
} | |
func (ms mapStore) RenewLease(tid uint64, now, lease uint64) bool { | |
item := ms[tid] | |
if item.lock == lockTypeRead && item.lease > now { | |
item.lease = lease | |
return true | |
} | |
return false | |
} | |
func (ms mapStore) LockForWrite(tid uint64, lease uint64) uint64 { | |
item := ms[tid] | |
var wait uint64 | |
if item.lock == lockTypeRead { | |
wait = item.lease | |
} | |
item.lock = lockTypeWrite | |
item.lease = lease | |
return wait | |
} | |
func (ms mapStore) WriteAndUnlock(tid uint64, now uint64, txn Transaction) bool { | |
item := ms[tid] | |
if item.lock == lockTypeWrite && item.lease > now { | |
item.lock = lockTypeNone | |
return true | |
} | |
return false | |
} | |
func (ms mapStore) CleanOrphanLock(tid uint64, now uint64) { | |
item := ms[tid] | |
if now > item.lease { | |
item.lock = lockTypeNone | |
} | |
} | |
// sqlExec is the minimal SQL execution hook used by tableStore.
type sqlExec interface {
	// Exec runs query with args bound to its placeholders. It returns
	// nothing, so callers cannot observe failures or affected rows.
	Exec(query string, args ...interface{})
}
// tableStore implements the RemoteStore interface. | |
// The data is stored in a table: | |
// create table meta ( | |
// tid int primary key, | |
// lock_type enum('none', 'read', 'write'), | |
// lease timestamp | |
// ); | |
type tableStore struct { | |
sqlExec | |
} | |
func (ts tableStore) LockForRead(tid uint64, lease uint64) { | |
// begin; | |
// update meta set tid = %?, lock = 'READ', lease = now() + XX where lock != 'WRITE'; | |
// select * from t; | |
// commit; | |
ts.Exec("update meta set tid = %?, lock = 'READ', lease = %? where lock != 'WRITE'", tid) | |
} | |
func (ts tableStore) RenewLease(tid uint64, lease uint64) { | |
ts.Exec("update meta set lease = %? where tid = %? and lock = 'READ'", lease, tid) | |
} | |
func (ts tableStore) LockForWrite(tid uint64) { | |
ts.Exec("update meta set lock = 'WRITE', lease = '' where tid = TID") | |
} | |
func (ts tableStore) WriteAndUnlock() { | |
// Write data and cleanup the write lock should be in a transaction | |
// begin | |
// update user data... | |
// update meta set lock = 'NONE' and lease = now() + 'xxx'; | |
// commit | |
} | |
func (ts tableStore) CleanOrphanLock() { | |
ts.Exec("update meta set lock = 'NONE' where tid = TID and lock = 'WRITE' and lease > TSO") | |
} | |
type Remote interface { | |
StateRemote | |
Read(tid uint64) | |
Write(tid uint64, value []byte) | |
} | |
// leaseValid reports whether lease is still in force at startTS, i.e. whether
// it ends strictly after startTS.
func leaseValid(lease, startTS uint64) bool {
	return startTS < lease
}
// leaseShouldRenew reports whether lease will run out within the renewal
// window that starts at startTS, in which case it is time to renew it.
//
// lease and startTS are whole-second timestamps (Read derives startTS from
// time.Now().Unix()), so the window is a plain number of seconds rather than
// a time.Duration.
func leaseShouldRenew(lease, startTS uint64) bool {
	const renewWindowSeconds uint64 = 3
	return startTS+renewWindowSeconds > lease
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment