1. Overall architecture
Filebeat → Kafka → Logstash → Elasticsearch → Kibana
2. Filebeat configuration
2.1 Installation
2.2 filebeat.yml
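filebeat.inputs:
  - type: log
    paths:
      - /var/log/nginx/access.log
    # Decode each line as JSON and place the keys at the top level of the event.
    json.keys_under_root: true
    # On a name clash, let the decoded keys overwrite the fields Filebeat adds.
    json.overwrite_keys: true

output.kafka:
  hosts: ["h121:9092", "h122:9092", "h123:9092"]
  topic: "nginx-log"
  # With an empty field list, the partition hash is computed from the event key.
  partition.hash:
    hash: []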
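To make the effect of the two json.* options concrete, here is a rough Python model of how one decoded log line is merged into an event. It is a sketch of the documented behavior, not Filebeat's implementation; the function name decode_line and the sample fields are made up for illustration.

```python
import json


def decode_line(line, base_fields, keys_under_root=False, overwrite_keys=False):
    """Rough model of Filebeat's json.* options for a single log line.

    base_fields stands in for the fields Filebeat adds on its own.
    """
    event = dict(base_fields)
    decoded = json.loads(line)
    if not keys_under_root:
        # Default: the decoded object is nested under the "json" key.
        event["json"] = decoded
        return event
    for key, value in decoded.items():
        # Without overwrite_keys, an existing field wins over a decoded key.
        if key in event and not overwrite_keys:
            continue
        event[key] = value
    return event
```

With both options enabled, as in the configuration above, a key such as remote_addr ends up at the top level of the event, and a decoded key that clashes with an existing field replaces it.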
3. Logstash configuration
3.1 Consuming from Kafka
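input {
  kafka {
    bootstrap_servers => "h121:9092,h122:9092,h123:9092"
    topics => ["nginx-log"]
    group_id => "logstash-group"
    # Filebeat publishes events to Kafka as JSON; decode them on input so that
    # fields such as remote_addr are available to the filters.
    codec => "json"
  }
}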
3.2 Parsing JSON
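filter {
  # Only events whose type field is "app" are parsed here; the field must be set upstream.
  if [type] == "app" {
    json {
      source => "message"
      # Decoded keys are stored under [parsed] instead of the event root.
      target => "parsed"
    }
  }
}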
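The effect of the target option can be sketched in Python. This is a simplified model of the json filter applied to one event (a dict), not the plugin's code; the function name json_filter and the sample event are illustrative assumptions.

```python
import json


def json_filter(event, source="message", target=None):
    """Simplified model of the Logstash json filter for one event."""
    result = dict(event)
    if source not in event:
        return result  # nothing to parse, event passes through unchanged
    try:
        parsed = json.loads(event[source])
        if target is None and not isinstance(parsed, dict):
            raise ValueError("only a JSON object can be merged into the root")
    except (TypeError, ValueError):
        # Unparseable events are tagged rather than dropped.
        result["tags"] = list(event.get("tags", [])) + ["_jsonparsefailure"]
        return result
    if target is None:
        result.update(parsed)  # no target: keys land at the event root
    else:
        result[target] = parsed  # with target: keys are nested under it
    return result
```

With target set to "parsed", a key such as remote_addr is reachable as [parsed][remote_addr] rather than at the root, which matters for any later filter that reads it.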
3.3 GeoIP enrichment
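filter {
  geoip {
    # Field that holds the client IP address.
    source => "remote_addr"
    # Required when ECS compatibility is enabled and source is not an [xxx][ip] field.
    target => "geoip"
    # Path to the MaxMind database file; an absolute path avoids depending on the working directory.
    database => "GeoLite2-City.mmdb"
  }
}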
3.4 Output to ES
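output {
  elasticsearch {
    hosts => ["h121:9200", "h122:9200", "h123:9200"]
    # One index per type per day; if an event has no type field,
    # the literal text %{type} stays in the index name.
    index => "logstash-%{type}-%{+YYYY.MM.dd}"
  }
}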
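How the index pattern expands for a given event can be approximated in Python. This is a minimal sketch that handles only field references and the single date pattern used above, not Logstash's full sprintf syntax; the function name resolve_index and the sample events are assumptions made for illustration.

```python
import re
from datetime import datetime, timezone


def resolve_index(pattern, event):
    """Approximate the expansion of an index pattern for one event."""

    def substitute(match):
        key = match.group(1)
        if key == "+YYYY.MM.dd":
            # Date references are formatted from @timestamp in UTC.
            stamp = event["@timestamp"].astimezone(timezone.utc)
            return stamp.strftime("%Y.%m.%d")
        value = event.get(key)
        # A reference to a missing field is left in place as literal text.
        return match.group(0) if value is None else str(value)

    return re.sub(r"%\{([^}]+)\}", substitute, pattern)
```

For an event with type "nginx" and a timestamp on 15 January 2024 (UTC), the pattern above resolves to logstash-nginx-2024.01.15. For an event without a type field, the result still contains the text %{type}, which is worth checking for before the data reaches Elasticsearch.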
4. Complete configuration example
4.1 filebeat.yml (simplified)
filebeat.inputs:
  - type: log
    paths:
      - /var/log/nginx/access.log

output.kafka:
  hosts: ["h121:9092"]
  topic: "nginx-log"
4.2 logstash_kafka_es.conf
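input {
  kafka {
    bootstrap_servers => "h121:9092"
    topics => ["nginx-log"]
    # Filebeat publishes events to Kafka as JSON; decode the envelope on input.
    codec => "json"
  }
}
filter {
  # The simplified filebeat.yml does not decode the log line, so parse it here.
  # Assumes nginx writes JSON access logs that contain a remote_addr key.
  json {
    source => "message"
  }
  geoip {
    source => "remote_addr"
    # Required when ECS compatibility is enabled and source is not an [xxx][ip] field.
    target => "geoip"
  }
}
output {
  elasticsearch {
    hosts => ["h121:9200"]
    index => "logstash-%{+YYYY.MM.dd}"
  }
}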
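5. Error quick-reference table
6. Summary
- Filebeat collects the logs and sends them to Kafka
- Logstash consumes them from Kafka
- The geoip plugin adds geographic information
- The data is written to ES for display in Kibana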