仓库地址:
基于TiDB适配现有项目KubeBrain,实现K8s所使用的元信息存储API。
当前 K8s 仅支持 etcd 作为元信息存储系统,因此部署 Kubernetes 集群时总需要额外维护一套 etcd 集群;而 etcd 集群并不支持水平扩容,随着数据量和读写请求量的增长,很容易达到瓶颈。我们实现了轻量级 K8s 元信息存储项目 KubeBrain,对存储引擎 API 进行了抽象,希望通过适配不同的分布式存储系统,充分利用已有的存储系统作为基础设施来部署 K8s 集群,支持一定程度的元信息存储水平扩容,打通 K8s 与其他存储系统之间的壁垒。
-- object stores both kinds of internal keys: one revision key per raw key
-- (the latest revision, also used as a CAS lock) and one or more object keys
-- (historical versions of the raw key with their values).
--
-- internal_key holds binary-encoded keys (magic bytes \x57\xfb\x80\x8b + raw key
-- + '$' + BigEndian-encoded revision). It must therefore be a binary type:
-- VARBINARY compares byte-by-byte, so range scans follow encoded-key order,
-- whereas VARCHAR applies a character collation and may reject bytes that are
-- not valid in the column's charset.
CREATE TABLE object (
    internal_key VARBINARY(512) PRIMARY KEY,
    -- BLOB is capped at 65,535 bytes; MEDIUMBLOB (up to 16 MiB) leaves room for
    -- large Kubernetes objects.
    object_value MEDIUMBLOB
);
对于 API Server 读写请求参数中的 Raw Key,会将其编码为两类 Internal Key,分别作为索引和数据写入存储引擎。

- **索引记录 Revision Key**:API Server 写入的每个 Raw Key 只有一条,记录该 Raw Key 当前的最新版本号。Revision Key 同时也是一把锁,每次对 Raw Key 的更新操作都需要对该索引进行 CAS。
- **数据记录 Object Key**:每个 Raw Key 有一到多条,每条记录 Raw Key 的一个历史版本及该版本对应的 Value。

