Big Data 190 - Filebeat → Kafka → Logstash → Elasticsearch Practice

1. Overall Architecture

Filebeat → Kafka → Logstash → Elasticsearch → Kibana

2. Filebeat Configuration

2.1 Installation

  • Filebeat 7.3.0 or 7.17.0 (pick the same version line as your Elasticsearch cluster)

2.2 filebeat.yml

filebeat.inputs:
  - type: log
    paths:
      - /var/log/nginx/access.log
    json.keys_under_root: true
    json.overwrite_keys: true

output.kafka:
  hosts: ["h121:9092", "h122:9092", "h123:9092"]
  topic: "nginx-log"
  partition.hash:
    hash: []   # empty list: hash the event key; add field names to partition by those fields
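
The `json.keys_under_root` and `json.overwrite_keys` settings decide where the decoded JSON fields end up in the Filebeat event. A rough Python model of that behavior (illustrative only, not Filebeat's actual implementation; the sample log line and field names are made up):

```python
import json

def decode_json_log(raw_line, event, keys_under_root=True, overwrite_keys=True):
    """Model of Filebeat's JSON decoding: merge parsed fields into the event."""
    parsed = json.loads(raw_line)
    if keys_under_root:
        for key, value in parsed.items():
            # overwrite_keys lets decoded fields replace Filebeat's own fields
            if overwrite_keys or key not in event:
                event[key] = value
    else:
        event["json"] = parsed  # without keys_under_root, fields stay under "json"
    return event

line = '{"remote_addr": "203.0.113.10", "status": 200}'  # nginx access log in JSON format
event = {"message": line, "host": "h121"}
decode_json_log(line, event)
print(event["remote_addr"], event["status"])
```

With `keys_under_root: false` (the default), the same fields would instead appear under `event["json"]`.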

3. Logstash Configuration

3.1 Consume from Kafka

input {
  kafka {
    bootstrap_servers => "h121:9092,h122:9092,h123:9092"
    topics => ["nginx-log"]
    group_id => "logstash-group"
  }
}

3.2 Parse JSON

filter {
  # The "type" field must be set upstream (e.g. via Filebeat fields or the
  # kafka input's add_field option); adjust the condition to match your events
  if [type] == "app" {
    json {
      source => "message"
      target => "parsed"
    }
  }
}
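
What the json filter does to an event can be sketched in Python (an illustrative model, not Logstash internals; the sample event is made up — on a parse failure Logstash tags the event with `_jsonparsefailure`):

```python
import json

def json_filter(event, source="message", target="parsed"):
    """Model of the Logstash json filter: parse `source` into the `target` field."""
    try:
        event[target] = json.loads(event[source])
    except (KeyError, json.JSONDecodeError):
        # Logstash tags unparseable events instead of dropping them
        event.setdefault("tags", []).append("_jsonparsefailure")
    return event

event = {"type": "app", "message": '{"level": "INFO", "msg": "started"}'}
json_filter(event)
print(event["parsed"]["level"])  # INFO
```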

3.3 GeoIP Enhancement

filter {
  geoip {
    source => "remote_addr"
    # Optional: Logstash 7.x bundles a GeoLite2-City database;
    # set "database" only to point at your own copy
    database => "GeoLite2-City.mmdb"
  }
}

3.4 Output to ES

output {
  elasticsearch {
    hosts => ["h121:9200", "h122:9200", "h123:9200"]
    index => "logstash-%{type}-%{+YYYY.MM.dd}"
  }
}
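
The `%{+YYYY.MM.dd}` part of the index name is date math on the event's @timestamp (Logstash evaluates it in UTC by default), so events roll into a new daily index. A sketch of the resulting name, assuming an example timestamp:

```python
from datetime import datetime, timezone

def index_name(event_type, timestamp):
    """Model of the index pattern logstash-%{type}-%{+YYYY.MM.dd}."""
    return "logstash-{}-{}".format(event_type, timestamp.strftime("%Y.%m.%d"))

ts = datetime(2024, 1, 15, 8, 30, tzinfo=timezone.utc)  # example @timestamp
print(index_name("app", ts))  # logstash-app-2024.01.15
```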

4. Complete Configuration Example

4.1 filebeat.yml (Simplified)

filebeat.inputs:
  - type: log
    paths:
      - /var/log/nginx/access.log

output.kafka:
  hosts: ["h121:9092"]
  topic: "nginx-log"

4.2 logstash_kafka_es.conf

input {
  kafka {
    bootstrap_servers => "h121:9092"
    topics => ["nginx-log"]
  }
}

filter {
  geoip {
    source => "remote_addr"
  }
}

output {
  elasticsearch {
    hosts => ["h121:9200"]
    index => "logstash-%{+YYYY.MM.dd}"
  }
}

5. Error Quick Reference

Issue                      Possible Cause        Solution
-------------------------  --------------------  ---------------------------
Filebeat fails to start    Config file error     Check the YAML syntax
Kafka can’t consume data   Topic doesn’t exist   Create the topic
GeoIP not effective        IP format incorrect   Check the remote_addr field
Index has no data          ES write failed       Check ES cluster status

6. Summary

  • Filebeat collects logs and ships them to Kafka
  • Logstash consumes the messages from Kafka and parses the JSON
  • The geoip plugin adds geographic information to each event
  • Data is written to ES for display in Kibana