Skip to content

Instantly share code, notes, and snippets.

@jiekun
Last active July 11, 2023 14:28
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jiekun/ac4387b613e91c2d4142df35614cab34 to your computer and use it in GitHub Desktop.
Save jiekun/ac4387b613e91c2d4142df35614cab34 to your computer and use it in GitHub Desktop.
TiDB Hackathon 2022 RFC

TiCDC Dump 设计文档

Version Updated At Note
v1.0 2022-10-08 For preliminary review.

项目介绍

为 TiCDC 项目增加无锁、无额外写 I/O 、可随时 Pause/Resume 的存量数据复制能力。

背景 & 动机

异构(Heterogeneous)存储在日常开发中非常常见,如用户在以 TiDB 作为 Primary Storage 时,也可能期望使用 Elasticsearch 作为搜索引擎、ClickHouse 作为 OLAP 存储。落地些组合方案的重要步骤之一是实时地将 Primary Storage 数据同步至 Secondary Storage。但 Secondary Storage 常常在 Primary Storage 使用了一段时间、业务有所增长的时候才被引入,因此数据同步就需要:

  1. 复制存量数据
  2. 同步增量数据。

对于增量数据同步,CDC 工具已经较为成熟和多样,例如 MySQL 有 canal、Debezium 等,TiDB 有 TiDB Binlog、TiCDC 等。

而对于存量数据复制,不同工具的做法各不相同:

  • canal、TiCDC 不提供存量数据复制能力;
  • Debezium 通过 Table Lock 的 SELECT 查询获取数据;
  • Maxwell 通过暂停处理 Transaction Log,SELECT 得到数据,再恢复处理 Transaction Log。SELECT 结果可能比历史版本的 Transaction Log 更早投递至下游从而导致数据回拨,但终会由事务日志使之更新到最新版本。

其中,Maxwell 的实现也是很多业务团队的实现,在使用 CDC 组件的同时,由业务应用自行 SELECT FOR UPDATE 数据写入 Secondary Storage。

这些存量数据复制的方案:

  • 存在锁操作,复制过程对业务应用不能做到无感知,对数据量大的 Table 加锁几乎不可能在大型项目中使用;
  • 暂停 Transaction Log 处理会导致 CDC 延迟增加。

如果为 TiDB 实现存量数据复制功能,我们考虑:

  • 以无锁的方式实现
  • 存量数据复制不影响或尽可能少影响增量数据同步,保持低延迟

同时我们也留意到以往参加 Hackathon 孵化的项目,若游离在已有代码仓库之外,往往难以继续迭代维护。因此对于存量数据复制功能,我们认为应该整合到 TiCDC 项目中,且:

  • 不影响原有功能,用户可以选择只同步增量数据,也可以选择复制存量数据 & 同步增量数据;
  • 可以在增量数据同步的任意时间点选择同步存量数据,即存量数据复制可以随时 (Re)start、Pause 和 Resume

项目设计

存量数据同步的方案参考自 Netflix 论文《DBLog: A Watermark Based Change-Data-Capture Framework》。在详细展示方案流程之前,我们先了解大致思路。

在 TiCDC 持续拉取 binlog event 的过程中,我们可以通过 SELECT 对目标数据表进行存量数据的获取,并且通过 LIMIT 限制每次查询的数量,通过这种方式获取到的内容称为一个数据块chunk),而进行 SELECT 操作的时间段称为一个窗口window)。我们知道 SELECT 过程是不加锁的,意味着这时间窗口内还会有其他的写操作产生(与 chunk 数据行有交集的)binlog events。如果我们能够知道这个时间窗口的边界,边界内的 binlog events 与 SELECT 得到的 chunk 可以进行合并,将 binlog events 中存在的数据行从 chunk 中剔除,取两者的交集作为时间窗口内的最终输出结果,下游就能得到正确的数据。

