TL;DR

  • Scenario: Real-time plus historical OLAP, needing stable ingestion, scalable queries, and predictable cluster operations
  • Conclusion: Split nodes into Master (Coordinator/Overlord), Query (Broker/Router), and Data (Historical/MiddleManager) tiers, backed by ZooKeeper, a metadata store, and Deep Storage; prioritize Segment and task-scheduling governance
  • Output: Architecture key points, a version/dependency matrix, and a quick-reference card for locating and fixing common faults

Basic Architecture

Coordinator Node

Druid’s Coordinator component mainly handles data load balancing across Historical nodes and Segment lifecycle management. Core responsibilities:

  1. Data Load Balancing:

    • Monitors load on each Historical node
    • Migrates Segments between nodes based on configured balancing strategy
    • Ensures even data distribution, avoids node overload
    • Handles data redistribution after new node joins or failures
  2. Lifecycle Management:

    • Manages data according to pre-defined Rules
    • Coordinates new data loading process
    • Periodically checks and unloads expired data
    • Replicates important data for higher availability
    • Executes hot/cold tiering strategies
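The lifecycle Rules above are configured per datasource as an ordered list (first match wins). A sketch of a hot/cold tiering rule set — the tier names, periods, and replica counts here are illustrative, not from the source:

```json
[
  {"type": "loadByPeriod", "period": "P1M", "tieredReplicants": {"hot": 2}},
  {"type": "loadByPeriod", "period": "P6M", "tieredReplicants": {"_default_tier": 2}},
  {"type": "dropForever"}
]
```

Read top-down: the last month lives on the hot tier with 2 replicas, the last six months on the default tier, and anything older is unloaded from the cluster (it remains in Deep Storage).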

The Coordinator runs periodically; the interval is controlled by the druid.coordinator.period parameter, default PT60S (60 seconds). Each run:

  1. Cluster State Sync:

    • Maintains persistent connection with ZooKeeper
    • Fetches cluster topology and node status information in real time
    • Monitors Historical node join, leave or failure
  2. Metadata Management:

    • Connects to configured metadata database (usually MySQL or PostgreSQL)
    • Reads and maintains all Segment metadata
    • Gets and processes data management Rule configurations
    • Records Segment distribution and status changes
  3. Decision Execution:

    • Generates data management plans based on rules and current state
    • Sends load/unload instructions to Historical nodes
    • Coordinates data migration between nodes
    • Handles data replication and balancing requests
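The knobs mentioned above live in the Coordinator's runtime.properties; a minimal sketch (values are defaults or illustrative, not recommendations):

```properties
# How often the Coordinator runs its management cycle (default PT60S)
druid.coordinator.period=PT60S
# Delay before the Coordinator starts managing after election
druid.coordinator.startDelay=PT300S
# Segment balancing strategy used when migrating Segments between Historicals
druid.coordinator.balancer.strategy=cost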

Overlord Node

Monitors MiddleManager processes and serves as the primary node for Druid data ingestion. It accepts, splits, and assigns Tasks to MiddleManagers, creates Task-related locks, coordinates Segment publishing, and returns Task status to callers.

Historical Node

Loads generated Segment data files and serves queries over them. Historical nodes are the core of the cluster's query performance: they handle the vast majority of Segment queries.

  • Historical process downloads Segments from Deep Storage, responds to query requests about these Segments (from Broker process)
  • Historical process doesn’t handle write requests
  • Historical process uses a shared-nothing architecture; it knows how to load and delete Segments, and how to answer queries over them. Even if the underlying Deep Storage becomes unavailable, Historical can still serve queries for the Segments it has already synced.
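The local Segment cache and size limits that control this behavior are set in the Historical's runtime.properties; a sketch with illustrative paths and sizes:

```properties
# Tier this Historical belongs to (referenced by load rules)
druid.server.tier=_default_tier
# Max bytes of Segments this node will serve
druid.server.maxSize=300000000000
# Where downloaded Segments are cached locally
druid.segmentCache.locations=[{"path":"/var/druid/segment-cache","maxSize":300000000000}]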

MiddleManager Node

Ingests real-time data promptly and generates Segment data files.

  • MiddleManager process is the worker node that executes submitted tasks; it forwards tasks to Peon processes running in separate JVMs
  • MiddleManager, Peon, Task relationship: each Peon process can only run one Task at a time, but one MiddleManager can manage multiple Peon processes
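The MiddleManager-to-Peon ratio is governed by worker capacity in runtime.properties; a sketch with illustrative values:

```properties
# Max number of Peons (and therefore concurrent Tasks) on this MiddleManager
druid.worker.capacity=4
# Local working directory for task data
druid.indexer.task.baseTaskDir=/var/druid/task
# JVM options passed to each spawned Peon process
druid.indexer.runner.javaOptsArray=["-server","-Xms1g","-Xmx1g"]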

Broker Node

Receives client query requests and forwards them to Historical and MiddleManager nodes. When the Broker receives the results of these sub-queries, it merges them and returns the combined result to the caller.

  • Broker nodes responsible for forwarding client query requests
  • Broker knows through ZooKeeper which Segments are on which nodes, forwards queries to corresponding nodes
  • After all nodes return data, Broker merges data from all nodes, then returns to Client
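The scatter-gather merge described above can be sketched minimally in Python. This is a conceptual illustration of combining per-node partial aggregates, not Druid's actual merge implementation; the node names and metrics are made up:

```python
from collections import defaultdict

def merge_partial_results(partials):
    """Merge per-node partial aggregates (dimension value -> metric sum),
    the way a Broker combines sub-query results from Historicals/MiddleManagers."""
    merged = defaultdict(float)
    for partial in partials:
        for dim_value, metric in partial.items():
            merged[dim_value] += metric
    return dict(merged)

# Two nodes hold Segments covering overlapping dimension values.
node_a = {"US": 120.0, "EU": 80.0}
node_b = {"US": 30.0, "APAC": 55.0}
print(merge_partial_results([node_a, node_b]))
# {'US': 150.0, 'EU': 80.0, 'APAC': 55.0}
```

Real Broker merging also handles ordering, limits, and query-type-specific combine functions, but the shape is the same: fan out, collect partials, fold them into one result.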

Router Node

(Optional) Responsible for routing requests to Brokers, Coordinators, and Overlords.

  • Router process can provide unified API gateway layer over Broker, Overlords, Coordinator processes
  • Router process is optional. If cluster data scale reaches the TB level, consider enabling it (druid.router.managerProxy.enabled=true)
  • Once cluster reaches certain scale, failure probability becomes significant, Router supports sending requests only to healthy nodes, avoiding request failures
  • At same time, query response time and resource consumption increase with data volume growth, Router supports setting query priority and load balancing strategy, avoiding large queries causing queue buildup or query hot spots
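The tier-based routing mentioned above is configured in the Router's runtime.properties; a sketch (the "hot" broker service name is illustrative):

```properties
# Proxy Coordinator/Overlord management APIs through the Router
druid.router.managerProxy.enabled=true
# Broker to use when no tier rule matches
druid.router.defaultBrokerServiceName=druid/broker
# Route queries for hot-tier data to a dedicated Broker pool
druid.router.tierToBrokerMap={"hot":"druid/broker-hot","_default_tier":"druid/broker"}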

How to Classify

Druid processes can be deployed in any combination. For ease of understanding and deployment organization, they are grouped into three categories:

  • Master: Coordinator, Overlord responsible for data availability and ingestion
  • Query: Broker, Router responsible for handling external requests
  • Data: Historical, MiddleManager, responsible for actual ingestion load and data storage

Other Dependencies

Deep Storage

Stores the generated Segment data files for Historical servers to download. A single-node cluster can use local disk; a distributed cluster generally uses HDFS.

  • Druid uses Deep Storage for data backup, also as a way for Druid processes to transfer data in background
  • When responding to queries, Historical first reads pre-fetched Segments from local disk, which means both Deep Storage and the Historical nodes must have enough disk space for the loaded data
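Deep Storage is selected via runtime.properties; a sketch of the HDFS case described above (the directory path is illustrative):

```properties
# HDFS as Deep Storage for a distributed cluster
druid.storage.type=hdfs
druid.storage.storageDirectory=/druid/segments
```

For a single-node setup, `druid.storage.type=local` with a local `storageDirectory` plays the same role.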

Metadata Storage

Stores Druid cluster metadata, such as Segment-related information; generally MySQL (or PostgreSQL) is used.
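A sketch of the MySQL case (host, database name, and credentials are placeholders; MySQL additionally requires the mysql-metadata-storage extension to be loaded):

```properties
druid.metadata.storage.type=mysql
druid.metadata.storage.connector.connectURI=jdbc:mysql://db-host:3306/druid
druid.metadata.storage.connector.user=druid
druid.metadata.storage.connector.password=changeme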

ZooKeeper

Druid cluster ensures coordination of nodes through several key mechanisms:

  1. Coordinator Node Leader Election Mechanism

    • Election protocol: Uses ZooKeeper-based leader election algorithm
    • Responsibility scope: Monitors Historical node Segment loading status; coordinates Segment distribution in cluster; manages Segment load and drop strategies
    • Election process: When the current Leader goes offline, all Coordinator nodes participate in a new election; the first to create an ephemeral node on ZooKeeper becomes the new Leader
  2. Historical Node Segment Publishing Protocol

    • Publishing process: After Historical node completes Segment loading, updates status in metadata storage; sends Segment available notification to Coordinator; Coordinator verifies metadata consistency, marks Segment as queryable
    • Fault tolerance: If publishing fails, Segment marked as failed state, Coordinator arranges retry
  3. Coordinator and Historical Segment Management Protocol

    • Load Segment flow: Coordinator selects target Historical node based on load balancing strategy; sends HTTP request to Historical to load specified Segment; Historical downloads Segment data from Deep Storage and loads into memory
    • Drop Segment flow: Coordinator sends delete instruction to Historical; Historical unloads Segment from memory and deletes local copy; updates metadata status
  4. Overlord Node Leader Election Mechanism

    • Election characteristics: Similar to Coordinator, also ZooKeeper-based
    • Responsibility scope: Receives and processes task submission requests; assigns tasks to MiddleManager; monitors task execution status
    • Failover: When Leader fails, new Leader takes over all running task states
  5. Overlord and MiddleManager Task Management Protocol

    • Task allocation flow: Overlord receives task submission request; selects execution node based on MiddleManager load; sends task description to MiddleManager via HTTP interface
    • Status sync mechanism: MiddleManager periodically sends heartbeat and task status updates to Overlord; Overlord monitors task progress, retries or reschedules if necessary
    • Fault handling: When MiddleManager disconnects, Overlord reassigns tasks to other available nodes
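The "selects target Historical node based on load balancing strategy" step above can be sketched as a toy heuristic. This is not Druid's actual cost-based balancer; the node names and the fill-ratio rule are illustrative only:

```python
def pick_target_historical(nodes, segment_size):
    """Pick the least-full Historical that can still fit the Segment.
    Stand-in for the Coordinator's balancing strategy; real Druid uses a
    configurable (e.g. cost-based) strategy, not a simple fill ratio."""
    candidates = [n for n in nodes if n["max_size"] - n["used"] >= segment_size]
    if not candidates:
        return None  # no node has room; the cluster needs more capacity
    # Choose the node with the lowest fill ratio (used / max_size).
    return min(candidates, key=lambda n: n["used"] / n["max_size"])

nodes = [
    {"name": "historical-1", "used": 700, "max_size": 1000},
    {"name": "historical-2", "used": 200, "max_size": 1000},
]
print(pick_target_historical(nodes, segment_size=50)["name"])  # historical-2
```

Once a target is chosen, the Coordinator issues the load instruction and the Historical pulls the Segment from Deep Storage, as described in the load flow above.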

Architecture Evolution

Initial Version~0.6.0 (2012-2013)

0.7.0~0.12.0 (2013-2018)

0.13.0~Current Version

Lambda Architecture

From an overall architecture perspective, Druid follows the Lambda architecture. Lambda architecture was proposed by Storm's creator Nathan Marz as a framework for real-time big-data processing, designed to handle large-scale data while combining the advantages of stream and batch processing. It contains three layers:

  • Batch Layer: Pre-computes over the complete offline historical dataset so downstream consumers get query results quickly. Because it works on the full historical data, accuracy is guaranteed. Typical frameworks: Hadoop, Spark, Flink.
  • Speed Layer: Processes real-time incremental data with a focus on low latency. Its results are less complete and less accurate than the batch layer's, but they fill the data gap caused by batch-layer latency. Typical frameworks: Storm, Spark Streaming, Flink.
  • Serving Layer: Merges historical and real-time data and outputs the result to a database or other media for downstream analysis.

Streaming Data Pipeline

Raw Data → Kafka → Streaming Processor (Optional real-time ETL) → Kafka (Optional) → Druid → Application/User
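The "Kafka → Druid" hop above is typically set up by submitting a supervisor spec to the Overlord. A skeletal sketch — the datasource name, topic, columns, and tuning values are made up for illustration:

```json
{
  "type": "kafka",
  "spec": {
    "dataSchema": {
      "dataSource": "events",
      "timestampSpec": {"column": "ts", "format": "iso"},
      "dimensionsSpec": {"dimensions": ["country", "device"]},
      "granularitySpec": {"segmentGranularity": "HOUR", "queryGranularity": "MINUTE"}
    },
    "ioConfig": {
      "topic": "events",
      "consumerProperties": {"bootstrap.servers": "kafka1:9092"},
      "taskCount": 2,
      "taskDuration": "PT1H"
    },
    "tuningConfig": {"type": "kafka", "maxRowsPerSegment": 5000000}
  }
}
```

The Overlord then spawns ingestion tasks on MiddleManagers, which read the topic, build Segments, and publish them as described in the node sections above.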

Batch Data Pipeline

Raw Data → Kafka (Optional) → HDFS → ETL Process (Optional) → Druid → Application/User


Error Quick Reference

| Symptom | Root Cause | Where to Look | Fix |
| --- | --- | --- | --- |
| Query slow / jittery / hot segments | Segments too fragmented, compaction missing; Broker merge overhead high; low cache hit rate | Broker/Historical metrics (merge/CPU/cache hit), segment size and count | Enable/tune compaction, control segmentGranularity; tune Broker threads and query concurrency; enable/tune cache strategy |
| Segment cannot load / "lost segments" | Deep Storage read/write/permission issues; Coordinator rule triggers Drop | Coordinator action logs, Historical logs, Deep Storage ACLs | Fix load/drop rules and ACLs; retry loading; verify replicas and available space |
| Real-time ingestion backlog | Insufficient MiddleManager slots/memory; growing Kafka backlog | Overlord task status, Peon logs, Kafka lag | Scale out MiddleManagers / task slots and memory; reduce segment granularity; raise concurrency/rate limits |
| Frequent Leader switching | ZK flapping / session expiration | ZK logs, session-expiration count, network jitter | Increase session timeout; stabilize ZK (odd-sized 3- or 5-node ensemble); isolate noisy network |
| Query results missing / inconsistent | Segment not published / not marked queryable; late-arriving data not merged | Metadata store status, Coordinator rules, data time window | Fix the publishing flow; configure late-message handling and windows; backfill/recompute and compact |
| Historical disk full | Unbounded local cache; improper replica/retention policy | Disk monitoring, cache directory, replica strategy | Set cache limits and cleanup policy; shrink replica/retention rules; add disks/nodes |
| Tasks frequently fail/retry | Insufficient Peon memory; dependency JAR/permission issues | Peon GC/OOM logs, container limits, dependency verification | Increase Xmx/container limits; fix dependencies and permissions; split large tasks, shorten windows |
| Broker 502/timeout | Downstream node slow/unreachable; aggregation timeout | Broker logs and downstream health checks | Dedicated pool/priority for slow queries; tiered rate limiting and timeouts; exclude faulty nodes |