TL;DR

  • 场景: 电商点击/订单流入 Kafka,Druid 进行实时分析与可视化。
  • 结论: 嵌套 JSON 需预处理拉平;Kafka Indexing Service 设”从最早位点”与禁用 Rollup 更稳。
  • 产出: Scala Producer→Druid 摄取→SQL 指标(订单/用户/支付/TopN)跑通与常见坑位清单。

整体流程

  • Kafka 数据源: Kafka 是一个分布式流处理平台,负责接收、存储并传输数据。它支持从各类应用、日志、传感器等设备采集实时数据,将数据划分为多个主题(Topic),并将消息分发给消费者。
  • Kafka Producer: 数据生产者(Producer)负责将数据发送到 Kafka 的主题中。
  • Druid Kafka Ingestion: Druid 提供了对 Kafka 的原生支持。通过 Kafka Indexing Service,Druid 可以持续从 Kafka 的某个主题中消费数据,实时地将这些数据摄取到 Druid 中。
  • 实时数据摄取和索引: Druid 的 Kafka 摄取任务会监听 Kafka 的分区,按照流数据的到达顺序消费数据,并在内部创建索引。
  • Druid 查询层: Druid 提供了非常强大的查询能力,可以通过 SQL 查询方式进行交互,也支持多维查询、聚合查询等。
  • Kafka 消费者 Offset 管理: Druid 使用 Kafka 消费者模型,实时消费消息并管理 Offset(偏移量),确保数据不丢失或重复摄取。
  • 持久化和数据存储: 数据在经过摄取和索引后,Druid 会定期将数据段(Segment)持久化到深度存储中,并对旧数据进行合并和压缩。

需求分析

场景分析

  • 数据量大,需要在这些数据中根据业务需要灵活查询
  • 实时性要求高
  • 数据实时的推过来,要在秒级对数据进行分析并查询出结果

数据描述

原始嵌套 JSON 数据格式:

{"ts":1607499629841,"orderId":"1009388","userId":"807134","orderStatusId":1,"orderStatus":"已支付","payModeId":0,"payMode":"微信","payment":"933.90","products":
[{"productId":"102163","productName":"贝合xxx+粉","price":18.7,"productNum":3,"categoryid":"10360","catname1":"厨卫清洁、纸制用品","catname2":"生活日用","catname3":"浴室用品"},{"productId":"100349","productName":"COxxx0C","price":877.8,"productNum":1,"categoryid":"10302","catname1":"母婴、玩具乐器","catname2":"西洋弦乐器","catname3":"吉他"}]}

字段说明:

  • ts: 交易时间
  • orderId: 订单编号
  • userId: 用户id
  • orderStatusId: 订单状态Id
  • orderStatus: 订单状态 0-11
  • payModeId: 支付方式id
  • payMode: 支付方式
  • payment: 支付金额
  • products: 购买商品(嵌套结构)

拉平后的数据格式

{"ts":1607499629841,"orderId":"1009388","userId":"807134","orderStatusId":1,"orderStatus":"已支付","payModeId":0,"payMode":"微信","payment":"933.90","product":
{"productId":"102163","productName":"贝合xxx+粉","price":18.7,"productNum":3,"categoryid":"10360","catname1":"厨卫清洁、纸制用品","catname2":"生活日用","catname3":"浴室用品"}}
{"ts":1607499629841,"orderId":"1009388","userId":"807134","orderStatusId":1,"orderStatus":"已支付","payModeId":0,"payMode":"微信","payment":"933.90","product":
{"productId":"100349","productName":"COxxx0C","price":877.8,"productNum":1,"categoryid":"10302","catname1":"母婴、玩具乐器","catname2":"西洋弦乐器","catname3":"吉他"}}

Kafka生产者

Scala Kafka Producer 代码示例:

package icu.wzk.kafka

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

import java.util.Properties
import scala.io.BufferedSource

object KafkaProducerForDruid {
  def main(args: Array[String]): Unit = {
    val brokers = "h121.wzk.icu:9092"
    val topic = "druid2"
    val prop = new Properties()
    prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
    prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])

    val producer = new KafkaProducer[String, String](prop);
    val source: BufferedSource = scala.io.Source.fromFile("orders1.json")
    val iter: Iterator[String] = source.getLines();
    iter.foreach {
      line => val msg = new ProducerRecord[String, String](topic, line);
        producer.send(msg)
        println(msg)
        Thread.sleep(10)
    }
    producer.close()
    source.close()
  }
}

Druid导入数据

关键步骤:

  • JSON数据要拉平
  • 不定义 RollUp
  • 设置时间戳
  • 设置”从流的开始进行消费数据”

查询计算

订单总数

-- 查询订单总数
SELECT COUNT(distinct orderId) as orderscount
FROM druid2

用户总数

-- 查询用户总数
SELECT COUNT(distinct userId) as usercount
FROM druid2

统计结果状态订单数

-- 统计各种订单状态的订单数
SELECT orderStatus, count(*)
FROM (
  SELECT orderId, orderStatus
  FROM druid2
  GROUP BY orderId, orderStatus
)
GROUP BY orderStatus

统计各种支付方式的订单数

-- 统计各种支付方式订单数
SELECT payMode, count(1)
FROM (
  SELECT orderId, payMode
  FROM druid2
  GROUP BY orderId, payMode
)
GROUP BY payMode

订单金额最大的前10名

-- 订单金额最大的前10名
SELECT orderId, payment, count(1) as productcount, sum("product.productNum") as products
FROM druid2
GROUP BY orderId, payment

案例小节

  • 在配置摄入源时要设置为True从流的开始进行消费数据,否则在数据源中可能查不到数据
  • Druid的JOIN能力非常有限,分组或者聚合多的场景推荐使用
  • SQL支持能力非常受限
  • 数据的分区组织只有时间序列一种方式

错误速查

症状根因定位修复
Druid 表无数据/很少消费位点从 latest 开始,历史未扫在 ingestion spec 设 useEarliestOffset: true,重跑任务
时间筛选查不到数据ts 单位/时区不符校正 timestampSpec(格式/时区/单位)
数值列被当作字符串JSON 字段类型不一致在预处理统一类型;或 Druid schema 中设正确类型
计数结果异常启用/误用 Rollup分析关闭 Rollup
订单数与行数不等每个 product 拆行导致重复 orderIdCOUNT(DISTINCT orderId)
摄取任务卡住 PENDINGMiddleManager/Indexer 资源槽不足扩容 task slots 或降低并发
Kafka Lag 不断升高任务并发 < 分区数任务数≥分区数
查询超时Segment 过碎/历史节点压力大调整 segmentGranularity、合并段
时间列为空或全 1970时间解析失败修正时间格式或单位(ms/秒)
维度过多导致慢高基数维度未优化为高基数维度启用索引/字典压缩
SQL 报多值聚合错误多值列用不支持的聚合使用 array_length/UNNEST
任务频繁重启Offsets 提交/组 ID 配置不当固定 consumerProperties 与组 ID