例如,在时间窗口内通过 SELECT 得到了 id=2 的数据行,同时在这个时间窗口内又得到了 id=2 的 binlog,这时候就存在 3 种可能:

  1. chunk 的数据是上一个版本的数据,时间窗口内有在 SELECT 之后的 UPDATE 操作,产生的 binlog event 的是新数据;
  2. chunk 的数据和 binlog event 数据是同一版本;
  3. chunk 数据是新数据,时间窗口内有在 SELECT 之前的 UPDATE 操作,产生的 binlog event 数据是上一个版本的数据;

对所有的 3 种情况,通过剔除 chunk 中的数据,都可以保证下游收到的数据顺序正确。

在这个方案中,窗口的边界尤为重要,如果窗口边界不准确,最坏的情况是将老数据作为新数据 sink 到下游。为了标记窗口边界,我们通过 UPDATE 操作产生特殊 binlog。在 TiDB 集群中,新增一个单独的库表,如 changefeed_dump_db.changefeed_dump_event。这个表只用来做 1 件事情:为特定的 changefeed 产生代表窗口边界的 binlog event。

具体来说,在 SELECT 之前,执行 UPDATE 操作修改 changefeed_dump_db.changefeed_dump_event 的数据,这个操作会产生 1 条 binlog event;同样在 SELECT 之后,再执行 UPDATE 操作产生另一条 binlog event。这两条 binlog events 就是时间窗口边界,处在它们之间的 binlog events 就是在窗口期内产生的 binlog events。

有了时间窗口边界之后,合并 chunk 和窗口内的 binlog events 就会相对容易,继而输出至下游。

流程说明

以下代码展示了如何例如 Watermark 进行交替的存量数据及增量数据获取,我们为每个步骤进行了标记,在代码块后进行解释说明:

func fullStateHandler() {
	// (1) pause log event processing

	// (2) generating watermark
	lowWatermark, highWatermark := uuid(), uuid()

	// (3) update watermark table to generate binlog event
	updateWatermark(enum.LOW, lowWatermark)

	// (4) select chunk from target table
	chunkData := selectChunk(etcd.LastId, etcd.LastId+100)

	// (5) update watermark table to generate binlog event
	updateWatermark(enum.HIGH, highWatermark)

	// todo: resume log event processing
}

func cdcHandler() {
	var inWindow bool
	var outputEntry []*model.KVEntry

    // (6) resume log event processing
	for {
		e := nextKVEntry()

		// not in lo-high watermark window
		if !inWindow {
			if !isWatermark(e) {
				// should be a normal binlog event
				outputEntry = append(outputEntry, e)
			} else {
				// (7) should be a low watermark, enter the window
				inWindow = true
			}
		} else {
            // (8) processing inside the window
			if !isWatermar(e) {
				// if same event presented in selected chunk, use binlog event.
				// delete the same event in chunk data
				if chunkData.Contain(e.id) {
					chunkData.Delete(e.id)
				}
				outputEntry = append(outputEntry, e)
			} else {
				// (9) should be a high watermark. Append the rest chunk data to outputEntry
				outputEntry = append(outputEntry, chunkData)
				// leave the window
				inWindow = false
			}
		}
		
		// (10) output the outputEntry
	}
}
  1. 为了获取存量数据,暂停 binlog event 的拉取和处理;
  2. 生成窗口标记;
  3. 通过 UPDATE 操作在 binlog event 流中产生一条特殊的 binlog event(称为 low watermark),标志着进入窗口(开始 SELECT);
  4. 通过 SELECT 操作按照主键顺序采集到一块数据。SELECT 的进度将使用一致性存储来记录(通常为 etcd),方便随时暂停和恢复;
  5. 通过 UPDATE 操作在 binlog event 流中产生一条特殊的 binlog event(称为 low watermark),标着着离开窗口(SELECT 结束);
  6. 继续 binlog event 的拉取和处理;
  7. 遇到了 low watermark binlog event,开始处理窗口期数据;
  8. 窗口期一共产生了两种数据,我们需要取他们的交集作为窗口期的输出结果:
    1. SELECT 得到的结果,称为 chunk data;
    2. 正常拉取到的 binlog event。
  9. 遇到 high watermark binlog event,结束窗口期数据处理;
  10. 将交集数据作为窗口期的输出。

