Skip to content

Instantly share code, notes, and snippets.

@c4pt0r
Last active June 7, 2022 01:31
Show Gist options
  • Save c4pt0r/ee9c4165a7a36b9a48a2dcf86d637bda to your computer and use it in GitHub Desktop.
Save c4pt0r/ee9c4165a7a36b9a48a2dcf86d637bda to your computer and use it in GitHub Desktop.
bigetc.go
package bigetc
import (
"database/sql"
"time"
"github.com/c4pt0r/log"
_ "github.com/go-sql-driver/mysql"
)
// BigEtc, a PoC implementation of Etcd's important APIs: Watch, Get, Set
// The core idea is:
// 1. TiDB is a scalable database with **SQL** semantics.
// 2. TiDB supports secondary indexes. very fast lookup.
// 3. TiDB supports transactions, with pessimistic concurrency control
// 4. TiDB's pessimistic concurrency control is based on MVCC and pessimistic lock is in-memory. (https://docs.pingcap.com/tidb/dev/pessimistic-transaction#in-memory-pessimistic-lock)
// 5. TiDB's lock is row-level, totally scalable.
// 6. TiDB uses multi-raft architecture to achieve strong consistency and auto-failover (https://tikv.org/blog/building-distributed-storage-system-on-raft/)
type BigEtc struct {
dsn string
db *sql.DB
watchers map[string]chan string
versions map[string]int64
}
func New(dsn string) *BigEtc {
return &BigEtc{
dsn: dsn,
watchers: make(map[string]chan string),
versions: make(map[string]int64),
}
}
func (b *BigEtc) Open() error {
var err error
b.db, err = sql.Open("mysql", b.dsn)
if err != nil {
return err
}
return b.createTables()
}
func (b *BigEtc) createTables() error {
_, err := b.db.Exec(`
CREATE TABLE IF NOT EXISTS _bigetc_store (
k VARCHAR(255) NOT NULL,
v VARCHAR(255) NOT NULL,
version BIGINT NOT NULL DEFAULT 0,
PRIMARY KEY (k)
)
`)
if err != nil {
return err
}
return nil
}
func (b *BigEtc) Close() error {
return b.db.Close()
}
func (b *BigEtc) Get(key string) (string, bool, error) {
var value string
err := b.db.QueryRow(`
SELECT
v
FROM
_bigetc_store
WHERE
k = ?
ORDER BY version DESC
LIMIT 1
`, key).Scan(&value)
if err != nil {
if err == sql.ErrNoRows {
return "", false, nil
}
return "", false, err
}
return value, true, nil
}
func (b *BigEtc) Set(key string, value string) error {
txn, err := b.db.Begin()
if err != nil {
return err
}
defer txn.Rollback()
_, err = txn.Exec(`
SELECT
k
FROM
_bigetc_store
WHERE k = ?
FOR UPDATE
`, key)
if err != nil {
return err
}
// if using INSERT here instead of UPSERT, we can keep change history feed
_, err = txn.Exec(`
INSERT INTO
_bigetc_store (k, v, version)
VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE
v = VALUES(v),
version = version + 1
`, key, value, 0)
if err != nil {
return err
}
return txn.Commit()
}
func (b *BigEtc) getMaxVersion(key string) (int64, error) {
var version int64
err := b.db.QueryRow(`
SELECT
MAX(version)
FROM
_bigetc_store
WHERE k = ?
`, key).Scan(&version)
if err != nil {
return 0, err
}
return version, nil
}
func (b *BigEtc) Watch(key string) <-chan string {
ch := make(chan string)
go func() {
for {
var err error
// get local version
version, ok := b.versions[key]
if !ok {
version, err = b.getMaxVersion(key)
if err != nil {
if err == sql.ErrNoRows {
b.Set(key, "")
} else {
log.Error(err)
}
}
b.versions[key] = version
}
// get remote version
remoteVersion, err := b.getMaxVersion(key)
if err != nil {
log.Error(err)
continue
}
// if remote version is greater than local version, get value
if remoteVersion > version {
value, _, err := b.Get(key)
if err != nil {
log.Error(err)
continue
}
ch <- value
b.versions[key] = remoteVersion
} else {
// if remote version is less than or equal to local version, sleep
time.Sleep(time.Millisecond * 100)
}
}
}()
return ch
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment