TiCDC Dump 设计文档
- 团队名称:Jiekun
- 作者:Jiekun
- 项目进展:Demo Ready
- 代码:
- 项目仓库:https://github.com/jiekun/tiflow/
- 关键算法实现:sink.go#L241
- Slides: Google Slides
Version | Updated At | Note |
---|---|---|
v1.0 | 2022-10-08 | For preliminary review. |
项目介绍
为 TiCDC 项目增加无锁、无额外写 I/O 、可随时 Pause/Resume 的存量数据复制能力。
背景 & 动机
异构(Heterogeneous)存储在日常开发中非常常见,如用户在以 TiDB 作为 Primary Storage 时,也可能期望使用 Elasticsearch 作为搜索引擎、ClickHouse 作为 OLAP 存储。落地些组合方案的重要步骤之一是实时地将 Primary Storage 数据同步至 Secondary Storage。但 Secondary Storage 常常在 Primary Storage 使用了一段时间、业务有所增长的时候才被引入,因此数据同步就需要:
- 复制存量数据;
- 同步增量数据。
对于增量数据同步,CDC 工具已经较为成熟和多样,例如 MySQL 有 canal、Debezium 等,TiDB 有 TiDB Binlog、TiCDC 等。
而对于存量数据复制,不同工具的做法各不相同:
- canal、TiCDC 不提供存量数据复制能力;
- Debezium 通过 Table Lock 的
SELECT
查询获取数据; - Maxwell 通过暂停处理 Transaction Log,
SELECT
得到数据,再恢复处理 Transaction Log。SELECT
结果可能比历史版本的 Transaction Log 更早投递至下游从而导致数据回拨,但终会由事务日志使之更新到最新版本。
其中,Maxwell 的实现也是很多业务团队的实现,在使用 CDC 组件的同时,由业务应用自行 SELECT FOR UPDATE
数据写入 Secondary Storage。
这些存量数据复制的方案:
- 存在锁操作,复制过程对业务应用不能做到无感知,对数据量大的 Table 加锁几乎不可能在大型项目中使用;
- 暂停 Transaction Log 处理会导致 CDC 延迟增加。
如果为 TiDB 实现存量数据复制功能,我们考虑:
- 以无锁的方式实现;
- 存量数据复制不影响或尽可能少影响增量数据同步,保持低延迟。
同时我们也留意到以往参加 Hackathon 孵化的项目,若游离在已有代码仓库之外,往往难以继续迭代维护。因此对于存量数据复制功能,我们认为应该整合到 TiCDC 项目中,且:
- 不影响原有功能,用户可以选择只同步增量数据,也可以选择复制存量数据 & 同步增量数据;
- 可以在增量数据同步的任意时间点选择同步存量数据,即存量数据复制可以随时 (Re)start、Pause 和 Resume。
项目设计
存量数据同步的方案参考自 Netflix 论文《DBLog: A Watermark Based Change-Data-Capture Framework》。在详细展示方案流程之前,我们先了解大致思路。
在 TiCDC 持续拉取 binlog event 的过程中,我们可以通过 SELECT
对目标数据表进行存量数据的获取,并且通过 LIMIT 限制每次查询的数量,通过这种方式获取到的内容称为一个数据块(chunk),而进行 SELECT
操作的时间段称为一个窗口(window)。我们知道 SELECT
过程是不加锁的,意味着这时间窗口内还会有其他的写操作产生(与 chunk 数据行有交集的)binlog events。如果我们能够知道这个时间窗口的边界,边界内的 binlog events 与 SELECT
得到的 chunk 可以进行合并,将 binlog events 中存在的数据行从 chunk 中剔除,取两者的交集作为时间窗口内的最终输出结果,下游就能得到正确的数据。
例如,在时间窗口内通过 SELECT
得到了 id=2 的数据行,同时在这个时间窗口内又得到了 id=2 的 binlog,这时候就存在 3 种可能:
- chunk 的数据是上一个版本的数据,时间窗口内有在
SELECT
之后的UPDATE
操作,产生的 binlog event 的是新数据; - chunk 的数据和 binlog event 数据是同一版本;
- chunk 数据是新数据,时间窗口内有在
SELECT
之前的UPDATE
操作,产生的 binlog event 数据是上一个版本的数据;
对所有的 3 种情况,通过剔除 chunk 中的数据,都可以保证下游收到的数据顺序正确。
在这个方案中,窗口的边界尤为重要,如果窗口边界不准确,最坏的情况是将老数据作为新数据 sink 到下游。为了标记窗口边界,我们通过 UPDATE
操作产生特殊 binlog。在 TiDB 集群中,新增一个单独的库表,如 changefeed_dump_db.changefeed_dump_event
。这个表只用来做 1 件事情:为特定的 changefeed 产生代表窗口边界的 binlog event。
具体来说,在 SELECT
之前,执行 UPDATE
操作修改 changefeed_dump_db.changefeed_dump_event
的数据,这个操作会产生 1 条 binlog event;同样在 SELECT 之后,再执行 UPDATE
操作产生另一条 binlog event。这两条 binlog events 就是时间窗口边界,处在它们之间的 binlog events 就是在窗口期内产生的 binlog events。
有了时间窗口边界之后,合并 chunk 和窗口内的 binlog events 就会相对容易,继而输出至下游。
流程说明
以下代码展示了如何例如 Watermark 进行交替的存量数据及增量数据获取,我们为每个步骤进行了标记,在代码块后进行解释说明:
func fullStateHandler() {
// (1) pause log event processing
// (2) generating watermark
lowWatermark, highWatermark := uuid(), uuid()
// (3) update watermark table to generate binlog event
updateWatermark(enum.LOW, lowWatermark)
// (4) select chunk from target table
chunkData := selectChunk(etcd.LastId, etcd.LastId+100)
// (5) update watermark table to generate binlog event
updateWatermark(enum.HIGH, highWatermark)
// todo: resume log event processing
}
func cdcHandler() {
var inWindow bool
var outputEntry []*model.KVEntry
// (6) resume log event processing
for {
e := nextKVEntry()
// not in lo-high watermark window
if !inWindow {
if !isWatermark(e) {
// should be a normal binlog event
outputEntry = append(outputEntry, e)
} else {
// (7) should be a low watermark, enter the window
inWindow = true
}
} else {
// (8) processing inside the window
if !isWatermar(e) {
// if same event presented in selected chunk, use binlog event.
// delete the same event in chunk data
if chunkData.Contain(e.id) {
chunkData.Delete(e.id)
}
outputEntry = append(outputEntry, e)
} else {
// (9) should be a high watermark. Append the rest chunk data to outputEntry
outputEntry = append(outputEntry, chunkData)
// leave the window
inWindow = false
}
}
// (10) output the outputEntry
}
}
- 为了获取存量数据,暂停 binlog event 的拉取和处理;
- 生成窗口标记;
- 通过
UPDATE
操作在 binlog event 流中产生一条特殊的 binlog event(称为 low watermark),标志着进入窗口(开始SELECT
); - 通过
SELECT
操作按照主键顺序采集到一块数据。SELECT
的进度将使用一致性存储来记录(通常为 etcd),方便随时暂停和恢复; - 通过
UPDATE
操作在 binlog event 流中产生一条特殊的 binlog event(称为 low watermark),标着着离开窗口(SELECT
结束); - 继续 binlog event 的拉取和处理;
- 遇到了 low watermark binlog event,开始处理窗口期数据;
- 窗口期一共产生了两种数据,我们需要取他们的交集作为窗口期的输出结果:
SELECT
得到的结果,称为 chunk data;- 正常拉取到的 binlog event。
- 遇到 high watermark binlog event,结束窗口期数据处理;
- 将交集数据作为窗口期的输出。
在第 (8) 步中,为了得到窗口期数据交集,每处理一条窗口期内的 binlog event,都应该尝试从 chunk data 中剔除相同数据,这样的输出结果在下游看来都是保持顺序的,不会产生数据回拨。
举例展示
我们再通过示意图具体说明对窗口期数据的处理。
在(1)暂停 binlog event 处理后,(2)生成 low watermark。(3) SELECT
得到数据 k1-k6,然后(4)生成 high watermark。
(5)继续处理 binlog event,在(6)遇到 low watermark event 时,准备空的 window buffer。处理窗口期内的 binlog event 时:
- 处理 k3:(7)将 k3 从 chunk data 中移除,将 k3 append 至 window buffer;
- 处理 k1:(7)将 k1 从 chunk data 中移除,将 k1 append 至 window buffer;
- 处理 k1:将 k1 append 至 window buffer;
- 处理 k1:将 k3 append 至 window buffer;
在(8)遇到 high watermark event 时,将 chunk data 中剩余的数据 append 至 window buffer。
最后(9)输出 window buffer 作为窗口期内的结果。
API 设计
依据当前 TiCDC 交互方式,用户可以通过 cdc cli
来操作 TiCDC。因此,在原有命令上新增以下命令。
cdc cli changefeed dumpstart --changefeed-id="your_changefeed_id"
cdc cli changefeed dumppause --changefeed-id="your_changefeed_id"
cdc cli changefeed dumpresume --changefeed-id="your_changefeed_id"
cdc cli changefeed dumprestart --changefeed-id="your_changefeed_id"
在创建 changefeed 的时候,用户已通过 filter 选定需要关注的 Schema 和 Table 范围,dump
命令将复用这些范围作为存量数据获取的目标,因此 dump
命令组的参数非常简洁,只需要传入 changefeed-id
。
Demo 展示
在 Demo 中我们:
- 有后台进程每隔 0.5 秒执行一次 UPDATE 操作;
- 执行
Start
命令后开始存量数据导出。Start
命令为cdc cli changefeed dumpstart
的 alias,便于展示; - 全量导出中,Chunk Size 设置为 10,并且加入了多处 Sleep 命令,以便让各个阶段更容易观察;
- 执行
Pause
命令后存量数据导出暂停; - 执行
Resume
命令后,从历史位置继续导出。
库表设计
在 TiDB 集群中,changefeed_dump.changefeed_dump_event 用于记录和生成 watermark。
create database changefeed_dump;
create table changefeed_dump_event (
id bigint not null auto_random comment "primary key",
changefeed_id varchar(128) not null comment "TiCDC changefeed_id",
unique_identifier varchar(32) not null comment "uuid - optimistic lock",
event_type tinyint not null comment "1-low watermark; 2-high watermark",
create_time int not null comment "create timestamp",
update_time int not null comment "update timestamp",
primary key (`id`),
unique key (`changefeed_id`)
);
Row Name | Row Type | Comment |
---|---|---|
id | int | primary key |
changefeed_id | string | TiCDC changefeed_id |
unique_identifier | string | uuid - optimistic lock |
event_type | tinyint | 1-low watermark; 2-high watermark |
create_time | int | timestamp |
update_time | int | timestamp |
方案总结
我们的方案通过交替进行增量数据与存量数据的同步,同时对存量数据的 chunk 大小控制,避免了大量 SELECT
导致的增量数据抓取停滞;通过引入 watermark 机制使得产生的数据保持一定顺序、可随时暂停和恢复。
任务安排
- RFC & 方案设计
- 搭建 TiDB & TiCDC
- 功能代码编写
- 增加根据配置
SELECT
TiDB 数据逻辑, 主键进度高可用存储(etcd/PD), Watermark 表实现 - TiCDC 流程变更, 支持 Interleave 执行增量和存量数据同步
- 增加根据配置
- 功能测试
- 性能测试
- Slides