当前 K8s 仅支持 etcd 作为元信息存储系统,因此部署 Kubernetes 集群总是需要额外维护一套 etcd 集群;而 etcd 集群并不支持水平扩容,随着数据量和读写请求量的增长,容易达到瓶颈。我们实现了轻量级 K8s 元信息存储项目 KubeBrain,对存储引擎 API 进行抽象,希望通过适配不同的分布式存储系统,充分利用已有的存储系统作为基础设施来部署 K8s 集群,支持一定程度的元信息存储水平扩容,打通 K8s 与其他存储系统之间的壁垒。[图:arch]



    -- Single table holding both index records (Revision Key) and data records (Object Key).
    CREATE TABLE object (
        -- Encoded Internal Key. VARBINARY rather than VARCHAR: the key contains raw bytes
        -- (magic prefix \x57\xfb\x80\x8b, BigEndian revision) that are not valid text in
        -- any character set, and range scans require plain byte-wise ordering, which a
        -- character collation would not give.
        internal_key VARBINARY(512) PRIMARY KEY,
        object_value BLOB
    );

对于 API Server 读写请求参数中的 Raw Key,KubeBrain 会将其编码为两类 Internal Key,分别作为索引和数据写入存储引擎。对于 API Server 写入的每个 Raw Key,索引记录(Revision Key)只有一条,记录该 Raw Key 当前的最新版本号;Revision Key 同时也是一把锁,每次对 Raw Key 的更新操作都需要对该索引进行 CAS。数据记录(Object Key)有一到多条,每条记录了 Raw Key 的一个历史版本及该版本对应的 Value。Object Key 的编码方式为 magic + raw_key + split_key + revision,其中:

  • magic 为固定前缀 \x57\xfb\x80\x8b;
  • raw_key 为 API Server 实际输入到存储系统中的 Key;
  • split_key 为分隔符 $;
  • revision 为逻辑时钟给写操作分配的逻辑操作序号,通过 BigEndian 编码成 Bytes。


特别地,Revision Key 与 Object Key 的编码方式相同,只是其中的 revision 固定为 0。



// KvStorage defines the storage engine on kv database
type KvStorage interface {
        // GetPartitions returns the partitions that keys are spread over in
        // If it's not supported, just return [][]byte{start, end}, 0 , nil
        GetPartitions(ctx context.Context, start, end []byte) (partitions []Partition, err error)

        // Get returns value indexed by key
        // If it's not exist, return ErrKeyNotFound
        Get(ctx context.Context, key []byte) (val []byte, err error)

        // Iter get keys from `start` to `end` (`end` will be smaller than `start` if SupportIterForward is true)
        Iter(ctx context.Context, start []byte, end []byte, timestamp uint64, limit uint64) (Iter, error)

        // BeginBatchWrite returns a new BeginBatchWrite instance
        BeginBatchWrite() BatchWrite



        // Close the kv storage
        Close() error

// Writer defines some methods to modify kv in storage engine
// it aims to avoid multirow transaction if possible when write one key only
// it can be implemented by wrapping the BatchWrite
type Writer interface {
        // Del removes kv from kv storage
        Del(ctx context.Context, key []byte) (err error)

        // DelCurrent removes kv ref by Iter
        // Should be implemented as delete-if-value-equal or delete-if-version-equal
        DelCurrent(ctx context.Context, iter Iter) (err error)

// BatchWrite should support atomic batch pack with several operations
type BatchWrite interface {

        // PutIfNotExist creates kv if it doesn't exist.
        // If it exists, return ErrCASFailed while committed
        // *If it's available, return a Conflict instead of ErrCASFailed while committed to pass unexpected kv
        PutIfNotExist(key []byte, val []byte, ttl int64)

        // CAS compare and swap the value indexed by given key
        // todo: deprecate it if refactor the format and procedure of write data
        // If result of compare is false, return ErrCASFailed while committed
        // * If it's available, return a Conflict instead of ErrCASFailed committed to pass unexpected kv
        CAS(key []byte, newVal []byte, oldVal []byte, ttl int64)

        // Put kv to storage
        Put(key []byte, val []byte, ttl int64)

        // Del remove kv from storage
        Del(key []byte)

        // DelCurrent is an ugly design to unify the cas deleting in different storage implement
        DelCurrent(it Iter)

        // Commit commits batch atomically, return the first error in batch
        // * must return ErrUncertainResult if client can not know whether data is written
        Commit(ctx context.Context) error

// Iter is an iterator over a **snapshot** of the kv storage, backed by a batch buffer.
//
// Example:
//
//	iter, err := kvStorage.Iter(ctx, start, end, timestamp, limit)
//	if err != nil {
//		return err
//	}
//	defer iter.Close()
//	iterCtx, cancel := context.WithTimeout(ctx, timeout)
//	defer cancel()
//	for {
//		err := iter.Next(iterCtx)
//		if errors.Is(err, io.EOF) {
//			break // came to the end
//		}
//		if err != nil {
//			return err
//		}
//		key, value := iter.Key(), iter.Val()
//		// processing ...
//	}
type Iter interface {
	// Key returns the key at the current position, read from the buffer.
	Key() []byte

	// Val returns the value at the current position, read from the buffer.
	Val() []byte

	// Next advances the iterator, fetching data from the kv storage as needed.
	// err should be io.EOF if there are no more keys.
	Next(ctx context.Context) (err error)

	// Close releases the iterator.
	Close() error
}


  • Create类操作
    -- Both inserts must commit atomically, as BatchWrite.Commit requires.
    BEGIN;
    -- Fails with a duplicate-key error if $revision_key already exists (the key was created before).
    INSERT INTO object VALUES ($revision_key, $revision_val);
    if the INSERT above succeeded
        INSERT INTO object VALUES ($object_key, $object_val);
        COMMIT;
    else
        ROLLBACK;  -- key already exists: report ErrCASFailed
  • CAS类操作, 包括Update/Delete
    -- The index CAS and the new object record must commit atomically, as BatchWrite.Commit requires.
    BEGIN;
    UPDATE object SET object_value=$revision_val WHERE internal_key=$revision_key AND object_value=$old_revision_val;
    -- ROW_COUNT() is 0 when the stored revision no longer equals $old_revision_val (lost the race).
    if `SELECT ROW_COUNT();` == 1
        INSERT INTO object VALUES ($object_key, $object_val);
        COMMIT;
    else
        ROLLBACK;  -- CAS failed: report ErrCASFailed


  • 范围查询
-- Range scan over encoded Internal Keys; ORDER BY must precede LIMIT so the limit
-- keeps the first $limit rows in key order.
SELECT * FROM object
WHERE internal_key >= $lower_bound AND internal_key <= $upper_bound
ORDER BY internal_key [ASC | DESC]
LIMIT $limit



