ClickHouse Kafka Engine
ClickHouse 提供 KafkaEngine 作为和 Kafka 交互的工具,和 物化视图 配合实现数据传输。
运行流程
- 批量拉取 Kafka 数据(时间限制:
kafka_poll_timeout_ms
;消息限制:kafka_poll_max_batch_size
),每一批称为一个 poll - 处理消息
- 判断是否满足一个 block(行数限制:
kafka_max_block_size
)或者时间超过限制(时间限制kafka_flush_interval_ms
),不满足继续拉取 kafka 数据(第 1 步) - 将 block 传给物化视图
错误处理
设置 kafka_handle_error_mode='stream'
后,KafkaEngine 消费的每一行数据都会加上两个虚拟列:_error
、_raw_message
。
这样可以将数据 架构 错误无法处理的消息单独记录下来。
CREATE MATERIALIZED VIEW default.kafka_errors
(
`topic` String,
`partition` Int64,
`offset` Int64,
`raw` String,
`error` String
)
ENGINE = MergeTree
ORDER BY (topic, partition, offset)
SETTINGS index_granularity = 8192 AS
SELECT
_topic AS topic,
_partition AS partition,
_offset AS offset,
_raw_message AS raw,
_error AS error
FROM default.kafka_engine
WHERE length(_error) > 0
运行调优
-
插入数据大小、频率调整 [1]
通常不需要动:
kafka_poll_max_batch_size
=max_block_size
(65536)kafka_poll_timeout_ms
=stream_poll_timeout_ms
(500ms)
想要调整插入频率可以修改:
kafka_flush_interval_ms
=stream_poll_timeout_ms
(7500ms)kafka_max_block_size
=max_insert_block_size
/kafka_num_consumers
(默认单个消费者: 1048576)
-
插入并行度调整
调整
kafka_num_consumers
增加消费者,可以优化消费 kafka 消息、处理消息的速度,但是 flush 数据的速度不会优化,因为还是一个线程处理。再设置
kafka_thread_per_consumer = 1
将使得每个消费者有自己的 buffer,并行消费、处理、flush,但同时会使内存使用扩大。
Altinity Knowledge Base
一些背景知识:[2]
kafka_num_consumers
受限于物理 CPU 核数,设置kafka_disable_num_consumers_limit
可以绕过限制。- 可以用于
kafka_num_consumers
的线程池大小默认是background_message_broker_schedule_pool_size = 16
,kafka_num_consumers
超过这个值没有意义。