基本介绍

Apache Druid从Kafka中获取数据并进行分析的流程包括:

  1. Kafka数据流接入: Druid通过Kafka Indexing Service直接从Kafka中摄取实时流数据
  2. 数据解析与转换: 支持JSON、Avro、CSV格式,可进行字段映射、过滤和数据转换
  3. 实时数据摄取与索引: 数据放入实时索引,生成倒排索引和bitmap索引
  4. 批处理与历史数据合并: 支持实时和批处理混合模式
  5. 数据分片与副本管理: 支持水平扩展,保证高可用性
  6. 查询与分析: 基于HTTP/JSON API,支持时间序列查询、分组聚合查询等
  7. 可视化与监控: 可与Superset、Tableau等BI工具集成

从Kafka中加载数据

测试数据示例

{"ts":"2020-10-01T00:01:35Z","srcip":"6.6.6.6", "dstip":"8.8.8.8", "srcport":6666,"dstPort":8888, "protocol": "tcp", "packets":1, "bytes":1000, "cost": 0.1}
{"ts":"2020-10-01T00:01:36Z","srcip":"6.6.6.6", "dstip":"8.8.8.8", "srcport":6666,"dstPort":8888, "protocol": "tcp", "packets":2, "bytes":2000, "cost": 0.1}
{"ts":"2020-10-01T00:01:37Z","srcip":"6.6.6.6", "dstip":"8.8.8.8", "srcport":6666,"dstPort":8888, "protocol": "tcp", "packets":3, "bytes":3000, "cost": 0.1}

配置步骤

  1. 启动Kafka:

    kafka-server-start.sh /opt/servers/kafka_2.12-2.7.2/config/server.properties
  2. 创建Topic:

    kafka-topics.sh --create --zookeeper h121.wzk.icu:2181 --replication-factor 1 --partitions 1 --topic druid1
  3. 数据摄取流程:

    • LoadData → Streaming → Kafka
    • 配置Parser选择JSON格式Parser Time(时间戳映射到__
    • 配置time)
    • Configuration Schema配置维度与指标:
      • 维度: srcip, dstip, srcport, dstport, protocol
      • 指标: count, max(packets), min(bytes), sum(cost)
    • Partition配置: uniform按日分区
    • Submit创建Supervisor

数据查询

SELECT * FROM "druid1"

错误速查表

症状根因解决方案
Parse预览为空/全nullJSON字段名与Schema不一致使用column flattening/transform做别名映射
Supervisor运行但Datasource不出现Kafka无数据或偏移起点不对设置useEarliestOffset=true或重置位点
SQL返回0行时间解析错误或超出查询区间确认ts为ISO8601格式且带Z
Cannot parse timestampts格式不合规统一时间格式,指定format
Unknown field in aggregators聚合器与列类型不匹配使用longSum/doubleSum/max/min等匹配实际类型
小流量却产生大量segmentsSecondary阈值过小调大maxRowsPerSegment