Big Data 190: Filebeat → Kafka → Logstash → Elasticsearch in Practice
1. Overall Architecture
Filebeat → Kafka → Logstash → Elasticsearch → Kibana
2. Filebeat Configuration
2.1 Installation
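The original gives no installation commands. A minimal download-and-unpack sketch, where the version number and install path are assumptions (match the version to your Elasticsearch cluster):

```shell
# Download and unpack Filebeat (version 7.17.0 and /opt/filebeat are assumptions)
wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.17.0-linux-x86_64.tar.gz
tar -zxvf filebeat-7.17.0-linux-x86_64.tar.gz
mv filebeat-7.17.0-linux-x86_64 /opt/filebeat
```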
2.2 filebeat.yml
```yaml
filebeat.inputs:
  - type: log
    paths:
      - /var/log/nginx/access.log
    json.keys_under_root: true
    json.overwrite_keys: true

output.kafka:
  hosts: ["h121:9092", "h122:9092", "h123:9092"]
  topic: "nginx-log"
  partition.hash:
    hash: []
```
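Kafka will not accept events until the `nginx-log` topic exists. A creation command might look like the following, where the partition and replication counts are assumptions (older Kafka versions use `--zookeeper` instead of `--bootstrap-server`):

```shell
# Create the topic Filebeat writes to; partition/replication counts are assumptions
kafka-topics.sh --create --bootstrap-server h121:9092 \
  --topic nginx-log --partitions 3 --replication-factor 2
```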
3. Logstash Configuration
3.1 Consuming from Kafka
```conf
input {
  kafka {
    bootstrap_servers => "h121:9092,h122:9092,h123:9092"
    topics => ["nginx-log"]
    group_id => "logstash-group"
  }
}
```
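Filebeat's Kafka output serializes each event as a JSON document, so it is common to decode it directly at the input. The `codec` line below is an addition not in the original config:

```conf
input {
  kafka {
    bootstrap_servers => "h121:9092,h122:9092,h123:9092"
    topics => ["nginx-log"]
    group_id => "logstash-group"
    codec => "json"   # decode Filebeat's JSON envelope into event fields
  }
}
```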
3.2 Parsing JSON
```conf
filter {
  if [type] == "app" {
    json {
      source => "message"
      target => "parsed"
    }
  }
}
```
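The filter above takes the raw string in the `message` field, parses it as JSON, and stores the result under `parsed`. A small Python sketch of the same transformation (the sample event is illustrative, not Logstash's actual implementation):

```python
import json

def json_filter(event, source="message", target="parsed"):
    """Mimic Logstash's json filter: parse event[source] as JSON, store under event[target]."""
    event[target] = json.loads(event[source])
    return event

event = {"type": "app", "message": '{"level": "INFO", "msg": "user login"}'}
json_filter(event)
# event["parsed"] is now {"level": "INFO", "msg": "user login"}
```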
3.3 GeoIP Enrichment
```conf
filter {
  geoip {
    source => "remote_addr"
    database => "GeoLite2-City.mmdb"
  }
}
```
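On a successful lookup, the geoip filter adds a `geoip` sub-object to the event. The field names below are the plugin's standard output; the values are purely illustrative:

```conf
{
  "remote_addr": "8.8.8.8",
  "geoip": {
    "country_name": "United States",
    "city_name": "Mountain View",
    "location": { "lat": 37.4, "lon": -122.0 }
  }
}
```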
3.4 Output to Elasticsearch
```conf
output {
  elasticsearch {
    hosts => ["h121:9200", "h122:9200", "h123:9200"]
    index => "logstash-%{type}-%{+YYYY.MM.dd}"
  }
}
```
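Once events flow, you can confirm that the daily index was created (the host is taken from the config above):

```shell
# List indices matching the logstash-* pattern, with a header row
curl -s "http://h121:9200/_cat/indices/logstash-*?v"
```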
4. Complete Configuration Example
4.1 filebeat.yml (simplified)
```yaml
filebeat.inputs:
  - type: log
    paths:
      - /var/log/nginx/access.log

output.kafka:
  hosts: ["h121:9092"]
  topic: "nginx-log"
```
4.2 logstash_kafka_es.conf
```conf
input {
  kafka {
    bootstrap_servers => "h121:9092"
    topics => ["nginx-log"]
  }
}

filter {
  geoip {
    source => "remote_addr"
  }
}

output {
  elasticsearch {
    hosts => ["h121:9200"]
    index => "logstash-%{+YYYY.MM.dd}"
  }
}
```
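With both files in place, start each side of the pipeline; the install paths are assumptions:

```shell
# Start Filebeat in the foreground with the config above
/opt/filebeat/filebeat -e -c filebeat.yml

# Start Logstash with the pipeline file
bin/logstash -f logstash_kafka_es.conf
```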
5. Troubleshooting Quick Reference
| Problem | Likely Cause | Fix |
|---|---|---|
| Filebeat fails to start | Bad config file | Check the YAML syntax |
| No data consumed from Kafka | Topic does not exist | Create the topic |
| GeoIP not applied | Invalid IP format | Check the remote_addr field |
| Index has no data | ES writes failing | Check ES cluster health |
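Quick commands for the checks in the table, using the hosts from the cluster above:

```shell
# Does the topic exist, and is data arriving?
kafka-topics.sh --bootstrap-server h121:9092 --list
kafka-console-consumer.sh --bootstrap-server h121:9092 \
  --topic nginx-log --from-beginning --max-messages 5

# Is the ES cluster healthy?
curl -s "http://h121:9200/_cluster/health?pretty"
```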
6. Summary
- Filebeat collects the logs and ships them to Kafka
- Logstash consumes from Kafka
- The geoip plugin adds geographic information
- Data is written to ES for visualization in Kibana