TL;DR
- Scenario: Real-time/time-series OLAP over hundreds of millions of detail rows, with low-latency dashboards and multi-dimensional analysis
- Conclusion: Time Chunk → Segment columnar storage + Roll-up + Bitmap index + mmap + multi-level caching
- Output: Key points of the storage/query mechanisms, a checklist, and fix ideas for common pitfalls
Data Storage
Druid Data Storage Architecture
Druid’s data storage uses a hierarchical logical structure with several layers:
1. DataSource
- Concept analogy: A DataSource is similar to a Table in an RDBMS
- Function: The top-level container for data; one DataSource holds all the data for a specific business domain
- Example: An e-commerce site might create “user_behavior” and “product_inventory” DataSources to store different business data
2. Chunk
- Time partitioning: Each DataSource’s data is divided by time range into Chunks
- Partition granularity: The time granularity is configurable based on business needs
- Common configs: day (1d), hour (1h), week (1w), etc.
- Example: When partitioning by day, the data for 2023-01-01 forms an independent Chunk
- Query advantage: This time-partitioned structure makes time-range queries very efficient
3. Segment
- Physical storage: A Segment is the actual physical storage unit for data; each Segment is an independent file
- Data scale: One Segment typically contains several million rows (around 5 million)
- File features: Segment files use a columnar storage format with compression and indexing
- Parallel processing: Druid can load and process multiple Segments in parallel
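The time-chunking step in the hierarchy above can be sketched in a few lines. This is a toy illustration, not Druid code: `chunk_start` is a hypothetical helper that truncates an event timestamp to the start of its Chunk, which is what Druid does internally based on the configured granularity.

```python
from datetime import datetime, timezone

def chunk_start(ts: datetime, granularity: str = "day") -> datetime:
    """Truncate an event timestamp to the start of its time chunk (toy helper)."""
    if granularity == "day":
        return ts.replace(hour=0, minute=0, second=0, microsecond=0)
    if granularity == "hour":
        return ts.replace(minute=0, second=0, microsecond=0)
    raise ValueError(f"unsupported granularity: {granularity}")

event = datetime(2023, 1, 1, 13, 45, 10, tzinfo=timezone.utc)
# With day granularity, every event on 2023-01-01 lands in the same Chunk.
print(chunk_start(event, "day"))
```

All rows that truncate to the same chunk start are stored together, which is what makes time-range pruning possible later.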
Data Distribution Mechanism
- Time order: Segments are strictly organized in time order within a Chunk
- Distributed storage: Segments are distributed across multiple nodes in the Druid cluster
- Replica mechanism: For high availability, each Segment has multiple replicas (typically 2-3) stored on different nodes
Query Optimization
- Time filtering: The system first determines the time range (Chunks) involved in the query
- Segment filtering: It then loads only the Segment files in the relevant Chunks
- Performance advantage: This mechanism significantly reduces the volume of data to scan, which makes it especially well suited to time-series analysis
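The pruning step above amounts to an interval-overlap check against the segment catalog. A minimal sketch, with a hypothetical in-memory catalog standing in for the metadata store:

```python
from datetime import datetime

# Hypothetical catalog: each segment covers a half-open [start, end) interval.
segments = [
    ("seg-2023-01-01", datetime(2023, 1, 1), datetime(2023, 1, 2)),
    ("seg-2023-01-02", datetime(2023, 1, 2), datetime(2023, 1, 3)),
    ("seg-2023-01-03", datetime(2023, 1, 3), datetime(2023, 1, 4)),
]

def prune(segments, q_start, q_end):
    """Keep only segments whose interval overlaps the query's time range."""
    return [name for name, s, e in segments if s < q_end and q_start < e]

# A query over 2023-01-02 touches exactly one of the three segments.
print(prune(segments, datetime(2023, 1, 2), datetime(2023, 1, 3)))
```

Everything outside the returned list is never read from disk, which is where the bulk of the speedup comes from.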
Practical Application Examples
- Monitoring system: Generates one Segment per minute and forms one Chunk per hour
- IoT data processing: Organizes Segments along two dimensions, device ID and time
- Advertising analysis: Creates one Chunk per day and further subdivides Segments by advertiser ID
Data Partitioning
Druid processes event data: every record carries a timestamp, so data can be partitioned by time. With the partition granularity set to day, each day’s data is stored and queried separately.
Segment Internal Storage
- Druid uses columnar storage; each column’s data is kept in an independent structure
- Columns in a Segment fall into three types:
- Type 1, Timestamp: Every row must have a timestamp; Druid always shards based on the event timestamp
- Type 2, Dimension columns: Used for filtering and GROUP BY; usually String, Float, Double, or Int types
- Type 3, Metric columns: Used for aggregation, with specified aggregation functions such as sum and average
After a MiddleManager node receives an Ingestion task, it starts creating the Segment:
- Converts the data to columnar storage format
- Builds bitmap indexes (one for every dimension column)
- Applies several compression algorithms:
- Algorithm 1: LZ4 compression for column data
- Algorithm 2: Dictionary encoding for all strings, to minimize storage
- Algorithm 3: Bitmap compression for the bitmap indexes
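Dictionary encoding, mentioned above for string columns, replaces repeated strings with small integer ids plus a lookup table. A minimal sketch of the idea (not Druid’s actual on-disk format):

```python
def dictionary_encode(values):
    """Replace repeated strings with integer ids plus a lookup table."""
    dictionary, ids = [], []
    index = {}  # string -> id, for O(1) lookups while encoding
    for v in values:
        if v not in index:
            index[v] = len(dictionary)
            dictionary.append(v)
        ids.append(index[v])
    return dictionary, ids

column = ["Beijing", "Shenzhen", "Beijing", "Beijing", "Shenzhen"]
dictionary, ids = dictionary_encode(column)
print(dictionary)  # ['Beijing', 'Shenzhen']
print(ids)         # [0, 1, 0, 0, 1]
```

The id array is small, fixed-width, and highly compressible, which is why dictionary encoding pairs well with the LZ4 pass that follows it.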
After the Segment is created, its file is immutable and is written to deep storage (so the Segment is not lost if the MiddleManager node crashes). The Segment is then loaded onto a Historical node, which can map it directly into memory.
The metadata store also records the new Segment’s information: structure, size, deep-storage location, etc.
The Coordinator node uses this metadata to coordinate data lookup.
Indexing Service
The indexing service ingests data and creates Segment data files.
It is a highly available distributed service with a master/slave architecture, and consists of three components:
- Overlord as master node
- MiddleManager as slave node
- Peon for running a Task
Service Components
Overlord Component
Responsible for creating Tasks, distributing them to MiddleManagers to run, creating locks for Tasks, tracking Task status, and reporting progress back to users.
MiddleManager Component
As the slave node, receives tasks assigned by the master node and starts an independent JVM process for each Task to do the actual work.
Peon (Worker) Component
A process started by a MiddleManager to run a single Task.
Comparison with YARN
- Overlord similar to ResourceManager, responsible for cluster resource management and task allocation
- MiddleManager similar to NodeManager, responsible for receiving tasks and managing node resources
- Peon similar to Container, executing specific tasks on nodes
Task Types
- index hadoop task: Hadoop indexing task; uses a Hadoop cluster to run MapReduce jobs that create Segment data files, suitable for large data volumes
- index kafka task: Real-time ingestion from Kafka; a KafkaSupervisor configured on the Overlord manages the creation and lifecycle of Kafka indexing tasks
- merge task: Merge indexing task; merges multiple Segment data files into one using a specified aggregation method
- kill task: Destroy indexing task; deletes data within a given time range from the Druid cluster’s deep storage
Druid High-Performance Query Mechanism
Druid achieves low-latency, high-performance queries mainly through five key techniques:
1. Data Pre-aggregation
Druid performs pre-aggregation during the ingestion stage, significantly reducing the volume of data to process at query time. The system supports multiple aggregation methods:
- Count
- Sum
- Max
- Min
- Approximate cardinality (hyperloglog), etc.
For example, for website access-log data, Druid can pre-compute per-minute metrics such as PV and UV during ingestion, avoiding a full calculation at query time.
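The PV/UV example above can be sketched with plain dictionaries. The event tuples and field names here are made up for illustration; real deployments would use an approximate sketch (HyperLogLog) rather than an exact user set for UV:

```python
from collections import defaultdict

# Toy access-log events: (minute bucket, page, user id).
events = [
    ("2023-01-01T00:01", "/home", "u1"),
    ("2023-01-01T00:01", "/home", "u2"),
    ("2023-01-01T00:01", "/home", "u1"),
    ("2023-01-01T00:02", "/home", "u3"),
]

# Roll up at ingestion: per (minute, page), keep PV as a count
# and the user set for UV (HyperLogLog in production).
rollup = defaultdict(lambda: {"pv": 0, "users": set()})
for minute, page, user in events:
    agg = rollup[(minute, page)]
    agg["pv"] += 1
    agg["users"].add(user)

for key, agg in sorted(rollup.items()):
    print(key, "PV =", agg["pv"], "UV =", len(agg["users"]))
```

Four raw rows collapse to two aggregated rows; queries then read the pre-computed counters instead of rescanning the log.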
2. Columnar Storage & Compression
Druid uses columnar storage architecture with multiple compression algorithms:
- String types: dictionary-encoding compression
- Numeric types: bit packing, LZ4 compression, ZSTD compression
This storage layout not only reduces I/O but also significantly improves the compression ratio; timestamp columns, for example, typically compress more than 10x.
3. Bitmap Index
Druid builds a Bitmap index for each dimension column:
- Generates a corresponding bitmap for each dimension value
- Supports fast AND/OR/NOT bit operations
- Especially suitable for high-cardinality dimension filter queries
For example, a query for “Chrome OR Firefox” on the “browser type” dimension can quickly locate the relevant rows through a bitmap OR operation.
4. Memory-mapped File (mmap)
Druid uses mmap to access on-disk data:
- Maps index and data files into the process’s memory address space
- The OS automatically manages page loading and eviction
- Avoids the overhead of traditional I/O system calls
- Hot data is cached automatically
This mechanism makes queries on cached pages nearly as fast as memory access, with the OS managing the cache intelligently.
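The mmap access pattern described above can be demonstrated with Python’s standard `mmap` module. This is a generic illustration (a throwaway temp file stands in for a segment file), not Druid’s actual loading code:

```python
import mmap
import os
import tempfile

# Write a small stand-in "segment file" to disk.
fd, path = tempfile.mkstemp()
with os.fdopen(fd, "wb") as f:
    f.write(b"druid-segment-data")

# Map the file read-only; slicing the mapping reads pages on demand,
# and the OS keeps hot pages cached for subsequent reads.
with open(path, "rb") as f, mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mm:
    header, tail = mm[0:5], mm[-4:]

os.remove(path)
print(header, tail)  # b'druid' b'data'
```

No explicit `read()` system call is issued per access; the kernel faults pages in transparently, which is exactly the property Druid relies on.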
5. Query Result Cache
Druid implements a multi-level cache mechanism:
- Intermediate result cache: stores partial query results
- Query result cache: caches complete query results
- Supports a time-based cache invalidation strategy
- Repeated requests with the same query pattern can be answered immediately
For example, a common “last 1 hour” dashboard query can return directly from cache within the validity period, without recalculating.
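A minimal sketch of a result cache with time-based invalidation, as described above. The `QueryCache` class and its TTL policy are illustrative, not Druid’s cache implementation:

```python
import time

class QueryCache:
    """Toy result cache keyed by the query string, with TTL-based invalidation."""
    def __init__(self, ttl_seconds: float):
        self.ttl = ttl_seconds
        self.entries = {}  # query -> (result, stored_at)

    def get(self, query):
        hit = self.entries.get(query)
        if hit is None:
            return None
        result, stored_at = hit
        if time.monotonic() - stored_at > self.ttl:
            del self.entries[query]   # expired: force recomputation
            return None
        return result

    def put(self, query, result):
        self.entries[query] = (result, time.monotonic())

cache = QueryCache(ttl_seconds=60)
cache.put("SELECT sum(value) FROM tab1", 40)
print(cache.get("SELECT sum(value) FROM tab1"))  # 40, within the TTL window
```

A real deployment would also key on the query’s time interval so that a cached “last 1 hour” result is invalidated when the window slides.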
Data Pre-aggregation
- Druid performs Roll-up processing, pre-aggregating raw data at ingestion time
- Roll-up compresses the volume of data that must be stored
- Druid aggregates rows that share the same dimension combination
- Druid controls the granularity of ingested data through queryGranularity; the finest queryGranularity is millisecond
Roll-Up
Before aggregation: raw data contains multiple detail records with the same dimension combination
After aggregation: summary data aggregated by dimension combination
Bitmap Index
Example of ingested Druid data:
- The first column is time; Appkey and Area are dimension columns; Value is the metric column
- Druid automatically performs Roll-up during ingestion, aggregating rows with the same dimension combination
- The aggregation granularity is determined by business needs
After daily aggregation, Druid achieves fast lookup through its bitmap index.
The bitmap index mainly accelerates queries with filter conditions. When generating index files, Druid builds a corresponding BitMap for each value of each dimension column:
A bitmap index can be seen as: HashMap<String, BitMap>
- Key: a dimension value
- Value: a bitmap marking which rows of the table contain that dimension value
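The HashMap<String, BitMap> view above can be built in a few lines. Python integers serve as arbitrary-length bitmaps here; the alternating row values are made up for illustration (chosen to match the 101010-style bitmaps in the query walkthrough below):

```python
from collections import defaultdict

# Rows of a toy dimension column; row order gives each row a bit position.
area = ["Beijing", "Shenzhen", "Beijing", "Shenzhen", "Beijing", "Shenzhen"]

# HashMap<String, BitMap>: bit i is set when row i carries that value.
index = defaultdict(int)
for row, value in enumerate(area):
    index[value] |= 1 << row

print(bin(index["Beijing"]))   # bits for rows 0, 2, 4
print(bin(index["Shenzhen"]))  # bits for rows 1, 3, 5
```

Each distinct value gets exactly one bitmap, so a filter on the column becomes pure bit arithmetic instead of a row scan. Production systems use compressed bitmap formats (Roaring, Concise) rather than raw integers.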
SQL Query Example
SELECT sum(value) FROM tab1
WHERE time='2020-01-01'
AND appkey in ('appkey1', 'appkey2')
AND area='北京'
Execution process analysis:
- Locate the Segments based on the time period
- Get the bitmaps for appkey in ('appkey1', 'appkey2') and area='北京'
- Compute (appkey1 OR appkey2) AND 北京
- (110000 OR 001100) AND 101010 = 111100 AND 101010 = 101000
- Qualified rows: the first and third rows; sum(value) over these rows is 40
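The bit arithmetic in the walkthrough above can be checked directly. The per-row metric values are hypothetical (chosen so that rows 1 and 3 sum to 40, consistent with the result stated above); the bitmaps are exactly the ones from the walkthrough, leftmost bit = row 1:

```python
# Bitmaps from the walkthrough, leftmost bit = row 1.
appkey1 = int("110000", 2)
appkey2 = int("001100", 2)
beijing = int("101010", 2)

# WHERE appkey IN ('appkey1','appkey2') AND area='北京' as pure bit ops.
selected = (appkey1 | appkey2) & beijing
print(format(selected, "06b"))   # 101000 -> rows 1 and 3 qualify

# Hypothetical metric values per row, left to right.
values = [10, 20, 30, 40, 50, 60]
total = sum(v for i, v in enumerate(values) if selected & (1 << (5 - i)))
print(total)  # 40
```

Only after the bitmap intersection does the engine touch the metric column, and only for the surviving rows.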
GroupBy Query Example
SELECT area, sum(value)
FROM tab1
WHERE time='2020-01-01'
AND appkey in ('appkey1', 'appkey2')
GROUP BY area
This query differs from the one above in which rows it selects:
- appkey1 OR appkey2
- 110000 OR 001100 = 111100
- Take the first through fourth rows
- Group and aggregate them in memory; result: 北京 40, 深圳 60
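The GROUP BY step above can be sketched the same way: the bitmap filter selects rows 1-4, then the surviving rows are grouped in memory. The per-row areas and values are hypothetical, chosen to be consistent with the 北京 40 / 深圳 60 result in the walkthrough:

```python
from collections import defaultdict

# Toy rows (leftmost = row 1): area dimension and metric value per row.
areas  = ["北京", "深圳", "北京", "深圳", "北京", "深圳"]
values = [10, 20, 30, 40, 50, 60]

# Filter bitmap from the walkthrough: appkey1 OR appkey2 -> rows 1-4.
selected = int("111100", 2)

# In-memory group aggregation over the rows the bitmap kept.
groups = defaultdict(int)
for i in range(len(areas)):
    if selected & (1 << (len(areas) - 1 - i)):  # is row i+1 set?
        groups[areas[i]] += values[i]

print(dict(groups))  # {'北京': 40, '深圳': 60}
```

The filter is resolved entirely in the index; only the grouping and summing touch actual column data.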
Error Quick Reference
| Symptom | Root cause | How to diagnose | Fix |
|---|---|---|---|
| Task succeeded but query empty | interval/timezone mismatch, segmentGranularity vs queryGranularity confusion | Check intervals in task logs; use SQL to check __time filter | Unify timezone; fix interval; rebuild Segment |
| Task failed to acquire lock | Multiple tasks writing same timeChunk lock conflict | Check locks and concurrency in Overlord UI/logs | Split interval/partitions; serialize writes or add partition dimension |
| Rejected segment / overshadowed | Version/rules override causing rejection | Compare Coordinator rules with segment versions | Adjust Load/Drop rules or reset version, republish |
| Historical not loading new segments | Deep storage path/credential wrong or rule missed | Historical logs “Failed to download segment”; Rules page | Fix storage config/permissions; add load rules and backfill |
| Direct buffer memory OOM | MaxDirectMemorySize too small or concurrency too high | Search hs_err / JVM logs for keywords | Increase direct memory; reduce processing.buffer/threads |
| GroupBy memory insufficient/timeout | v2 spill config insufficient or data skewed | Broker/Historical logs and query context | Enable/increase disk spill; raise limits or switch to Timeseries/TopN |
| Filter hit poor/slow | Dimension not indexed / high-cardinality strategy improper | Check columns and indexes in Segment metadata | Build bitmap/use sketch for common dimensions; optimize schema |
| Cannot parse time | Time format/field mapping error | Verify timestampSpec sample data | Fix format/timezone; set secondarySpec |
| Kafka ingestion lag | Few partitions/low parallelism/transformation time consuming | Monitor Supervisor status and lag metrics | Increase task parallelism and partitions; push down filters/simplify transform |
| Cannot vectorize filter | Non-vectorizable functions/expressions | Error point in Broker logs | Replace the functions or allow non-vectorized execution; evaluate the cost |