TL;DR

  • Scenario: E-commerce click and order events flow into Kafka; Druid performs real-time analysis and visualization.
  • Conclusion: Nested JSON must be preprocessed and flattened; the Kafka Indexing Service runs more stably with “consume from earliest offset” enabled and Rollup disabled.
  • Output: Scala Producer → Druid ingestion → SQL metrics (orders/users/payment methods/TopN) working end to end, plus a list of common pitfalls.

Overall Process

  • Kafka Data Source: Kafka is a distributed streaming platform responsible for receiving, storing, and transmitting data. It collects real-time data from applications, logs, sensors, etc., organizes it into Topics, and delivers messages to consumers.
  • Kafka Producer: The data producer (Producer) is responsible for sending data to Kafka topics.
  • Druid Kafka Ingestion: Druid supports Kafka natively. Through the Kafka Indexing Service, Druid continuously consumes data from a Kafka topic and ingests it into Druid in real time.
  • Real-time Data Ingestion and Indexing: Druid’s Kafka ingestion tasks monitor the Kafka partitions, consume records in order of arrival, and build indexes internally.
  • Druid Query Layer: Druid offers powerful query capabilities: queries can be issued via SQL, and multi-dimensional and aggregation queries are also supported.
  • Kafka Consumer Offset Management: Druid uses the Kafka consumer model, consuming messages in real time and managing offsets to ensure no data loss and no duplicate ingestion.
  • Persistence and Data Storage: After ingestion and indexing, Druid periodically persists segments to deep storage and merges/compacts older data.

Requirements Analysis

Scenario Analysis

  • Large data volumes, with the need for flexible, business-driven queries
  • High real-time requirements
  • Data is pushed continuously and must be analyzable and queryable at second-level latency

Data Description

Original nested JSON data format:

{"ts":1607499629841,"orderId":"1009388","userId":"807134","orderStatusId":1,"orderStatus":"已支付","payModeId":0,"payMode":"微信","payment":"933.90","products":
[{"productId":"102163","productName":"贝合xxx+粉","price":18.7,"productNum":3,"categoryid":"10360","catname1":"厨卫清洁、纸制用品","catname2":"生活日用","catname3":"浴室用品"},{"productId":"100349","productName":"COxxx0C","price":877.8,"productNum":1,"categoryid":"10302","catname1":"母婴、玩具乐器","catname2":"西洋弦乐器","catname3":"吉他"}]}

Field description:

  • ts: Transaction time
  • orderId: Order number
  • userId: User ID
  • orderStatusId: Order status ID (0–11)
  • orderStatus: Order status name (e.g. 已支付, “Paid”)
  • payModeId: Payment method ID
  • payMode: Payment method name (e.g. 微信, “WeChat”)
  • payment: Payment amount
  • products: Purchased products (nested structure)

Flattened Data Format

{"ts":1607499629841,"orderId":"1009388","userId":"807134","orderStatusId":1,"orderStatus":"已支付","payModeId":0,"payMode":"微信","payment":"933.90","product":
{"productId":"102163","productName":"贝合xxx+粉","price":18.7,"productNum":3,"categoryid":"10360","catname1":"厨卫清洁、纸制用品","catname2":"生活日用","catname3":"浴室用品"}}
{"ts":1607499629841,"orderId":"1009388","userId":"807134","orderStatusId":1,"orderStatus":"已支付","payModeId":0,"payMode":"微信","payment":"933.90","product":
{"productId":"100349","productName":"COxxx0C","price":877.8,"productNum":1,"categoryid":"10302","catname1":"母婴、玩具乐器","catname2":"西洋弦乐器","catname3":"吉他"}}
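
The flattening shown above (one output record per product, with the `products` array replaced by a single `product` object) can be sketched as a small preprocessing step. A minimal Python sketch, under the assumption that input arrives as newline-delimited nested JSON; in the actual pipeline this step would run upstream of the Kafka producer:

```python
import json

def flatten_order(line: str) -> list[str]:
    """Explode one nested order record into one JSON line per product."""
    order = json.loads(line)
    products = order.pop("products")   # remove the nested product array
    rows = []
    for product in products:
        row = dict(order)              # copy the order-level fields
        row["product"] = product       # attach exactly one product
        rows.append(json.dumps(row, ensure_ascii=False))
    return rows

# Abbreviated sample record in the shape of the data above
nested = ('{"ts":1607499629841,"orderId":"1009388","userId":"807134",'
          '"payment":"933.90","products":[{"productId":"102163","productNum":3},'
          '{"productId":"100349","productNum":1}]}')
for flat in flatten_order(nested):
    print(flat)
```

Each input order with N products yields N flattened lines, which is also why order-level counts downstream must deduplicate on orderId.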

Kafka Producer

Scala Kafka Producer code example:

package icu.wzk.kafka

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

import java.util.Properties
import scala.io.BufferedSource

object KafkaProducerForDruid {
  def main(args: Array[String]): Unit = {
    val brokers = "h121.wzk.icu:9092"
    val topic = "druid2"
    val prop = new Properties()
    prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
    prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])

    val producer = new KafkaProducer[String, String](prop)
    // Each line of orders1.json is one flattened JSON record
    val source: BufferedSource = scala.io.Source.fromFile("orders1.json")
    try {
      source.getLines().foreach { line =>
        val msg = new ProducerRecord[String, String](topic, line)
        producer.send(msg)
        println(msg)
        Thread.sleep(10) // throttle slightly to simulate a live stream
      }
    } finally {
      producer.close() // close() also flushes any buffered records
      source.close()
    }
  }
}

Druid Import Data

Key steps:

  • Flatten the nested JSON data (one row per product)
  • Do not enable Rollup
  • Set the timestamp column (ts, in milliseconds)
  • Enable “consume data from the start of the stream” (useEarliestOffset)
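
These settings map onto fields of the Kafka ingestion (supervisor) spec. A trimmed illustrative fragment; topic, broker, and datasource names are taken from this example, and everything omitted (tuningConfig, dimensionsSpec, etc.) would be filled in for a real deployment:

```json
{
  "type": "kafka",
  "spec": {
    "ioConfig": {
      "topic": "druid2",
      "consumerProperties": { "bootstrap.servers": "h121.wzk.icu:9092" },
      "useEarliestOffset": true
    },
    "dataSchema": {
      "dataSource": "druid2",
      "timestampSpec": { "column": "ts", "format": "millis" },
      "granularitySpec": { "rollup": false }
    }
  }
}
```

The two settings most often missed are useEarliestOffset: true (otherwise only records arriving after the task starts are ingested) and rollup: false (otherwise rows are pre-aggregated and per-order queries break).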

Query Calculation

Total Orders

-- Query total orders
-- Note: Druid SQL's COUNT(DISTINCT ...) is approximate (HLL-based) by default;
-- set useApproximateCountDistinct=false in the query context for exact counts
SELECT COUNT(DISTINCT orderId) AS orderscount
FROM druid2

Total Users

-- Query total users
SELECT COUNT(DISTINCT userId) AS usercount
FROM druid2

Order Count by Status

-- Count orders by status: deduplicate to one row per order first,
-- since flattening produced one row per (order, product)
SELECT orderStatus, COUNT(*)
FROM (
  SELECT orderId, orderStatus
  FROM druid2
  GROUP BY orderId, orderStatus
)
GROUP BY orderStatus
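
Because flattening repeats orderId once per product, counting raw rows would overcount orders; the inner GROUP BY above deduplicates first. The same two-step logic as a small Python sketch (the sample rows are hypothetical):

```python
from collections import Counter

# One row per (order, product), as produced by the flattening step
rows = [
    {"orderId": "1009388", "orderStatus": "Paid"},
    {"orderId": "1009388", "orderStatus": "Paid"},     # same order, 2nd product
    {"orderId": "1009389", "orderStatus": "Pending"},
]

# Step 1 (inner query): deduplicate to one (orderId, orderStatus) per order
distinct_orders = {(r["orderId"], r["orderStatus"]) for r in rows}

# Step 2 (outer query): count orders per status
by_status = Counter(status for _, status in distinct_orders)

print(by_status)  # counting raw rows would report 2 "Paid" orders instead of 1
```

The same pattern applies to the payment-method query below: any per-order metric on the flattened stream needs this dedupe-then-aggregate shape.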

Order Count by Payment Method

-- Count orders by payment method (same dedupe-then-aggregate pattern)
SELECT payMode, COUNT(1)
FROM (
  SELECT orderId, payMode
  FROM druid2
  GROUP BY orderId, payMode
)
GROUP BY payMode

Top 10 Orders by Amount

-- Top 10 orders by amount; payment arrives as a string, so cast before sorting
SELECT orderId, payment, COUNT(1) AS productcount, SUM("product.productNum") AS products
FROM druid2
GROUP BY orderId, payment
ORDER BY CAST(payment AS DOUBLE) DESC
LIMIT 10

Case Summary

  • When configuring the ingestion source, set useEarliestOffset to true so consumption starts from the beginning of the stream; otherwise the datasource may appear to have no data
  • Druid’s JOIN support is very limited; it is best suited to workloads dominated by grouping and aggregation
  • SQL support is also limited compared to a general-purpose database
  • Data is always primarily partitioned one way: by time

Error Quick Reference

Symptom | Root cause | Fix
Druid table has no data or very little is consumed | Offsets start from latest, so historical data is never scanned | Set useEarliestOffset: true in the ingestion spec and rerun the task
Time filter finds no data | ts unit/timezone mismatch | Calibrate timestampSpec (format/timezone/unit)
Numeric column treated as string | Inconsistent JSON field types | Unify the type during preprocessing, or set the correct type in the Druid schema
Count results look wrong | Rollup enabled or misused | Disable Rollup
Order count does not equal row count | Each product becomes its own row, duplicating orderId | Use COUNT(DISTINCT orderId)
Ingestion task stuck in PENDING | Insufficient MiddleManager/Indexer task slots | Add task slots or reduce concurrency
Kafka lag keeps rising | Task count is less than the partition count | Set task count ≥ partition count
Query timeouts | Segments too fragmented / Historical nodes overloaded | Adjust segmentGranularity and merge segments
Time column empty or all 1970 | Timestamp parsing failed | Fix the time format or unit (ms vs. seconds)
Too many dimensions, queries slow | High-cardinality dimensions not optimized | Enable indexing/dictionary compression for high-cardinality dimensions
SQL multi-value aggregation error | Unsupported aggregation applied to a multi-value column | Use ARRAY_LENGTH/UNNEST
Task restarts frequently | Offset commit or consumer group ID misconfigured | Fix consumerProperties and the group ID