ClickHouse MergeTree 实现只有一次语义的插入
ClickHouse 作为 OLAP 数据库,做了大量的插入、查询性能优化,比如 MergeTree 引擎基于 LSM 树结构,优化了查询性能(特别是范围查询)。
自然而然,事务由于过于“重”,并未添加到 ClickHouse 中,这就造成了可能的插入重复。例如,组件插入 ClickHouse 后因为网络断开未能收到 ClickHouse 的完成信号,组件自然会重试(实现至少一次插入语义)。
由于每次插入都会形成一个 part 落盘,若插入数据过小、插入次数过多会导致 ClickHouse 耗费大量资源在后台合并,这会严重影响 ClickHouse 性能。因此,在生产环境每次插入 ClickHouse 的数据可能有十万、百万行。这加剧了数据重复的影响。
若业务需要只有一次语义的插入,目前 ClickHouse 可以使用如下两种方式:
- Upsert[1]
- 数据回放 + 插入幂等
在 ClickHouse 中 Upsert 通过特殊的表引擎 ReplacingMergeTree
、CollapsingMergeTree
或 VersionedCollapsingMergeTree
实现,它们存在致命缺陷:
- 数据去重是通过后台合并进行的
- 严重拖慢合并速度
- 刚插入的重复数据在合并前会被查询到
- 插入时重复数据分散在不同 shard 将无法去重
Upsert 适用于非实时的场合,因此本文将主要介绍“数据回放 + 插入幂等”实现的方案。
消费重放(Message Replay)
消费重放是指消费过的数据可以被二次消费。想要实现插入重试,自然需要数据来源支持消费重放。
在本篇中我们使用 Kafka 作为数据源,Kafka 通过重置 offset 实现消费重放。
插入重试不能使用消费组
首先我们需要弄清楚什么时候可以使用 Kafka 的消费组,什么时候需要手动控制。
正常消费时,拉取数据、插入 ClickHouse 最后提交 offset,一切无误。但是发生 rebalance 时 partition 就会调度给不同 consumer。
触发 rebalance 的时机有三种:
- consumer 添加到消费组
- consumer 离开消费组
- partition 添加
rebalance、assignment 的类型说明这里不赘述,推荐 这篇文章 了解。
Kafka 无法保证 partition 和 consumer 的绑定,并且消费组也无法用于消费回放(一次回放的 partition 被分散给不同的 consumer)。
所以,消费回放一定要单独处理。
幂等插入
幂等性是指多次执行某个操作和执行一次相同,幂等插入就是指插入相同数据多次等同于插入一次。
ClickHouse 本身有非常弱的幂等插入支持:
Data blocks are deduplicated. For multiple writes of the same data block (data blocks of the same size containing the same rows in the same order), the block is only written once. The reason for this is in case of network failures when the client application does not know if the data was written to the DB, so the INSERT query can simply be repeated. It does not matter which replica INSERTs were sent to with identical data. INSERTs are idempotent. Deduplication parameters are controlled by merge_tree server settings.[2]
每次插入 ClickHouse 的数据称为一个 Block,会在插入时计算 hashsums 存储于 Zookeeper(默认保留 100 个、最长保留 1 周,通过 replicated_deduplication_window
、replicated_deduplication_window_seconds
控制)。当插入的 Block 大小、行数、行内容以及顺序一致(也就是说完全一致)时,ClickHouse 会丢掉这个 Block。
也就是说只要能保证插入重试时构建的 Block 完全一致,那么就可以避免插入重复。
WAL
为了实现插入失败后重试,需要每个节点在插入 ClickHouse 之前需要将消费日志写入外部存储,以便插入异常后的相同数据块重建。
同时为了保证数据顺序,将数据按照(topic,partition)的顺序排序(partition 内数据天然按消费顺序排序)。
方案
重点有两个:
- 每次循环都先尝试重建 Block,没有就新建 Block
- rebalance 时,因为 partition 将会发生调整,所以所有节点需要停止消费、清理数据再恢复