TL;DR
- 场景: 电商点击/订单流入 Kafka,Druid 进行实时分析与可视化。
- 结论: 嵌套 JSON 需预处理拉平;Kafka Indexing Service 设”从最早位点”与禁用 Rollup 更稳。
- 产出: Scala Producer→Druid 摄取→SQL 指标(订单/用户/支付/TopN)跑通与常见坑位清单。
整体流程
- Kafka 数据源: Kafka 是一个分布式流处理平台,负责接收、存储并传输数据。它支持从各类应用、日志、传感器等设备采集实时数据,将数据划分为多个主题(Topic),并将消息分发给消费者。
- Kafka Producer: 数据生产者(Producer)负责将数据发送到 Kafka 的主题中。
- Druid Kafka Ingestion: Druid 提供了对 Kafka 的原生支持。通过 Kafka Indexing Service,Druid 可以持续从 Kafka 的某个主题中消费数据,实时地将这些数据摄取到 Druid 中。
- 实时数据摄取和索引: Druid 的 Kafka 摄取任务会监听 Kafka 的分区,按照流数据的到达顺序消费数据,并在内部创建索引。
- Druid 查询层: Druid 提供了非常强大的查询能力,可以通过 SQL 查询方式进行交互,也支持多维查询、聚合查询等。
- Kafka 消费者 Offset 管理: Druid 使用 Kafka 消费者模型,实时消费消息并管理 Offset(偏移量),确保数据不丢失或重复摄取。
- 持久化和数据存储: 数据在经过摄取和索引后,Druid 会定期将数据段(Segment)持久化到深度存储中,并对旧数据进行合并和压缩。
需求分析
场景分析
- 数据量大,需要在这些数据中根据业务需要灵活查询
- 实时性要求高
- 数据实时的推过来,要在秒级对数据进行分析并查询出结果
数据描述
原始嵌套 JSON 数据格式:
{"ts":1607499629841,"orderId":"1009388","userId":"807134","orderStatusId":1,"orderStatus":"已支付","payModeId":0,"payMode":"微信","payment":"933.90","products":
[{"productId":"102163","productName":"贝合xxx+粉","price":18.7,"productNum":3,"categoryid":"10360","catname1":"厨卫清洁、纸制用品","catname2":"生活日用","catname3":"浴室用品"},{"productId":"100349","productName":"COxxx0C","price":877.8,"productNum":1,"categoryid":"10302","catname1":"母婴、玩具乐器","catname2":"西洋弦乐器","catname3":"吉他"}]}
字段说明:
- ts: 交易时间
- orderId: 订单编号
- userId: 用户id
- orderStatusId: 订单状态Id
- orderStatus: 订单状态 0-11
- payModeId: 支付方式id
- payMode: 支付方式
- payment: 支付金额
- products: 购买商品(嵌套结构)
拉平后的数据格式
{"ts":1607499629841,"orderId":"1009388","userId":"807134","orderStatusId":1,"orderStatus":"已支付","payModeId":0,"payMode":"微信","payment":"933.90","product":
{"productId":"102163","productName":"贝合xxx+粉","price":18.7,"productNum":3,"categoryid":"10360","catname1":"厨卫清洁、纸制用品","catname2":"生活日用","catname3":"浴室用品"}}
{"ts":1607499629841,"orderId":"1009388","userId":"807134","orderStatusId":1,"orderStatus":"已支付","payModeId":0,"payMode":"微信","payment":"933.90","product":
{"productId":"100349","productName":"COxxx0C","price":877.8,"productNum":1,"categoryid":"10302","catname1":"母婴、玩具乐器","catname2":"西洋弦乐器","catname3":"吉他"}}
Kafka生产者
Scala Kafka Producer 代码示例:
package icu.wzk.kafka
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import java.util.Properties
import scala.io.BufferedSource
object KafkaProducerForDruid {
def main(args: Array[String]): Unit = {
val brokers = "h121.wzk.icu:9092"
val topic = "druid2"
val prop = new Properties()
prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
val producer = new KafkaProducer[String, String](prop);
val source: BufferedSource = scala.io.Source.fromFile("orders1.json")
val iter: Iterator[String] = source.getLines();
iter.foreach {
line => val msg = new ProducerRecord[String, String](topic, line);
producer.send(msg)
println(msg)
Thread.sleep(10)
}
producer.close()
source.close()
}
}
Druid导入数据
关键步骤:
- JSON数据要拉平
- 不定义 RollUp
- 设置时间戳
- 设置”从流的开始进行消费数据”
查询计算
订单总数
-- 查询订单总数
SELECT COUNT(distinct orderId) as orderscount
FROM druid2
用户总数
-- 查询用户总数
SELECT COUNT(distinct userId) as usercount
FROM druid2
统计结果状态订单数
-- 统计各种订单状态的订单数
SELECT orderStatus, count(*)
FROM (
SELECT orderId, orderStatus
FROM druid2
GROUP BY orderId, orderStatus
)
GROUP BY orderStatus
统计各种支付方式的订单数
-- 统计各种支付方式订单数
SELECT payMode, count(1)
FROM (
SELECT orderId, payMode
FROM druid2
GROUP BY orderId, payMode
)
GROUP BY payMode
订单金额最大的前10名
-- 订单金额最大的前10名
SELECT orderId, payment, count(1) as productcount, sum("product.productNum") as products
FROM druid2
GROUP BY orderId, payment
案例小节
- 在配置摄入源时要设置为True从流的开始进行消费数据,否则在数据源中可能查不到数据
- Druid的JOIN能力非常有限,分组或者聚合多的场景推荐使用
- SQL支持能力非常受限
- 数据的分区组织只有时间序列一种方式
错误速查
| 症状 | 根因定位 | 修复 |
|---|---|---|
| Druid 表无数据/很少消费 | 位点从 latest 开始,历史未扫 | 在 ingestion spec 设 useEarliestOffset: true,重跑任务 |
| 时间筛选查不到数据 | ts 单位/时区不符 | 校正 timestampSpec(格式/时区/单位) |
| 数值列被当作字符串 | JSON 字段类型不一致 | 在预处理统一类型;或 Druid schema 中设正确类型 |
| 计数结果异常 | 启用/误用 Rollup | 分析关闭 Rollup |
| 订单数与行数不等 | 每个 product 拆行导致重复 orderId | COUNT(DISTINCT orderId) |
| 摄取任务卡住 PENDING | MiddleManager/Indexer 资源槽不足 | 扩容 task slots 或降低并发 |
| Kafka Lag 不断升高 | 任务并发 < 分区数 | 任务数≥分区数 |
| 查询超时 | Segment 过碎/历史节点压力大 | 调整 segmentGranularity、合并段 |
| 时间列为空或全 1970 | 时间解析失败 | 修正时间格式或单位(ms/秒) |
| 维度过多导致慢 | 高基数维度未优化 | 为高基数维度启用索引/字典压缩 |
| SQL 报多值聚合错误 | 多值列用不支持的聚合 | 使用 array_length/UNNEST |
| 任务频繁重启 | Offsets 提交/组 ID 配置不当 | 固定 consumerProperties 与组 ID |