Big Data 140 - ClickHouse CollapsingMergeTree & External Data Sources

TL;DR

  • Scenario: high-concurrency audit traffic with frequent upserts/updates — how to save storage while keeping queries accurate?
  • Conclusion: use VersionedCollapsing + argMax for updates, Collapsing for delete/undo; FINAL only for small-scale verification.
  • Output: standard DDL, write conventions, an argMax query template, OPTIMIZE … FINAL, and HDFS/MySQL/Kafka data source integration

CollapsingMergeTree

Core Working Principle: Upsert Instead of Delete

CollapsingMergeTree is a special MergeTree engine in ClickHouse whose core design philosophy is “upsert instead of delete”. From the official documentation:

CollapsingMergeTree asynchronously deletes (collapses) pairs of rows in which all sorting-key values are equal but the Sign column holds 1 in one row and -1 in the other. Rows without a matching pair are retained. This design significantly reduces storage and improves SELECT query efficiency.

Sign Column Mechanism Details

The key to CollapsingMergeTree engine is its special Sign column, which determines row state:

  • 1 (state row): Indicates a valid data record
  • -1 (cancel row): Indicates a record that needs to be deleted/undone

Operation Mechanism Example

Assume we have a user behavior tracking table:

CREATE TABLE user_actions (
    user_id UInt64,
    action_date Date,
    action String,
    duration UInt32,
    sign Int8
) ENGINE = CollapsingMergeTree(sign)
ORDER BY (user_id, action_date, action)

Write Example:

  1. First, insert a state row:
INSERT INTO user_actions VALUES (123, '2023-01-01', 'login', 30, 1)
  2. Later, the data turns out to be wrong, so insert a cancel row:
INSERT INTO user_actions VALUES (123, '2023-01-01', 'login', 30, -1)

During background merge, these two records will be collapsed and deleted.

Application Scenarios

This engine is particularly suitable for:

  • Time-series data requiring frequent updates
  • Audit scenarios needing to preserve historical change records
  • Scenarios requiring efficient storage of large amounts of state change data

Query Notes

Because collapsing happens asynchronously during background merges, a plain SELECT may still see uncollapsed pairs. To force collapsing at query time, use:

SELECT * FROM user_actions FINAL

FINAL is expensive and best reserved for small-scale verification. Filtering is cheaper but only approximate:

SELECT * FROM user_actions WHERE sign = 1

This is correct only when no unmerged cancel rows remain.
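
The production pattern recommended in the TL;DR — aggregate instead of relying on FINAL — can be sketched as follows against the user_actions table above. The sum(sign) trick cancels paired rows arithmetically, so unmerged pairs do not distort the result:

```sql
-- Latest collapsed state per key without FINAL: multiply metrics by sign
-- and keep only keys whose state rows outnumber their cancel rows.
SELECT
    user_id,
    action_date,
    action,
    sum(duration * sign) AS duration
FROM user_actions
GROUP BY user_id, action_date, action
HAVING sum(sign) > 0;
```

The same shape works for argMax-style "latest value" queries once a version column is present (see VersionedCollapsingMergeTree below).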

Case Study

Create Table

CREATE TABLE cmt_tab (
  id UInt32,
  sign Int8,
  date Date,
  name String,
  point String
)
ENGINE = CollapsingMergeTree(sign)
PARTITION BY toYYYYMM(date)
ORDER BY (name, id)
SAMPLE BY id;

Insert Data

INSERT INTO cmt_tab (id, sign, date, name, point) VALUES
(1, 1, '2024-01-01', 'Alice', '10'),
(2, 1, '2024-01-01', 'Bob', '15'),
(3, 1, '2024-01-02', 'Charlie', '20'),
(4, 1, '2024-01-02', 'David', '25'),
(5, 1, '2024-01-03', 'Eve', '30');

-- Mark Alice's row as deleted
-- Mark Bob's row as deleted
INSERT INTO cmt_tab (id, sign, date, name, point) VALUES
(1, -1, '2024-01-01', 'Alice', '10'),
(2, -1, '2024-01-01', 'Bob', '15');

-- Insert Alice's updated row
-- Insert Bob's updated row
INSERT INTO cmt_tab (id, sign, date, name, point) VALUES
(1, 1, '2024-01-01', 'Alice', '12'),
(2, 1, '2024-01-01', 'Bob', '18');

Optimize

OPTIMIZE triggers an unscheduled merge; adding FINAL forces all parts in each partition to be merged, so the collapse is guaranteed to complete:

OPTIMIZE TABLE cmt_tab FINAL;
SELECT * FROM cmt_tab;

Usage Scenarios

In big data systems, true in-place updates are hard to achieve. When counting website or TV users, for example, the common approach is to record every data point and aggregate afterwards. CollapsingMergeTree lets ClickHouse emulate updates this way, which is why it is mostly used in OLAP scenarios.


VersionedCollapsingMergeTree

This engine is similar to CollapsingMergeTree but adds a Version column, so state and cancel rows can be collapsed correctly even when they arrive out of order. It suits near-real-time online statistics, such as counting the users online at each node.
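
A minimal table sketch (the name uv_stat and its columns are made up for illustration): the version column is passed as the second engine parameter, and rows with the same sorting key collapse by matching (sign, version) pairs regardless of insertion order:

```sql
CREATE TABLE uv_stat (
    node_id UInt32,
    online_users UInt32,
    sign Int8,        -- 1 = state row, -1 = cancel row
    version UInt32    -- monotonically increasing per update
) ENGINE = VersionedCollapsingMergeTree(sign, version)
ORDER BY node_id;

-- Update node 1 from 100 to 120 online users:
INSERT INTO uv_stat VALUES (1, 100, 1, 1);   -- original state
INSERT INTO uv_stat VALUES (1, 100, -1, 1);  -- cancel version 1
INSERT INTO uv_stat VALUES (1, 120, 1, 2);   -- new state, version 2
```

Keeping version strictly monotonic per key is what makes argMax(value, version) queries return the latest value reliably.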


External Data Sources

Port Conflict

ClickHouse’s native TCP interface and the HDFS NameNode both default to port 9000, so they conflict when deployed on the same host. Change either ClickHouse’s tcp_port or Hadoop’s NameNode port (fs.defaultFS).

Connection example:

clickhouse-client -m --host h121.wzk.icu --port 9001 --user default --password clickhouse@wzk.icu

HDFS

ENGINE = HDFS(URI, format)

The URI parameter is the URI of the whole file in HDFS. The format parameter specifies one of the available file formats. For SELECT queries the format must support input (be readable); for INSERT it must support output.

Example

Add new table:

CREATE TABLE hdfs_engine_table(
  name String,
  value UInt32
) ENGINE = HDFS('hdfs://h121.wzk.icu:9000/clickhouse', 'TSV');

Insert data:

INSERT INTO hdfs_engine_table VALUES('one', 1), ('two', 2), ('three', 3);

Query data:

SELECT * FROM hdfs_engine_table;

Implementation Details

  • Read and write can run in parallel
  • Does not support: ALTER, SELECT SAMPLE, indexes, replication
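
Besides the table engine, ClickHouse also provides an hdfs table function for ad-hoc reads without creating a table; a hedged sketch (the path reuses the example above, and the inline schema is an assumption):

```sql
-- Ad-hoc read: the third argument declares the column structure inline,
-- so no CREATE TABLE is needed.
SELECT *
FROM hdfs('hdfs://h121.wzk.icu:9000/clickhouse', 'TSV', 'name String, value UInt32');
```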

MySQL

MySQL engine can execute SELECT queries on data stored on remote MySQL servers.

Parameters

  • host:port: MySQL server address
  • database: Database name
  • table: Table name
  • user: Database user
  • password: User password
  • replace_query: Flag that converts INSERT INTO queries to REPLACE INTO
  • on_duplicate_clause: Adds ON DUPLICATE KEY UPDATE expression to INSERT query
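
The two optional parameters are appended after the password; a hedged sketch with replace_query enabled (host, database, and credentials are illustrative, matching the example below):

```sql
-- With replace_query = 1, inserts become REPLACE INTO on the MySQL side,
-- so rows with duplicate primary keys overwrite instead of erroring.
CREATE TABLE mysql_table_replace (
    id UInt32,
    name String,
    age UInt32
) ENGINE = MySQL('h122.wzk.icu:3306', 'clickhouse', 'mysql_table2',
                 'hive', 'hive@wzk.icu', 1);
```

Note that replace_query and on_duplicate_clause address the same conflict-handling concern and are used alternatively, not together.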

Example

Create new table:

CREATE TABLE mysql_table2 (
  `id` UInt32,
  `name` String,
  `age` UInt32
) ENGINE = MySQL('h122.wzk.icu:3306', 'clickhouse', 'mysql_table2', 'hive', 'hive@wzk.icu')

Insert data:

INSERT INTO mysql_table2 VALUES(1, 'wzk', 18);
INSERT INTO mysql_table2 VALUES(2, 'icu', 18);

Query data:

SELECT * FROM mysql_table2;

Kafka

Apache Kafka is a distributed stream processing platform, widely used for building real-time data pipelines and streaming applications. ClickHouse provides a dedicated Kafka engine, enabling it to directly read from Kafka and process real-time data streams.

Create Table

CREATE TABLE kafka_events
(
    `timestamp` DateTime,
    `event_type` String,
    `user_id` UInt64,
    `event_data` String
)
ENGINE = Kafka
SETTINGS
    kafka_broker_list = 'broker1:9092,broker2:9092',
    kafka_topic_list = 'events_topic',
    kafka_group_name = 'clickhouse_group',
    kafka_format = 'JSONEachRow',
    kafka_num_consumers = 1;

Create target table and set Materialized View:

CREATE TABLE events (
    `timestamp` DateTime,
    `event_type` String,
    `user_id` UInt64,
    `event_data` String
) ENGINE = MergeTree()
ORDER BY timestamp;

CREATE MATERIALIZED VIEW kafka_to_events
TO events
AS SELECT * FROM kafka_events;

Insert Data

With the materialized view in place, rows flow from Kafka into events automatically; no manual insert is needed. A one-off manual read is also possible, but note that it consumes the corresponding Kafka offsets for the consumer group:

INSERT INTO events SELECT * FROM kafka_events;

Application Scenarios

  • Real-time log analysis: Collect application logs through Kafka, ClickHouse consumes and analyzes in real-time
  • Event-driven business analysis: Track user behavior events in real-time for real-time analysis and recommendation systems
  • Real-time monitoring and alerting: Stream monitoring data into Kafka, ClickHouse processes and generates real-time alert metrics

Error Quick Reference

  • Symptom: FINAL query extremely slow — Root cause: parts too fragmented / many unmerged segments — Fix: check system.parts, run OPTIMIZE … FINAL, then split tables or use a materialized view
  • Symptom: old value read after an update — Root cause: out-of-order writes or missing version — Fix: check write order and enforce version monotonicity; use FINAL in emergencies
  • Symptom: WHERE sign = 1 returns incorrect results — Root cause: uncollapsed cancel rows are ignored — Fix: switch to argMax / sum(sign) / FINAL

Engine Comparison

  • Delete/undo → VersionedCollapsing: versioned updates, supports latest value + undo operations
  • No-version cancel → Collapsing: paired cancellation (each insert paired with an undo record), no version control
  • Simple deduplication → ReplacingMergeTree: keeps the newest record or any one (anyLast)
  • Cumulative metrics → SummingMergeTree: numeric metrics auto-summed by sorting key