Basic Introduction
How Apache Druid ingests and analyzes data from Kafka:
- Kafka Data Flow Ingestion: Druid directly ingests real-time streaming data from Kafka through Kafka Indexing Service
- Data Parsing & Transformation: Supports JSON, Avro, CSV formats, can do field mapping, filtering and data transformation
- Real-time Data Ingestion & Indexing: Data goes into real-time index, generates inverted index and bitmap index
- Batch & Historical Data Merge: Supports real-time and batch hybrid mode
- Data Sharding & Replica Management: Supports horizontal scaling, ensures high availability
- Query & Analysis: Based on HTTP/JSON API, supports time series queries, grouped aggregation queries, etc.
- Visualization & Monitoring: Can integrate with BI tools like Superset, Tableau
Load Data from Kafka
Test Data Example
{"ts":"2020-10-01T00:01:35Z","srcip":"6.6.6.6", "dstip":"8.8.8.8", "srcport":6666,"dstPort":8888, "protocol": "tcp", "packets":1, "bytes":1000, "cost": 0.1}
{"ts":"2020-10-01T00:01:36Z","srcip":"6.6.6.6", "dstip":"8.8.8.8", "srcport":6666,"dstPort":8888, "protocol": "tcp", "packets":2, "bytes":2000, "cost": 0.1}
{"ts":"2020-10-01T00:01:37Z","srcip":"6.6.6.6", "dstip":"8.8.8.8", "srcport":6666,"dstPort":8888, "protocol": "tcp", "packets":3, "bytes":3000, "cost": 0.1}
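The records above are JSON lines, one event per line, which is the shape Druid's Kafka ingestion expects. A small sketch that reproduces them (the file path and the commented producer invocation are illustrative):

```python
import json
import os
import tempfile

# The three sample flow records from above, as Python dicts.
records = [
    {"ts": f"2020-10-01T00:01:3{5 + i}Z", "srcip": "6.6.6.6", "dstip": "8.8.8.8",
     "srcport": 6666, "dstPort": 8888, "protocol": "tcp",
     "packets": i + 1, "bytes": (i + 1) * 1000, "cost": 0.1}
    for i in range(3)
]

# Write them as JSON lines: one serialized object per line, no wrapping array.
path = os.path.join(tempfile.gettempdir(), "druid1_sample.json")
with open(path, "w") as f:
    for r in records:
        f.write(json.dumps(r) + "\n")

# Once the druid1 topic exists (see Configuration Steps below), the file
# can be piped in, e.g.:
#   kafka-console-producer.sh --broker-list <broker:9092> --topic druid1 < druid1_sample.json
```

Note that the raw field is `dstPort` (capital P) while the schema below uses `dstport`; that case mismatch is exactly what the error table at the end warns about.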
Configuration Steps
- Start Kafka:
  kafka-server-start.sh /opt/servers/kafka_2.12-2.7.2/config/server.properties
- Create the topic:
  kafka-topics.sh --create --zookeeper h121.wzk.icu:2181 --replication-factor 1 --partitions 1 --topic druid1
- Data Ingestion Process (Druid web console):
  - Load Data → Streaming → Apache Kafka
  - Configure the Parser: select the JSON format
  - Configure the timestamp: the ts column maps to Druid's __time
  - Configure the Schema with dimensions and metrics:
    - Dimensions: srcip, dstip, srcport, dstport, protocol
    - Metrics: count, max(packets), min(bytes), sum(cost)
  - Partitioning: uniform, with a segment granularity of one day
  - Submit to create the Supervisor
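The console steps above are saved by Druid as a Kafka supervisor spec. A minimal sketch of its shape as a Python dict (property names follow Druid's Kafka ingestion-spec format; the bootstrap server address and the metric names are placeholders, not values from this setup):

```python
# Minimal Kafka supervisor spec mirroring the console choices above.
# "bootstrap.servers" and the metric names are illustrative placeholders.
supervisor_spec = {
    "type": "kafka",
    "spec": {
        "ioConfig": {
            "type": "kafka",
            "consumerProperties": {"bootstrap.servers": "localhost:9092"},
            "topic": "druid1",
            "inputFormat": {"type": "json"},
            # Start from the earliest offset so pre-existing messages are
            # ingested (see the error table: avoids the "no data" symptom).
            "useEarliestOffset": True,
        },
        "dataSchema": {
            "dataSource": "druid1",
            # ts is ISO8601 with a Z suffix, so the "iso" format applies.
            "timestampSpec": {"column": "ts", "format": "iso"},
            "dimensionsSpec": {
                "dimensions": ["srcip", "dstip", "srcport", "dstport", "protocol"]
            },
            # Aggregator types must match the column types (error table, row 5).
            "metricsSpec": [
                {"type": "count", "name": "count"},
                {"type": "longMax", "name": "packets_max", "fieldName": "packets"},
                {"type": "longMin", "name": "bytes_min", "fieldName": "bytes"},
                {"type": "doubleSum", "name": "cost_sum", "fieldName": "cost"},
            ],
            "granularitySpec": {"segmentGranularity": "day", "queryGranularity": "none"},
        },
        "tuningConfig": {"type": "kafka"},
    },
}
```

Submitting in the console is equivalent to POSTing this JSON to the Overlord's /druid/indexer/v1/supervisor endpoint.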
Data Query
SELECT * FROM "druid1"
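The same query can be run programmatically against Druid's SQL endpoint (/druid/v2/sql). A sketch using only the standard library; localhost:8888 is an assumption (the router's default port), so adjust for your cluster:

```python
import json
from urllib import request

def druid_sql_request(query, router="http://localhost:8888"):
    """Build a POST request for Druid's SQL endpoint.

    The router URL is a placeholder; 8888 is the default router port.
    """
    body = json.dumps({"query": query}).encode("utf-8")
    return request.Request(
        router + "/druid/v2/sql",
        data=body,
        headers={"Content-Type": "application/json"},
    )

req = druid_sql_request('SELECT * FROM "druid1" LIMIT 10')

# To execute against a live cluster:
# with request.urlopen(req) as resp:
#     rows = json.loads(resp.read())   # list of row objects
```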
Error Quick Reference
| Symptom | Root Cause | Solution |
|---|---|---|
| Parse preview empty/all null | JSON field names inconsistent with Schema | Use column flattening/transform for alias mapping |
| Supervisor running but Datasource doesn’t appear | Kafka topic is empty, or ingestion started from the wrong offset | Set useEarliestOffset=true or reset the supervisor’s offsets |
| SQL returns 0 rows | Time parsing error or outside query range | Confirm ts is ISO8601 format with Z |
| Cannot parse timestamp | ts format non-compliant | Unify time format, specify format |
| Unknown field in aggregators | Aggregator doesn’t match column type | Use longSum/doubleSum/max/min to match actual type |
| Low traffic but many small segments generated | Segment row threshold too small | Increase maxRowsPerSegment |
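For the timestamp rows above: if the source emits local-style timestamps instead of ISO8601, normalizing them before ingestion is often simpler than changing the timestampSpec. A sketch (it assumes the raw timestamps are already in UTC; if not, convert the timezone first):

```python
from datetime import datetime, timezone

def to_iso8601_z(raw, fmt="%Y-%m-%d %H:%M:%S"):
    """Normalize a timestamp string to the ISO8601 'Z' form that
    Druid's default "iso" timestampSpec parses cleanly.

    Assumes the input represents UTC; fmt describes the raw layout.
    """
    dt = datetime.strptime(raw, fmt).replace(tzinfo=timezone.utc)
    return dt.strftime("%Y-%m-%dT%H:%M:%SZ")

# "2020-10-01 00:01:35" -> "2020-10-01T00:01:35Z"
```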