Big Data 190: Filebeat → Kafka → Logstash → Elasticsearch in Practice

1. Overall Architecture

Filebeat → Kafka → Logstash → Elasticsearch → Kibana

Filebeat tails the log files and ships events to Kafka; Logstash consumes from Kafka, parses and enriches the events, and writes them to Elasticsearch, where Kibana visualizes them.

2. Filebeat Configuration

2.1 Installation

  • Filebeat 7.3.0 or 7.17.0 (keep the version in line with your Elasticsearch cluster)
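A download-and-extract sketch for a tarball install of 7.17.0, assuming Linux x86_64 (the URL follows Elastic's artifact naming; check the official downloads page for your platform):

```shell
wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.17.0-linux-x86_64.tar.gz
tar -zxvf filebeat-7.17.0-linux-x86_64.tar.gz
cd filebeat-7.17.0-linux-x86_64
```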

2.2 filebeat.yml

filebeat.inputs:
  - type: log
    paths:
      - /var/log/nginx/access.log
    json.keys_under_root: true   # lift decoded JSON fields to the top level of the event
    json.overwrite_keys: true    # let decoded fields overwrite Filebeat's own keys on conflict

output.kafka:
  hosts: ["h121:9092", "h122:9092", "h123:9092"]
  topic: "nginx-log"
  partition.hash:
    hash: []                     # empty list: hash on the event key (the default)
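Beyond the basics, the Kafka output has a few tuning knobs worth setting explicitly; the option names below come from the Filebeat reference, and the values are illustrative:

```yaml
output.kafka:
  hosts: ["h121:9092", "h122:9092", "h123:9092"]
  topic: "nginx-log"
  required_acks: 1             # wait for the partition leader's ack only
  compression: gzip            # compress batches on the wire
  max_message_bytes: 1000000   # drop events larger than ~1 MB
```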

3. Logstash Configuration

3.1 Consuming from Kafka

input {
  kafka {
    bootstrap_servers => "h121:9092,h122:9092,h123:9092"
    topics => ["nginx-log"]
    group_id => "logstash-group"
  }
}
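The kafka input accepts a number of consumer options that are often added in practice; a sketch with illustrative values (option names from the logstash-input-kafka plugin):

```conf
input {
  kafka {
    bootstrap_servers => "h121:9092,h122:9092,h123:9092"
    topics => ["nginx-log"]
    group_id => "logstash-group"
    auto_offset_reset => "earliest"   # read from the beginning on first start
    consumer_threads => 3             # one thread per partition is typical
    codec => "json"                   # decode payloads that are already JSON
  }
}
```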

3.2 Parsing JSON

filter {
  # [type] must be set upstream (e.g. a custom field added in Filebeat);
  # events without it bypass this filter entirely.
  if [type] == "app" {
    json {
      source => "message"   # the raw log line
      target => "parsed"    # nest the parsed fields under [parsed]
    }
  }
}
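Once the parse succeeds, the raw line is usually no longer needed; a common variant drops it with the generic remove_field option (applied only when the filter succeeds):

```conf
filter {
  json {
    source => "message"
    target => "parsed"
    remove_field => ["message"]   # drop the raw line after a successful parse
  }
}
```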

3.3 GeoIP Enrichment

filter {
  geoip {
    source => "remote_addr"            # field holding the client IP
    database => "GeoLite2-City.mmdb"   # MaxMind database; an absolute path is safer
  }
}
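A sketch of a slightly tighter geoip configuration, using the plugin's target and fields options to control what gets added to each event:

```conf
filter {
  geoip {
    source => "remote_addr"
    target => "geoip"                                     # nest results under [geoip]
    fields => ["country_name", "city_name", "location"]   # keep only the fields you need
  }
}
```

For map visualizations, Kibana needs geoip.location mapped as geo_point; the default logstash-* index template takes care of this as long as the index name starts with logstash-.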

3.4 Output to Elasticsearch

output {
  elasticsearch {
    hosts => ["h121:9200", "h122:9200", "h123:9200"]
    # %{type} is filled in from the event; if the field is missing,
    # the literal text "%{type}" ends up in the index name.
    index => "logstash-%{type}-%{+YYYY.MM.dd}"
  }
}
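While tuning filters it helps to print events to the console alongside the Elasticsearch output; a sketch adding a temporary stdout output:

```conf
output {
  stdout { codec => rubydebug }   # print each event to the console while debugging
  elasticsearch {
    hosts => ["h121:9200", "h122:9200", "h123:9200"]
    index => "logstash-%{type}-%{+YYYY.MM.dd}"
  }
}
```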

4. Complete Configuration Example

4.1 filebeat.yml (simplified)

filebeat.inputs:
  - type: log
    paths:
      - /var/log/nginx/access.log

output.kafka:
  hosts: ["h121:9092"]
  topic: "nginx-log"

4.2 logstash_kafka_es.conf

input {
  kafka {
    bootstrap_servers => "h121:9092"
    topics => ["nginx-log"]
  }
}

filter {
  geoip {
    source => "remote_addr"
  }
}

output {
  elasticsearch {
    hosts => ["h121:9200"]
    index => "logstash-%{+YYYY.MM.dd}"
  }
}
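Assuming tarball installs, the pipeline can be syntax-checked and started roughly like this (paths depend on where you unpacked Logstash and Filebeat):

```shell
# Filebeat: run in the foreground with logs to stderr
./filebeat -e -c filebeat.yml

# Logstash: validate the pipeline definition, then start it
bin/logstash -f logstash_kafka_es.conf --config.test_and_exit
bin/logstash -f logstash_kafka_es.conf
```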

5. Troubleshooting Quick Reference

| Problem | Likely cause | Fix |
| --- | --- | --- |
| Filebeat fails to start | Configuration file error | Check the filebeat.yml syntax |
| Nothing consumed from Kafka | Topic does not exist | Create the topic |
| GeoIP not taking effect | Malformed IP | Check the remote_addr field |
| No data in the index | Elasticsearch writes failing | Check the Elasticsearch cluster status |
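A few commands that help confirm each hop of the pipeline, assuming the Kafka scripts are on the PATH and the services are reachable:

```shell
# Does the topic exist, and is data arriving?
kafka-topics.sh --bootstrap-server h121:9092 --list
kafka-console-consumer.sh --bootstrap-server h121:9092 --topic nginx-log --from-beginning

# Is Elasticsearch healthy, and is the index being created?
curl http://h121:9200/_cluster/health?pretty
curl http://h121:9200/_cat/indices?v
```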

6. Summary

  • Filebeat collects the logs and ships them to Kafka
  • Logstash consumes from Kafka
  • The geoip plugin adds geographic information
  • The data is written to Elasticsearch for display in Kibana