大数据-140 ClickHouse CollapsingMergeTree详解 外部数据源最小闭环HDFS/MySQL/Kafka
TL;DR
- 场景:高并发以增代删/更新审计流量,如何既省存储又查得准?
- 结论:更新场景用 VersionedCollapsing + argMax,删除/撤销用 Collapsing;FINAL 只用于小范围核对。
- 产出:标准 DDL、写入规范、argMax 查询模板、OPTIMIZE … FINAL ,HDFS、MySQL、Kafka数据源对接
CollapsingMergeTree
核心工作原理:以增代删
CollapsingMergeTree是ClickHouse中一种特殊的MergeTree引擎,其核心设计理念是”以增代删”。Yandex官方对此引擎的定义是:
CollapsingMergeTree会异步地删除(折叠)除了特定列的Sign列值为1和-1以外,其他所有字段值都相等的成对行。没有成对的行会被保留。这种设计可以显著降低存储量并提高SELECT查询效率。
状态列机制详解
CollapsingMergeTree引擎的关键在于其特殊的状态列Sign,该列的值决定行的状态:
- 1(状态行):表示这是一条有效的数据记录
- -1(取消行):表示这是一条需要被删除/撤销的记录
运行机制示例
假设我们有一个用户行为跟踪表:
CREATE TABLE user_actions (
user_id UInt64,
action_date Date,
action String,
duration UInt32,
sign Int8
) ENGINE = CollapsingMergeTree(sign)
ORDER BY (user_id, action_date, action)
写入示例:
- 先插入一条状态行:
INSERT INTO user_actions VALUES (123, '2023-01-01', 'login', 30, 1)
- 随后发现数据有误,插入取消行:
INSERT INTO user_actions VALUES (123, '2023-01-01', 'login', 30, -1)
后台合并时,这两条记录会被折叠删除。
应用场景
这种引擎特别适合以下场景:
- 需要频繁更新的时序数据
- 需要保留历史变更记录的审计场景
- 需要高效存储大量状态变更的数据
查询注意事项
由于折叠是异步进行的,查询时应使用:
SELECT * FROM user_actions FINAL
来获取已折叠的最终结果,或者使用条件过滤:
SELECT * FROM user_actions WHERE sign = 1
案例
创建新表
CREATE TABLE cmt_tab (
id UInt32,
sign Int8,
date Date,
name String,
point String
)
ENGINE = CollapsingMergeTree(sign)
PARTITION BY toYYYYMM(date)
ORDER BY (name, id)
SAMPLE BY id;
插入数据
INSERT INTO cmt_tab (id, sign, date, name, point) VALUES
(1, 1, '2024-01-01', 'Alice', '10'),
(2, 1, '2024-01-01', 'Bob', '15'),
(3, 1, '2024-01-02', 'Charlie', '20'),
(4, 1, '2024-01-02', 'David', '25'),
(5, 1, '2024-01-03', 'Eve', '30');
-- Mark Alice's row as deleted
-- Mark Bob's row as deleted
INSERT INTO cmt_tab (id, sign, date, name, point) VALUES
(1, -1, '2024-01-01', 'Alice', '10'),
(2, -1, '2024-01-01', 'Bob', '15');
-- Insert Alice's updated row
-- Insert Bob's updated row
INSERT INTO cmt_tab (id, sign, date, name, point) VALUES
(1, 1, '2024-01-01', 'Alice', '12'),
(2, 1, '2024-01-01', 'Bob', '18');
optimize
OPTIMIZE TABLE cmt_tab;
SELECT * FROM cmt_tab;
使用场景
大数据中对于数据更新很难做到,比如统计一个网站或TV的用户数,更多场景都是选择用记录每个点的数据,再对数据进行聚合查询。而ClickHouse通过CollapsingMergeTree就可以实现,使得CollapsingMergeTree大部分用于OLAP场景。
VersionedCollapsingMergeTree
这个引擎和CollapsingMergeTree差不多,只是对CollapsingMergeTree引擎加了一个版本,比如可以适用于非实时的在线统计,统计每个节点用户在线的业务。
其他数据源
端口冲突
ClickHouse和Hadoop的9000端口冲突了,可以更改ClickHouse的端口或Hadoop的端口。
连接方式示例:
clickhouse-client -m --host h121.wzk.icu --port 9001 --user default --password clickhouse@wzk.icu
HDFS
ENGINE = HDFS(URI, format)
该URI参数是HDFS中整个文件的URI,该format参数指定一种可用的文件格式。执行SELECT查询时,格式必须支持输入。
示例
添加新表:
CREATE TABLE hdfs_engine_table(
name String,
value UInt32
) ENGINE = HDFS('hdfs://h121.wzk.icu:9000/clickhouse', 'TSV');
插入数据:
INSERT INTO hdfs_engine_table VALUES('one', 1), ('two', 2), ('three', 3);
查询数据:
SELECT * FROM hdfs_engine_table;
实施细节
- 读取和写入可以并行
- 不支持:ALTER、SELECT SAMPLE、索引、复制
MySQL
MySQL 引擎可以对存储在远程MySQL服务器上的数据执行SELECT查询。
调用参数
- host:port:MySQL服务器地址
- database:数据库名称
- table:表名称
- user:数据库用户
- password:用户密码
- replace_query:将INSERT INTO查询是否替换为REPLACE_INFO的标志
- on_duplicate_clause:将ON DUPLICATE KEY UPDATE 表达式添加到INSERT查询语句中
示例
创建新表:
CREATE TABLE mysql_table2 (
`id` UInt32,
`name` String,
`age` UInt32
) ENGINE = MySQL('h122.wzk.icu:3306', 'clickhouse', 'mysql_table2', 'hive', 'hive@wzk.icu')
插入数据:
INSERT INTO mysql_table2 VALUES(1, 'wzk', 18);
INSERT INTO mysql_table2 VALUES(2, 'icu', 18);
查询数据:
SELECT * FROM mysql_table2;
Kafka
Apache Kafka 是一个分布式流处理平台,广泛用于构建实时数据管道和流应用。ClickHouse 提供了专门的 Kafka 引擎,使其能够直接从 Kafka 中读取数据,实现实时数据流的处理与分析。
创建新表
CREATE TABLE kafka_events
(
`timestamp` DateTime,
`event_type` String,
`user_id` UInt64,
`event_data` String
)
ENGINE = Kafka
SETTINGS
kafka_broker_list = 'broker1:9092,broker2:9092',
kafka_topic_list = 'events_topic',
kafka_group_name = 'clickhouse_group',
kafka_format = 'JSONEachRow',
kafka_num_consumers = 1;
创建目标表并设置 Materialized View:
CREATE TABLE events (
`timestamp` DateTime,
`event_type` String,
`user_id` UInt64,
`event_data` String
) ENGINE = MergeTree()
ORDER BY timestamp;
CREATE MATERIALIZED VIEW kafka_to_events
TO events
AS SELECT * FROM kafka_events;
插入数据
INSERT INTO events SELECT * FROM kafka_events;
应用场景
- 实时日志分析:通过 Kafka 收集应用日志,ClickHouse 实时消费并分析日志数据
- 事件驱动的业务分析:实时跟踪用户行为事件,进行实时的用户行为分析和推荐系统
- 实时监控与报警:将监控数据流入 Kafka,ClickHouse 处理并生成实时报警指标
错误速查
| 症状 | 可能根因 | 核验修复/回滚 |
|---|---|---|
| FINAL 查询极慢 | part 太碎/仍多段 | system.parts先 OPTIMIZE … FINAL,再分表或 MV |
| 更新后读到老值 | 乱序或没 version | 检查写序强制 version 单调;紧急用 FINAL |
| WHERE sign=1 结果不对 | 忽略折叠 | 改 argMax/sum(sign)/FINAL |
引擎对比
| 需求 | 推荐 | 说明 |
|---|---|---|
| 删除/撤销 | VersionedCollapsing | 带版本更新,支持最新值 + 撤销操作 |
| 无版本抵消 | Collapsing | 成对抵消(插入与撤销记录配对),无版本控制 |
| 简单去重 | ReplacingMergeTree | 取 newest/anyLast(保留最新记录或任意一条) |
| 累加指标 | SummingMergeTree | 按 key 汇总(数值类指标自动求和) |