仓库地址:
基于TiDB适配现有项目KubeBrain,实现K8s所使用的元信息存储API。
当前K8s仅支持etcd作为元信息存储系统,因此部署Kubernetes集群总是需要额外维护一套etcd集群;而etcd集群并不支持水平扩容,随着数据量和读写请求量的增长,容易达到瓶颈。我们实现了轻量级K8s元信息存储项目KubeBrain,对存储引擎API进行了抽象,希望通过适配不同的分布式存储系统,充分利用已有的存储系统作为基础设施来部署K8s集群,并支持一定程度的元信息存储水平扩容,打通K8s和其他存储系统之间的壁垒。

CREATE TABLE object(
internal_key VARCHAR(512) PRIMARY KEY,
object_value BLOB
)
对于 API Server 读写请求参数中的 Raw Key,会编码出两类 Internal Key 写入存储引擎,分别作为索引和数据。对于每个 API Server 写入的 Raw Key,索引记录 Revision Key 只有一条,记录当前 Raw Key 的最新版本号;Revision Key 同时也是一把锁,每次对 Raw Key 的更新操作都需要对索引进行 CAS。数据记录 Object Key 有一到多条,每条数据记录了 Raw Key 的一个历史版本及该版本对应的 Value。Object Key 的编码方式为 magic+raw_key+split_key+revision,其中:
magic为\x57\xfb\x80\x8b;raw_key为实际 API Server 输入到存储系统中的 Key ;split_key为$;revision为逻辑时钟对写操作分配的逻辑操作序号通过 BigEndian 编码成的 Bytes 。
特别地,Revision Key 和 Object Key 的编码方式相同,只是其中的 revision 固定为 0。
将K8s到etcd读写请求转换为到TiDB的读写事务操作,实现存储引擎的API
// KvStorage defines the storage engine abstraction on top of a kv database.
// It declares point reads, range iteration and batch writes, and embeds the
// Writer and FeatureSupport interfaces.
type KvStorage interface {
// GetPartitions returns the partitions that the keys are spread over for the
// given start and end keys.
// NOTE(review): the previous comment said an engine without partition support
// should "just return [][]byte{start, end}, 0 , nil", which does not match the
// current (partitions []Partition, err error) signature; presumably such an
// engine returns a single Partition covering the range and a nil error —
// TODO confirm.
GetPartitions(ctx context.Context, start, end []byte) (partitions []Partition, err error)
// Get returns the value indexed by key.
// If the key does not exist, it returns ErrKeyNotFound.
Get(ctx context.Context, key []byte) (val []byte, err error)
// Iter returns an iterator over the keys from `start` to `end`
// (`end` will be smaller than `start` if SupportIterForward is true).
// NOTE(review): timestamp presumably identifies the snapshot to read and limit
// presumably bounds the number of keys; neither is documented here — confirm
// against the implementations.
Iter(ctx context.Context, start []byte, end []byte, timestamp uint64, limit uint64) (Iter, error)
// BeginBatchWrite returns a new BatchWrite instance.
BeginBatchWrite() BatchWrite
// Writer contributes the single-key write methods.
Writer
// FeatureSupport is declared elsewhere.
// NOTE(review): presumably it reports optional engine capabilities such as
// SupportIterForward — confirm.
FeatureSupport
// Close closes the kv storage.
Close() error
}
// Writer defines methods that modify a single kv in the storage engine.
// It aims to avoid a multi-row transaction, if possible, when only one key is written.
// It can be implemented by wrapping BatchWrite.
type Writer interface {
// Del removes the kv indexed by key from the kv storage.
Del(ctx context.Context, key []byte) (err error)
// DelCurrent removes the kv referenced by iter.
// It should be implemented as delete-if-value-equal or delete-if-version-equal.
DelCurrent(ctx context.Context, iter Iter) (err error)
}
// BatchWrite packs several operations into one batch and should commit them atomically.
// The operation methods return nothing; their failures are reported by Commit.
// NOTE(review): the meaning and unit of the ttl parameters are not documented
// here — TODO confirm against the implementations.
type BatchWrite interface {
// PutIfNotExist creates the kv if key does not exist.
// If it exists, ErrCASFailed is returned when the batch is committed.
// * If the engine is able to, it returns a Conflict instead of ErrCASFailed
// on commit in order to pass back the unexpected kv.
PutIfNotExist(key []byte, val []byte, ttl int64)
// CAS compares and swaps the value indexed by the given key.
// TODO: deprecate it if the format and procedure of writing data are refactored.
// If the comparison fails, ErrCASFailed is returned when the batch is committed.
// * If the engine is able to, it returns a Conflict instead of ErrCASFailed
// on commit in order to pass back the unexpected kv.
CAS(key []byte, newVal []byte, oldVal []byte, ttl int64)
// Put writes the kv to the storage.
Put(key []byte, val []byte, ttl int64)
// Del removes the kv from the storage.
Del(key []byte)
// DelCurrent is an ugly design to unify CAS deletion across different storage
// implementations.
// NOTE(review): presumably it deletes the kv that `it` currently references — confirm.
DelCurrent(it Iter)
// Commit commits the batch atomically and returns the first error in the batch.
// * It must return ErrUncertainResult if the client cannot know whether the
// data was written.
Commit(ctx context.Context) error
}
// Iter is the iterator on a **snapshot** of kv storage with batch buffer.
//
// Example (argument order follows KvStorage.Iter; the original example passed a
// snapshot ID in the position of the timestamp parameter):
//
//	iterCtx, cancel := context.WithTimeout(ctx, timeout)
//	defer cancel()
//	iter, err := kvStorage.Iter(iterCtx, start, end, snapshotID, limit)
//	if err != nil {
//		// handle the error
//	}
//	defer iter.Close()
//	for {
//		if err := iter.Next(iterCtx); err != nil {
//			if err == io.EOF {
//				break // reached the end
//			}
//			// handle the error
//		}
//		key, value := iter.Key(), iter.Val()
//		// processing ...
//	}
//
type Iter interface {
// Key returns the key held in the buffer.
Key() []byte
// Val returns the value held in the buffer.
Val() []byte
// Next gets data from the kv storage.
// err should be io.EOF if there are no more keys.
Next(ctx context.Context) (err error)
// Close closes the iterator.
Close() error
}写操作SQL
- Create类操作
BEGIN;
INSERT INTO object VALUES ($revision_key, $revision_val);
if `SELECT ROW_COUNT();` == 0
ROLLBACK;
INSERT INTO object VALUES ($object_key,$object_val);
COMMIT;
- CAS类操作, 包括Update/Delete
BEGIN;
UPDATE object SET object_value=$revision_val WHERE internal_key=$revision_key AND object_value=$old_revision_val;
if `SELECT ROW_COUNT();` == 0
ROLLBACK;
INSERT INTO object VALUES ($object_key,$object_val);
COMMIT;
读操作SQL
- 范围查询
SELECT * FROM object
WHERE internal_key <= $upper_bound AND internal_key >= $lower_bound
ORDER BY internal_key [ASC | DESC]
LIMIT $limit
当前项目的逻辑时钟依赖于存储引擎提供的逻辑时钟API,需要严格保证逻辑时钟的递增。由于TiDB的自增ID不一定能保证严格递增,因此需要实现基于异步预申请Buffer的逻辑时钟来支持这类存储引擎。
