基本介绍
Apache Druid从Kafka中获取数据并进行分析的流程包括:
- Kafka数据流接入: Druid通过Kafka Indexing Service直接从Kafka中摄取实时流数据
- 数据解析与转换: 支持JSON、Avro、CSV格式,可进行字段映射、过滤和数据转换
- 实时数据摄取与索引: 数据放入实时索引,生成倒排索引和bitmap索引
- 批处理与历史数据合并: 支持实时和批处理混合模式
- 数据分片与副本管理: 支持水平扩展,保证高可用性
- 查询与分析: 基于HTTP/JSON API,支持时间序列查询、分组聚合查询等
- 可视化与监控: 可与Superset、Tableau等BI工具集成
从Kafka中加载数据
测试数据示例
{"ts":"2020-10-01T00:01:35Z","srcip":"6.6.6.6", "dstip":"8.8.8.8", "srcport":6666,"dstPort":8888, "protocol": "tcp", "packets":1, "bytes":1000, "cost": 0.1}
{"ts":"2020-10-01T00:01:36Z","srcip":"6.6.6.6", "dstip":"8.8.8.8", "srcport":6666,"dstPort":8888, "protocol": "tcp", "packets":2, "bytes":2000, "cost": 0.1}
{"ts":"2020-10-01T00:01:37Z","srcip":"6.6.6.6", "dstip":"8.8.8.8", "srcport":6666,"dstPort":8888, "protocol": "tcp", "packets":3, "bytes":3000, "cost": 0.1}
配置步骤
-
启动Kafka:
kafka-server-start.sh /opt/servers/kafka_2.12-2.7.2/config/server.properties -
创建Topic:
kafka-topics.sh --create --zookeeper h121.wzk.icu:2181 --replication-factor 1 --partitions 1 --topic druid1 -
数据摄取流程:
- LoadData → Streaming → Kafka
- 配置Parser选择JSON格式Parser Time(时间戳映射到__
- 配置time)
- Configuration Schema配置维度与指标:
- 维度: srcip, dstip, srcport, dstport, protocol
- 指标: count, max(packets), min(bytes), sum(cost)
- Partition配置: uniform按日分区
- Submit创建Supervisor
数据查询
SELECT * FROM "druid1"
错误速查表
| 症状 | 根因 | 解决方案 |
|---|---|---|
| Parse预览为空/全null | JSON字段名与Schema不一致 | 使用column flattening/transform做别名映射 |
| Supervisor运行但Datasource不出现 | Kafka无数据或偏移起点不对 | 设置useEarliestOffset=true或重置位点 |
| SQL返回0行 | 时间解析错误或超出查询区间 | 确认ts为ISO8601格式且带Z |
| Cannot parse timestamp | ts格式不合规 | 统一时间格式,指定format |
| Unknown field in aggregators | 聚合器与列类型不匹配 | 使用longSum/doubleSum/max/min等匹配实际类型 |
| 小流量却产生大量segments | Secondary阈值过小 | 调大maxRowsPerSegment |