Search CTRL + K

ClickHouse Kafka Engine

ClickHouse 提供 KafkaEngine 作为和 Kafka 交互的工具,和 物化视图 配合实现数据传输。

运行流程

  1. 批量拉取 Kafka 数据(时间限制:kafka_poll_timeout_ms;消息限制:kafka_poll_max_batch_size),每一批称为一个 poll
  2. 处理消息
  3. 判断是否满足一个 block(行数限制:kafka_max_block_size)或者时间超过限制(时间限制 kafka_flush_interval_ms),不满足继续拉取 kafka 数据(第 1 步)
  4. 将 block 传给物化视图

错误处理

设置 kafka_handle_error_mode='stream' 后,KafkaEngine 消费的每一行数据都会加上两个虚拟列:_error_raw_message

这样可以将数据 架构 错误无法处理的消息单独记录下来。

CREATE MATERIALIZED VIEW default.kafka_errors
(
    `topic` String,
    `partition` Int64,
    `offset` Int64,
    `raw` String,
    `error` String
)
ENGINE = MergeTree
ORDER BY (topic, partition, offset)
SETTINGS index_granularity = 8192 AS
SELECT
    _topic AS topic,
    _partition AS partition,
    _offset AS offset,
    _raw_message AS raw,
    _error AS error
FROM default.kafka_engine
WHERE length(_error) > 0

运行调优

  1. 插入数据大小、频率调整 [1]

    通常不需要动:

    • kafka_poll_max_batch_size = max_block_size (65536)
    • kafka_poll_timeout_ms = stream_poll_timeout_ms (500ms)

    想要调整插入频率可以修改:

    • kafka_flush_interval_ms = stream_poll_timeout_ms (7500ms)
    • kafka_max_block_size = max_insert_block_size / kafka_num_consumers (默认单个消费者: 1048576)
  2. 插入并行度调整

    调整 kafka_num_consumers 增加消费者,可以优化消费 kafka 消息、处理消息的速度,但是 flush 数据的速度不会优化,因为还是一个线程处理。

    再设置 kafka_thread_per_consumer = 1 将使得每个消费者有自己的 buffer,并行消费、处理、flush,但同时会使内存使用扩大。

Altinity Knowledge Base

一些背景知识:[2]

  1. kafka_num_consumers 受限于物理 CPU 核数,设置 kafka_disable_num_consumers_limit 可以绕过限制。
  2. 可以用于 kafka_num_consumers 的线程池大小默认是 background_message_broker_schedule_pool_size = 16kafka_num_consumers 超过这个值没有意义。

  1. https://kb.altinity.com/altinity-kb-integrations/altinity-kb-kafka/altinity-kb-kafka-main-parsing-loop/ ↩︎

  2. https://kb.altinity.com/altinity-kb-integrations/altinity-kb-kafka/altinity-kb-kafka-parallel-consuming/ ↩︎