@divanodestiny
Last active March 3, 2023 15:43
Implementing K8s Metadata Storage on TiDB

Implementing K8s Metadata Storage on TiDB by Adapting KubeBrain

Repository:

Project Overview

Adapt the existing KubeBrain project to TiDB to implement the metadata storage API that K8s uses.

Background & Motivation

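Today K8s supports only etcd as its metadata storage system, so deploying a Kubernetes cluster always means maintaining a separate etcd cluster as well. etcd does not scale horizontally, so as data volume and read/write traffic grow, it can easily become a bottleneck. We built KubeBrain, a lightweight K8s metadata storage project that abstracts the storage-engine API. By adapting it to different distributed storage systems, we hope to reuse existing storage systems as the infrastructure for deploying K8s clusters, let metadata storage scale horizontally to some degree, and break down the barrier between K8s and other storage systems.

(Figure: architecture)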

Detailed Design

Table Schema

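CREATE TABLE object(
    internal_key VARBINARY(512) PRIMARY KEY,  -- keys are raw bytes (magic prefix, BigEndian revision), compared byte-wise
    object_value MEDIUMBLOB                   -- a plain BLOB holds at most 64 KiB; K8s objects can be larger
);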

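For each Raw Key in an API Server read/write request, two kinds of Internal Key are encoded and written to the storage engine: an index record and data records. For every Raw Key written by the API Server there is exactly one index record, the Revision Key, which stores the latest revision of that Raw Key. The Revision Key also acts as a lock: every update to the Raw Key must perform a CAS on this index. There are one or more data records, the Object Keys, each storing one historical revision of the Raw Key together with the Value at that revision. An Object Key is encoded as magic+raw_key+split_key+revision, where: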

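  • magic is \x57\xfb\x80\x8b;
  • raw_key is the Key that the API Server actually passes to the storage system;
  • split_key is $;
  • revision is the logical sequence number that the logical clock assigns to the write, encoded as bytes in BigEndian.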

(Figure: key encoding)

In particular, the Revision Key is encoded the same way as an Object Key, with the revision fixed at 0.

Storage Engine Adaptation

Translate the read/write requests that K8s sends to etcd into read/write transactions on TiDB, implementing the storage engine API:

// KvStorage defines the storage engine interface on top of a kv database
type KvStorage interface {
        // GetPartitions returns the partitions that the keys between start and end are spread over.
        // If partitioning is not supported, just return a single partition covering start to end and a nil error.
        GetPartitions(ctx context.Context, start, end []byte) (partitions []Partition, err error)

        // Get returns the value indexed by key.
        // If it does not exist, return ErrKeyNotFound.
        Get(ctx context.Context, key []byte) (val []byte, err error)

        // Iter gets keys from `start` to `end` (`end` will be smaller than `start` if SupportIterForward is true)
        Iter(ctx context.Context, start []byte, end []byte, timestamp uint64, limit uint64) (Iter, error)

        // BeginBatchWrite returns a new BatchWrite instance
        BeginBatchWrite() BatchWrite

        Writer

        FeatureSupport

        // Close the kv storage
        Close() error
}

// Writer defines methods to modify kv in the storage engine.
// It aims to avoid multi-row transactions where possible when writing a single key.
// It can be implemented by wrapping BatchWrite.
type Writer interface {
        // Del removes kv from kv storage
        Del(ctx context.Context, key []byte) (err error)

        // DelCurrent removes the kv referenced by Iter.
        // It should be implemented as delete-if-value-equal or delete-if-version-equal.
        DelCurrent(ctx context.Context, iter Iter) (err error)
}

// BatchWrite packs several operations into one batch that is committed atomically
type BatchWrite interface {

        // PutIfNotExist creates kv if it doesn't exist.
        // If it exists, return ErrCASFailed on commit.
        // * If supported, return a Conflict instead of ErrCASFailed on commit, carrying the unexpected kv.
        PutIfNotExist(key []byte, val []byte, ttl int64)

        // CAS compares and swaps the value indexed by the given key.
        // todo: deprecate it if the format and procedure of writing data are refactored
        // If the comparison fails, return ErrCASFailed on commit.
        // * If supported, return a Conflict instead of ErrCASFailed on commit, carrying the unexpected kv.
        CAS(key []byte, newVal []byte, oldVal []byte, ttl int64)

        // Put writes kv to storage
        Put(key []byte, val []byte, ttl int64)

        // Del removes kv from storage
        Del(key []byte)

        // DelCurrent unifies CAS-style deletion across storage implementations (admittedly an awkward design)
        DelCurrent(it Iter)

        // Commit commits the batch atomically and returns the first error in the batch.
        // * It must return ErrUncertainResult if the client cannot know whether the data was written.
        Commit(ctx context.Context) error
}

// Iter is the iterator on a **snapshot** of kv storage with a batch buffer
// Example:
//
//	iter, err := kvStorage.Iter(ctx, start, end, timestamp, limit)
//	if err != nil {
//		return err
//	}
//	defer iter.Close()
//	iterCtx, cancel := context.WithTimeout(ctx, timeout)
//	defer cancel()
//	for {
//		err := iter.Next(iterCtx)
//		if err == io.EOF {
//			break // reached the end
//		}
//		if err != nil {
//			return err
//		}
//		key, value := iter.Key(), iter.Val()
//		// processing ...
//	}
//
type Iter interface {
        // Key returns the current key in the buffer
        Key() []byte

        // Val returns the current value in the buffer
        Val() []byte

        // Next gets data from kv storage
        // err should be io.EOF if there are no more keys
        Next(ctx context.Context) (err error)

        // Close the iter
        Close() error
}

Write SQL

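  • Create operations
BEGIN;
    -- IGNORE turns a duplicate primary key into 0 affected rows instead of an error
    INSERT IGNORE INTO object VALUES ($revision_key, $revision_val);
    if `SELECT ROW_COUNT();` == 0      -- the revision key already exists
        ROLLBACK;                      -- abort: the create fails with ErrCASFailed
    INSERT INTO object VALUES ($object_key, $object_val);
COMMIT;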
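  • CAS operations, including Update/Delete
BEGIN;
    UPDATE object SET object_value=$revision_val WHERE internal_key=$revision_key AND object_value=$old_revision_val;
    if `SELECT ROW_COUNT();` == 0      -- the revision changed concurrently, or the key is missing
        ROLLBACK;                      -- abort: the update/delete fails with ErrCASFailed
    INSERT INTO object VALUES ($object_key, $object_val);
COMMIT;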

Read SQL

  • Range query
SELECT internal_key, object_value FROM object
WHERE internal_key >= $lower_bound AND internal_key <= $upper_bound
ORDER BY internal_key [ASC | DESC]
LIMIT $limit;
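Here is a small Go sketch of the range read for the history of a single raw key. It reuses the key layout described earlier, with an 8-byte revision assumed, and `objectKey`, `historyBounds` and `rangeQuery` are hypothetical helpers. The lower bound starts at revision 1, so the Revision Key (revision 0) is excluded.

There is one caveat with the `$` separator. Suppose a different raw key begins with this raw key followed by `$`: for raw key `a`, the key `a$b` would also sort inside these bounds. A real adapter would have to filter out or rule out such keys.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

var magic = []byte{0x57, 0xfb, 0x80, 0x8b}

// objectKey repeats the magic + raw_key + '$' + BigEndian(revision) layout.
func objectKey(rawKey []byte, revision uint64) []byte {
	key := append(append([]byte{}, magic...), rawKey...)
	key = append(key, '$')
	var rev [8]byte
	binary.BigEndian.PutUint64(rev[:], revision)
	return append(key, rev[:]...)
}

// historyBounds returns inclusive bounds covering every Object Key of rawKey,
// from revision 1 up to the largest uint64 revision.
func historyBounds(rawKey []byte) (lower, upper []byte) {
	return objectKey(rawKey, 1), objectKey(rawKey, ^uint64(0))
}

// rangeQuery renders the read statement; ORDER BY must precede LIMIT.
func rangeQuery(desc bool) string {
	order := "ASC"
	if desc {
		order = "DESC"
	}
	return fmt.Sprintf("SELECT internal_key, object_value FROM object "+
		"WHERE internal_key >= ? AND internal_key <= ? ORDER BY internal_key %s LIMIT ?", order)
}
```

With a `*sql.DB`, the statement could be run as `db.QueryContext(ctx, rangeQuery(true), lower, upper, 1)`. Under these assumptions, that returns the newest Object Key of the raw key.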

Implementing a Local Logical Clock

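The project's logical clock relies on a logical-clock API provided by the storage engine, and that clock must be strictly increasing. TiDB's auto-increment IDs are not guaranteed to be strictly increasing, because each TiDB server caches its own batch of IDs. For storage engines like this, we therefore need a logical clock built on an asynchronously pre-allocated buffer.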