Object Key 的编码方式为 `magic + raw_key + split_key + revision`,其中:
- `magic` 为 `\x57\xfb\x80\x8b`;
- `raw_key` 为 API Server 实际写入存储系统的 Key;
- `split_key` 为 `$`;
- `revision` 为逻辑时钟分配给写操作的逻辑操作序号,以 BigEndian 编码为 Bytes。
特别地,Revision Key 与 Object Key 的编码方式相同,只是其中的 revision 固定为 0。
将 K8s 发往 etcd 的读写请求转换为对 TiDB 的读写事务操作,从而实现存储引擎的 API:
// KvStorage defines the storage engine on top of a KV database.
type KvStorage interface {
	// GetPartitions returns the partitions that the keys between start and end
	// are spread over.
	// If partitioning is not supported, it should return a single Partition
	// spanning start to end and a nil error.
	// NOTE(review): an earlier comment described the fallback as
	// `[][]byte{start, end}, 0, nil`, which does not match this two-value
	// signature — confirm the intended fallback against implementations.
	GetPartitions(ctx context.Context, start, end []byte) (partitions []Partition, err error)
	// Get returns the value indexed by key.
	// If the key does not exist, it returns ErrKeyNotFound.
	Get(ctx context.Context, key []byte) (val []byte, err error)
	// Iter returns an iterator over the keys from start to end
	// (end will be smaller than start if SupportIterForward is true).
	// NOTE(review): presumably timestamp selects the snapshot to read and limit
	// caps the number of entries returned — confirm.
	Iter(ctx context.Context, start []byte, end []byte, timestamp uint64, limit uint64) (Iter, error)
	// BeginBatchWrite returns a new BatchWrite instance.
	BeginBatchWrite() BatchWrite
	// Writer adds the single-key delete methods (Del, DelCurrent).
	Writer
	// FeatureSupport is declared outside this snippet; presumably it reports
	// optional engine capabilities such as SupportIterForward — confirm.
	FeatureSupport
	// Close closes the kv storage.
	Close() error
}
// Writer groups the single-key mutation methods of a storage engine.
//
// Its purpose is to let a write that touches only one key avoid a multi-row
// transaction where possible. An implementation may simply wrap BatchWrite.
type Writer interface {
	// DelCurrent deletes the kv that iter currently refers to.
	// Implementations should make the delete conditional: delete-if-value-equal
	// or delete-if-version-equal.
	DelCurrent(ctx context.Context, iter Iter) error
	// Del deletes the kv indexed by key from the kv storage.
	Del(ctx context.Context, key []byte) error
}
// BatchWrite packs several operations together so that they can be committed
// atomically.
type BatchWrite interface {
	// Put writes the kv into the storage unconditionally.
	Put(key, val []byte, ttl int64)
	// PutIfNotExist creates the kv only when key does not exist yet.
	// If key already exists, Commit returns ErrCASFailed.
	// * Where the implementation can, Commit returns a Conflict instead of
	//   ErrCASFailed so that the unexpected existing kv is passed along.
	PutIfNotExist(key, val []byte, ttl int64)
	// CAS replaces the value indexed by key with newVal, provided that the
	// current value compares equal to oldVal.
	// TODO: deprecate it once the format and procedure of writing data is refactored.
	// If the comparison fails, Commit returns ErrCASFailed.
	// * Where the implementation can, Commit returns a Conflict instead of
	//   ErrCASFailed so that the unexpected kv is passed along.
	CAS(key, newVal, oldVal []byte, ttl int64)
	// Del removes the kv indexed by key from the storage.
	Del(key []byte)
	// DelCurrent removes the kv that it currently refers to. It is an admittedly
	// awkward design whose job is to unify CAS-style deletes across the different
	// storage implementations.
	DelCurrent(it Iter)
	// Commit applies the whole batch atomically and returns the first error in it.
	// * It must return ErrUncertainResult when the client cannot know whether
	//   the data was written.
	Commit(ctx context.Context) error
}
// Iter is an iterator over a **snapshot** of the kv storage, backed by a batch
// buffer.
//
// Example:
//
//	iter, err := kvStorage.Iter(ctx, start, end, timestamp, limit)
//	if err != nil {
//		return err
//	}
//	defer iter.Close()
//	iterCtx, cancel := context.WithTimeout(ctx, timeout)
//	defer cancel()
//	for {
//		err := iter.Next(iterCtx)
//		if errors.Is(err, io.EOF) {
//			break // reached the end
//		}
//		if err != nil {
//			return err
//		}
//		key, value := iter.Key(), iter.Val()
//		// processing ...
//	}
type Iter interface {
	// Key returns the key at the iterator's current position in the buffer.
	Key() []byte
	// Val returns the value at the iterator's current position in the buffer.
	Val() []byte
	// Next fetches the next entry from the kv storage.
	// It returns io.EOF when there are no more keys.
	Next(ctx context.Context) (err error)
	// Close closes the iter.
	Close() error
}
写操作SQL
- Create类操作
-- Create: the revision key doubles as a lock, so creation succeeds only if the
-- revision key does not exist yet.
BEGIN;
-- INSERT IGNORE turns a duplicate primary key into "0 rows affected", so the
-- ROW_COUNT() check below really detects an existing revision key. A plain
-- INSERT would raise a duplicate-key error instead of returning 0.
INSERT IGNORE INTO object (internal_key, object_value) VALUES ($revision_key, $revision_val);
if `SELECT ROW_COUNT();` == 0
    ROLLBACK;  -- key already exists: report ErrCASFailed and stop (skip the INSERT below)
INSERT INTO object (internal_key, object_value) VALUES ($object_key, $object_val);
COMMIT;
- CAS类操作, 包括Update/Delete
-- CAS (Update/Delete): advance the revision key only if it still holds the
-- revision the caller read, then append the new object key.
BEGIN;
UPDATE object SET object_value = $revision_val
WHERE internal_key = $revision_key AND object_value = $old_revision_val;
if `SELECT ROW_COUNT();` == 0
    ROLLBACK;  -- compare failed: report ErrCASFailed and stop (skip the INSERT below)
INSERT INTO object (internal_key, object_value) VALUES ($object_key, $object_val);
COMMIT;
读操作SQL
- 范围查询
-- Range scan over encoded internal keys, inclusive on both bounds.
-- ORDER BY must precede LIMIT; the key column is internal_key.
SELECT internal_key, object_value FROM object
WHERE internal_key >= $lower_bound AND internal_key <= $upper_bound
ORDER BY internal_key [ASC | DESC]
LIMIT $limit;
当前项目的逻辑时钟依赖存储引擎提供的逻辑时钟 API,必须严格保证逻辑时钟单调递增。TiDB 的自增 ID(AUTO_INCREMENT)由各个 TiDB 实例按批次缓存分配,只保证唯一,不保证全局严格递增。因此需要实现基于异步预申请 Buffer 的逻辑时钟,来支持这类存储引擎。