TL;DR

  • Scenario: Real-time / time-series OLAP over hundreds of millions of detail rows, with low-latency dashboards and multi-dimensional analysis
  • Conclusion: Time Chunk → Segment columnar storage + Roll-up + Bitmap index + mmap + multi-level cache
  • Output: Key points of the storage/query mechanisms, a checklist, and fix ideas for common pitfalls

Data Storage

Druid Data Storage Architecture

Druid’s data storage uses a hierarchical logical structure with several layers:

1. DataSource

  • Concept analogy: A DataSource is similar to a Table in an RDBMS
  • Function positioning: As the top-level container for data, one DataSource holds all the data for a specific business domain
  • Example: An e-commerce website might create “user_behavior” and “product_inventory” DataSources to store different business data

2. Chunk

  • Time partitioning: Each DataSource’s data is divided by time range into Chunks
  • Partition granularity: The time granularity is configurable based on business needs
    • Common configs: Day (1d), Hour (1h), Week (1w), etc.
    • Example: When partitioning by day, all data for 2023-01-01 forms an independent Chunk
  • Query advantage: This time-partitioned structure makes time-range queries very efficient
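For reference, the Chunk granularity is configured through the granularitySpec in a Druid ingestion spec; a minimal sketch (field values are illustrative):

```json
"granularitySpec": {
  "type": "uniform",
  "segmentGranularity": "DAY",
  "queryGranularity": "HOUR",
  "rollup": true,
  "intervals": ["2023-01-01/2023-01-02"]
}
```

Here segmentGranularity controls the Chunk size (one Chunk per day), while queryGranularity controls how finely timestamps are truncated during roll-up.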

3. Segment

  • Physical storage: A Segment is the actual physical storage unit; each Segment is an independent file
  • Data scale: One Segment typically contains several million rows (~5 million)
  • File features: Segment files use a columnar storage format with compression and indexing
  • Parallel processing: Druid can load and process multiple Segments in parallel

Data Distribution Mechanism

  • Time order: Within a Chunk, Segments are strictly organized in time order
  • Distributed storage: Segments are distributed across multiple Druid cluster nodes
  • Replica mechanism: For high availability, each Segment has multiple replicas (typically 2-3) stored on different nodes

Query Optimization

  • Time filtering: The system first determines the time range (Chunks) involved in the query
  • Segment filtering: It then loads only the Segment files in the relevant Chunks
  • Performance advantage: This mechanism significantly reduces the data volume to scan, making it especially suitable for time-series analysis scenarios
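The two pruning steps above can be sketched in a few lines of Python. This is a toy model, not Druid’s actual implementation; the Chunk/Segment classes and the overlap test are illustrative:

```python
from datetime import datetime

# Toy model of Druid's two-step pruning: first pick the Chunks whose time
# range overlaps the query interval, then scan only the Segments inside
# those Chunks. Class and function names are illustrative, not Druid's.

class Segment:
    def __init__(self, name):
        self.name = name

class Chunk:
    def __init__(self, start, end, segments):
        self.start, self.end, self.segments = start, end, segments

def prune(chunks, q_start, q_end):
    """Return only the segments in chunks overlapping [q_start, q_end)."""
    hits = []
    for c in chunks:
        if c.start < q_end and c.end > q_start:  # half-open interval overlap
            hits.extend(c.segments)
    return hits

chunks = [
    Chunk(datetime(2023, 1, 1), datetime(2023, 1, 2), [Segment("s0")]),
    Chunk(datetime(2023, 1, 2), datetime(2023, 1, 3), [Segment("s1")]),
    Chunk(datetime(2023, 1, 3), datetime(2023, 1, 4), [Segment("s2")]),
]
# A query over Jan 2 touches only the middle Chunk.
hits = prune(chunks, datetime(2023, 1, 2), datetime(2023, 1, 3))
print([s.name for s in hits])  # ['s1']
```

The key point is that Segments in non-matching Chunks are never opened at all.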

Practical Application Examples

  1. Monitoring system: Generates one Segment per minute and one Chunk per hour
  2. IoT data processing: Organizes Segments along two dimensions, device ID + time
  3. Advertising analysis: Creates one Chunk per day and further subdivides Segments by advertiser ID

Data Partitioning

Druid processes event data; each record carries a timestamp, so data can be partitioned by time. If the partition granularity is specified as day, each day’s data is stored and queried separately.


Segment Internal Storage

  • Druid uses columnar storage; each column’s data is stored in an independent structure
  • Segment data is mainly divided into three column types:
    • Timestamp column: Every row must have a timestamp; Druid always shards based on the event timestamp
    • Dimension columns: Used for filtering or GROUP BY, usually String, Float, Double, or Int types
    • Metric columns: Used for aggregation, with specified aggregation functions such as sum, average, etc.
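The three column roles can be pictured as one array per column. This is a plain-Python sketch of the columnar layout (the data values are made up); note that filtering touches only the dimension column and aggregation only the metric column:

```python
# Sketch of the three column roles inside a segment, modeled as plain
# Python lists in a columnar layout: one array per column.
timestamps = [1672531200000, 1672531200000, 1672531260000]  # __time (ms)
dim_area   = ["beijing", "shenzhen", "beijing"]             # dimension column
met_value  = [10, 20, 30]                                   # metric column

# A filter scans only the dimension column; the aggregation reads only
# the metric column for the matching row positions.
total = sum(v for a, v in zip(dim_area, met_value) if a == "beijing")
print(total)  # 40
```

In a row-oriented layout the same query would have to read every field of every row.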

After a MiddleManager node receives an ingestion task, it starts creating the Segment:

  • Converts the data to a columnar storage format
  • Builds bitmap indexes (one for every dimension column)
  • Applies several compression algorithms:
    • LZ4 compression for all columns
    • Dictionary encoding for all strings, to minimize storage
    • Bitmap compression for the bitmap indexes
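Two of these encodings can be sketched together: dictionary-encode a string dimension, then build one bitmap per dictionary value. Here a Python int stands in for a bit set; real Druid uses compressed bitmaps (Roaring/Concise):

```python
# Sketch: dictionary encoding of a string dimension plus a per-value
# bitmap index. A Python int is used as the bit set (bit i = row i);
# Druid itself stores compressed bitmaps instead.
def encode_column(values):
    dictionary = sorted(set(values))               # id -> value
    ids = {v: i for i, v in enumerate(dictionary)} # value -> id
    encoded = [ids[v] for v in values]             # column stored as small ints
    bitmaps = {v: 0 for v in dictionary}
    for row, v in enumerate(values):
        bitmaps[v] |= 1 << row                     # set the bit for this row
    return dictionary, encoded, bitmaps

dictionary, encoded, bitmaps = encode_column(
    ["appkey1", "appkey2", "appkey1", "appkey3"])
print(dictionary)               # ['appkey1', 'appkey2', 'appkey3']
print(encoded)                  # [0, 1, 0, 2]
print(bin(bitmaps["appkey1"]))  # 0b101 -> rows 0 and 2
```

Dictionary encoding shrinks the column to small integers; the bitmaps make filters a matter of bitwise operations rather than string comparisons.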

Once Segment creation completes, the Segment file is immutable and is written to deep storage (so Segments are not lost if a MiddleManager node crashes). The Segment is then loaded onto a Historical node, which can load it directly into memory.

In addition, the Metadata store records the newly created Segment’s information: structure, size, deep storage location, etc.

The Coordinator node needs this metadata to coordinate data lookup.


Indexing Service

The indexing service handles data ingestion and the creation of Segment data files.

It is a highly available distributed service with a master-slave architecture, consisting of three components:

  • Overlord as master node
  • MiddleManager as slave node
  • Peon for running a Task

Service Components

Overlord Component

Responsible for creating Tasks, distributing them to MiddleManagers to run, creating locks for Tasks, tracking Task status, and providing feedback to users.

MiddleManager Component

As the slave node, receives tasks assigned by the master node and starts an independent JVM process for each Task to complete the specific work.

Peon (Worker) Component

A process started by a MiddleManager to run a single Task.

Comparison with YARN

  • Overlord is similar to the ResourceManager, responsible for cluster resource management and task allocation
  • MiddleManager is similar to the NodeManager, responsible for receiving tasks and managing node resources
  • Peon is similar to a Container, executing specific tasks on a node

Task Types

  • index hadoop task: Hadoop indexing task; uses a Hadoop cluster to run MapReduce jobs that create Segment data files, suitable for large data volumes
  • index kafka task: Real-time ingestion from Kafka; a KafkaSupervisor can be configured on the Overlord to manage the creation and lifecycle of Kafka indexing tasks
  • merge task: Merge indexing task; merges multiple Segment data files into one, using a specified aggregation method
  • kill task: Destroy indexing task; deletes data within the specified time range from the Druid cluster’s deep storage

Druid High-Performance Query Mechanism

Druid achieves low-latency, high-performance queries mainly through five key techniques:

1. Data Pre-aggregation

Druid performs pre-aggregation during the data ingestion stage, significantly reducing the data volume to process at query time. The system supports multiple aggregation methods:

  • Count
  • Sum
  • Max
  • Min
  • Approximate cardinality (HyperLogLog), etc.

For example, for website access logs, Druid can pre-compute per-minute metrics like PV and UV during ingestion, avoiding a full recalculation at query time.
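The PV example above can be sketched as a roll-up: rows sharing the same (truncated timestamp, dimensions) key are pre-aggregated once, so queries read one summary row instead of many detail rows. This is a simplified model, not Druid’s code:

```python
from collections import defaultdict

# Sketch of roll-up at ingestion time: truncate each timestamp to the
# query granularity and fold all rows with the same (bucket, dimensions)
# key into a single pre-aggregated summary row.
def rollup(rows, granularity_ms=60_000):
    agg = defaultdict(lambda: {"count": 0, "sum": 0})
    for ts, dims, value in rows:
        bucket = ts - ts % granularity_ms      # truncate to the minute
        key = (bucket, dims)
        agg[key]["count"] += 1                 # e.g. a PV counter
        agg[key]["sum"] += value
    return dict(agg)

rows = [
    (1000,  ("appkey1", "beijing"), 10),
    (2000,  ("appkey1", "beijing"), 20),
    (61000, ("appkey1", "beijing"), 5),
]
out = rollup(rows)
print(out)  # two summary rows: minute 0 (count 2, sum 30), minute 60000
```

Three detail rows collapse into two summary rows here; at real data volumes the reduction is far larger.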

2. Columnar Storage & Compression

Druid uses a columnar storage architecture with multiple compression algorithms:

  • String type: Dictionary encoding compression
  • Numeric type: Bit compression, LZ4 compression, ZSTD compression

This storage method not only reduces I/O operations but also significantly improves the compression ratio. For example, timestamp columns typically achieve over 10x compression.

3. Bitmap Index

Druid builds a bitmap index for each dimension column:

  • Generates a corresponding bitmap for each dimension value
  • Supports fast AND/OR/NOT bit operations
  • Especially suitable for filter queries on high-cardinality dimensions

For example, a query for “Chrome OR Firefox” on the “browser type” dimension can quickly locate the relevant rows through a single bitmap OR operation.
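The browser example can be demonstrated with Python ints as bitmaps (bit i set = row i matches). The row values are made up for illustration:

```python
# The "Chrome OR Firefox" example, with bitmaps modeled as Python ints.
rows = ["Chrome", "Firefox", "Safari", "Chrome", "Edge", "Firefox"]

def bitmap_for(value):
    bm = 0
    for i, v in enumerate(rows):
        if v == value:
            bm |= 1 << i      # set bit i when row i matches
    return bm

chrome, firefox = bitmap_for("Chrome"), bitmap_for("Firefox")
matched = chrome | firefox     # one OR over two bitmaps
hit_rows = [i for i in range(len(rows)) if matched >> i & 1]
print(hit_rows)  # [0, 1, 3, 5]
```

The filter never inspects the row data itself; it only combines precomputed bitmaps and then reads the matching positions.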

4. Memory-mapped File (mmap)

Druid uses mmap technology to access disk data:

  • Maps index and data files to memory address space
  • OS automatically manages memory page loading and recycling
  • Avoids traditional I/O system call overhead
  • Supports automatic caching of hot data

This mechanism makes queries over cached pages nearly as fast as accessing memory, with the OS managing the cache intelligently.
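A minimal demonstration of the mechanism, using Python’s standard mmap module on a throwaway file (the file name and contents are made up; this only illustrates the OS-level technique, not Druid’s segment format):

```python
import mmap
import os
import tempfile

# Write a small fake "segment" file, then map it into the address space.
# Reads from the mapping go through OS page caching: no explicit read()
# per access, and hot pages stay resident in memory automatically.
path = os.path.join(tempfile.mkdtemp(), "segment.bin")
with open(path, "wb") as f:
    f.write(b"columnar-data" * 4)

with open(path, "rb") as f:
    with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mm:
        print(mm[:13])   # slicing reads straight from the mapping
        print(len(mm))   # total mapped length in bytes
```

Because the mapping is read-only (ACCESS_READ), many processes can share the same cached pages, which suits immutable Segment files.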

5. Query Result Cache

Druid implements multi-level cache mechanism:

  • Intermediate result cache: Stores partial query results
  • Query result cache: Caches complete query results
  • Supports time-based cache invalidation strategies
  • Repeated requests with the same query pattern can be answered immediately

For example, a dashboard’s common “last 1 hour” queries can return results directly within the cache validity period, without recalculation.
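The idea behind a result cache with time-based invalidation can be sketched as follows. This is a toy cache keyed on the normalized query, not Druid’s actual cache implementation; the class and field names are invented:

```python
import hashlib
import json
import time

# Toy result cache: key = hash of the normalized query, entries expire
# after a TTL, mimicking the time-based invalidation described above.
class ResultCache:
    def __init__(self, ttl_seconds):
        self.ttl = ttl_seconds
        self.store = {}

    def _key(self, query):
        # Same query shape -> same key, regardless of dict key order.
        blob = json.dumps(query, sort_keys=True).encode()
        return hashlib.sha256(blob).hexdigest()

    def get(self, query):
        entry = self.store.get(self._key(query))
        if entry and time.time() - entry[0] < self.ttl:
            return entry[1]
        return None            # miss or expired

    def put(self, query, result):
        self.store[self._key(query)] = (time.time(), result)

cache = ResultCache(ttl_seconds=60)
q = {"datasource": "tab1", "interval": "PT1H", "agg": "sum(value)"}
cache.put(q, 1234)
print(cache.get(q))  # 1234
```

Normalizing the query before hashing is what lets two textually different but equivalent requests share one cache entry.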


Data Pre-aggregation

  • Druid performs Roll-up processing, pre-aggregating raw data at ingestion time
  • Roll-up compresses the data volume that needs to be stored
  • Druid aggregates rows with the same dimension combination
  • Druid controls ingestion granularity through queryGranularity; the smallest queryGranularity is millisecond

Roll-Up

Before aggregation: Raw data contains multiple detail records with same dimension combinations

After aggregation: Aggregated summary data by dimension combination

Bitmap Index

Druid ingestion data example:

  • The first column is time; Appkey and Area are dimension columns; Value is the metric column
  • Druid automatically performs Roll-up during ingestion, aggregating rows with the same dimension combination
  • The aggregation granularity is determined by business needs

With the data aggregated daily, Druid achieves fast data lookup through its bitmap index.

The bitmap index mainly accelerates queries with filter conditions. When generating index files, Druid generates a corresponding bitmap for each value of each column:

A bitmap index can be seen as: HashMap<String, BitMap>

  • Key: Dimension value
  • Value: A bitmap marking which rows in the table have that dimension value

SQL Query Example

SELECT sum(value) FROM tab1
WHERE time='2020-01-01'
AND appkey in ('appkey1', 'appkey2')
AND area='北京'

Execution process analysis:

  1. Locate the Segments based on the time period
  2. Fetch the bitmaps for appkey in (‘appkey1’, ‘appkey2’) and area=‘北京’
  3. Combine them: (appkey1 OR appkey2) AND 北京
  4. (110000 OR 001100) AND 101010 = 111100 AND 101010 = 101000
  5. Qualifying rows: the first and third rows; sum(value) over these rows is 40
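Step 4 can be checked directly with Python ints as bitmaps. Note the document writes bitmaps left-to-right (row 1 is the leftmost bit); since AND/OR are position-wise, the result string matches either way:

```python
# Verifying the worked example: (appkey1 OR appkey2) AND 北京.
appkey1 = int("110000", 2)
appkey2 = int("001100", 2)
beijing = int("101010", 2)

result = (appkey1 | appkey2) & beijing
print(format(result, "06b"))  # 101000 -> the 1st and 3rd rows qualify
```

Only the two surviving row positions are then visited to compute sum(value).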

GroupBy Query Example

SELECT area, sum(value)
FROM tab1
WHERE time='2020-01-01'
AND appkey in ('appkey1', 'appkey2')
GROUP BY area

This query differs from the one above in that it keeps all qualifying rows:

  • appkey1 OR appkey2
  • 110000 OR 001100 = 111100
  • Take the first through fourth rows
  • Group and aggregate in memory; result: 北京 40, 深圳 60
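The GroupBy path above can be sketched end to end: the appkey filter selects candidate rows (as a bitmap would), then the surviving rows are aggregated by area in memory. The row values are assumed so that they reproduce the example’s result:

```python
from collections import defaultdict

# Sketch of the GroupBy path: filter rows by appkey, then aggregate the
# surviving rows by area in memory. Values are assumed for illustration.
appkey = ["appkey1", "appkey1", "appkey2", "appkey2", "appkey3", "appkey3"]
area   = ["北京", "深圳", "北京", "深圳", "北京", "深圳"]
value  = [10, 20, 30, 40, 50, 60]

# Rows matching the filter (rows 0-3, mirroring bitmap 111100).
selected = [i for i, k in enumerate(appkey) if k in ("appkey1", "appkey2")]

groups = defaultdict(int)
for i in selected:
    groups[area[i]] += value[i]   # in-memory group aggregation
print(dict(groups))  # {'北京': 40, '深圳': 60}
```

In Druid the filter step uses the precomputed bitmaps, so only the final aggregation touches the metric column.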

Error Quick Reference

| Symptom | Root Cause | Where to Look | Fix |
| --- | --- | --- | --- |
| Task succeeded but query empty | interval/timezone mismatch; segmentGranularity vs queryGranularity confusion | Check intervals in task logs; use SQL to check the __time filter | Unify timezone; fix interval; rebuild Segment |
| Task failed to acquire lock | Multiple tasks writing the same time Chunk (lock conflict) | Check locks and concurrency in Overlord UI/logs | Split interval/partitions; serialize writes or add a partition dimension |
| Segment rejected / overshadowed | Version/rule override causing rejection | Compare Coordinator rules with segment versions | Adjust Load/Drop rules or reset the version, then republish |
| Historical not loading new segments | Deep storage path/credentials wrong, or a rule missed | Historical logs (“Failed to download segment”); Rules page | Fix storage config/permissions; add load rules and backfill |
| Direct buffer memory OOM | MaxDirectMemorySize too small or concurrency too high | Check hs_err / JVM log keywords | Increase direct memory; reduce processing buffers/threads |
| GroupBy memory insufficient / timeout | v2 spill config insufficient or data skew | Broker/Historical logs and query context | Enable/increase disk spill; raise limits or switch to Timeseries/TopN |
| Poor/slow filter hits | Dimension not indexed / improper high-cardinality strategy | Check columns and indexes in Segment metadata | Build bitmaps / use sketches for common dimensions; optimize schema |
| Cannot parse time | Time format / field mapping error | Verify timestampSpec against sample data | Fix format/timezone; set secondarySpec |
| Kafka ingestion lag | Too few partitions / low parallelism / expensive transforms | Monitor Supervisor status and lag metrics | Increase task parallelism and partitions; push down filters / simplify transforms |
| Cannot vectorize filter | Non-vectorizable functions/expressions | Error location in Broker logs | Replace functions or allow non-vectorized execution; evaluate cost |