在第 (8) 步中,为了得到窗口期数据交集,每处理一条窗口期内的 binlog event,都应该尝试从 chunk data 中剔除相同数据,这样的输出结果在下游看来都是保持顺序的,不会产生数据回拨。

举例展示

我们再通过示意图具体说明对窗口期数据的处理。

在(1)暂停 binlog event 处理后,(2)生成 low watermark。(3) SELECT 得到数据 k1-k6,然后(4)生成 high watermark。

(5)继续处理 binlog event,在(6)遇到 low watermark event 时,准备空的 window buffer。处理窗口期内的 binlog event 时:

  1. 处理 k3:(7)将 k3 从 chunk data 中移除,将 k3 append 至 window buffer;
  2. 处理 k1:(7)将 k1 从 chunk data 中移除,将 k1 append 至 window buffer;
  3. 处理 k1:将 k1 append 至 window buffer;
  4. 处理 k1:将 k3 append 至 window buffer;

在(8)遇到 high watermark event 时,将 chunk data 中剩余的数据 append 至 window buffer。

最后(9)输出 window buffer 作为窗口期内的结果。

API 设计

依据当前 TiCDC 交互方式,用户可以通过 cdc cli 来操作 TiCDC。因此,在原有命令上新增以下命令。

  • cdc cli changefeed dumpstart --changefeed-id="your_changefeed_id"
  • cdc cli changefeed dumppause --changefeed-id="your_changefeed_id"
  • cdc cli changefeed dumpresume --changefeed-id="your_changefeed_id"
  • cdc cli changefeed dumprestart --changefeed-id="your_changefeed_id"

在创建 changefeed 的时候,用户已通过 filter 选定需要关注的 Schema 和 Table 范围,dump 命令将复用这些范围作为存量数据获取的目标,因此 dump 命令组的参数非常简洁,只需要传入 changefeed-id

Demo 展示

在 Demo 中我们:

  • 有后台进程每隔 0.5 秒执行一次 UPDATE 操作;
  • 执行 Start 命令后开始存量数据导出。Start 命令为 cdc cli changefeed dumpstart 的 alias,便于展示;
  • 全量导出中,Chunk Size 设置为 10,并且加入了多处 Sleep 命令,以便让各个阶段更容易观察;
  • 执行 Pause 命令后存量数据导出暂停;
  • 执行 Resume 命令后,从历史位置继续导出。

由于 GitHub 限制,GIF 无法完全加载,请点击跳转原图浏览:imgbox

库表设计

在 TiDB 集群中,changefeed_dump.changefeed_dump_event 用于记录和生成 watermark。

create database changefeed_dump;
create table changefeed_dump_event (
    id bigint not null auto_random comment "primary key",
    changefeed_id varchar(128) not null comment "TiCDC changefeed_id",
    unique_identifier varchar(32) not null comment "uuid - optimistic lock",
    event_type tinyint not null comment "1-low watermark; 2-high watermark",
    create_time int not null comment "create timestamp",
    update_time int not null comment "update timestamp",
    primary key (`id`),
    unique key (`changefeed_id`)
);
Row Name Row Type Comment
id int primary key
changefeed_id string TiCDC changefeed_id
unique_identifier string uuid - optimistic lock
event_type tinyint 1-low watermark; 2-high watermark
create_time int timestamp
update_time int timestamp

方案总结

我们的方案通过交替进行增量数据与存量数据的同步,同时对存量数据的 chunk 大小控制,避免了大量 SELECT 导致的增量数据抓取停滞;通过引入 watermark 机制使得产生的数据保持一定顺序、可随时暂停和恢复。

任务安排

  • RFC & 方案设计
  • 搭建 TiDB & TiCDC
  • 功能代码编写
    • 增加根据配置 SELECT TiDB 数据逻辑, 主键进度高可用存储(etcd/PD), Watermark 表实现
    • TiCDC 流程变更, 支持 Interleave 执行增量和存量数据同步
  • 功能测试
  • 性能测试
  • Slides
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment