欢迎光临
一直在努力

ELK logstash 从aws的s3获取slb访问日志

AWS SLB 负载均衡器的访问日志默认可以保存在 S3 上。那么在 S3 积累了大量日志之后，如何分析呢？下面提供使用 ELK 从 S3 提取日志的 Logstash 配置，并将日志导入到 Elasticsearch。

日志格式

如图

logstash conf

  • 日志路径

api/AWSLogs/430045985948/elasticloadbalancing/ap-southeast-1/2019/04/28/

s3.conf

  #--[ INPUT ]----------------------------------------------------------------
input
{
  # Pull AWS ELB access logs from S3.
  # Alternative setup (credentials file + day-scoped prefix), kept for reference:
  #   region               => "ap-southeast-1"
  #   bucket               => "***"
  #   prefix               => "AWSLogs/430045985948/elasticloadbalancing/ap-southeast-1/%{+YYYY}/%{+MM}/%{+dd}/"
  #   aws_credentials_file => "/data/server/logstash/config/root.yml"
  #   interval             => "30"
  s3 {
    access_key_id     => "***"
    secret_access_key => "*****"
    type              => "elb_access_log"
    region            => "ap-southeast-1"
    bucket            => "*****"
    # Tracks when the last processed file was added to S3.
    # Must point to a file path, not just a directory.
    sincedb_path      => "/data/EL/logstash/s3/s3.txt"
    additional_settings => {
      "force_path_style" => true
      "follow_redirects" => false
    }
  }

}



#--[ FILTER ]---------------------------------------------------------------
filter
{
  # Parse the ELB access logs into structured fields.
  if [type] == "elb_access_log" {
    grok {
      # Classic ELB access-log line format.
      # NOTE: grok only supports :int and :float type coercions, so the
      # timestamp is captured as a plain string (the original ":date"
      # coercion was invalid) and converted by the date filter below.
      match => [ "message", "%{TIMESTAMP_ISO8601:timestamp} %{HOSTNAME:loadbalancer} %{IP:client_ip}:%{POSINT:client_port:int} (?:%{IP:backend_ip}:%{POSINT:backend_port:int}|-) %{NUMBER:request_processing_time:float} %{NUMBER:backend_processing_time:float} %{NUMBER:response_processing_time:float} %{INT:backend_status_code:int} %{INT:received_bytes:int} %{INT:sent_bytes:int} %{INT:sent_bytes_ack:int} \"%{WORD:http_method} %{URI:url_asked} HTTP/%{NUMBER:http_version}\" \"%{GREEDYDATA:user_agent}\" %{NOTSPACE:ssl_cipher} %{NOTSPACE:ssl_protocol}" ]
      remove_field => [ "message" ]
    }

    # Split the request query string into individual key/value fields.
    kv {
      field_split => "&?"
      source => "url_asked"
    }

    # Set @timestamp from the HTTP request time extracted by grok.
    # Must run AFTER grok: the original config placed this filter first,
    # before the "timestamp" field existed, so @timestamp was never set
    # from the log line (filters execute in declaration order).
    date {
      match => [ "timestamp", "ISO8601" ]
      remove_field => [ "timestamp" ]
    }
  }

  # Keep only v2 API requests; drop requests mentioning "xiaomi".
  if [url_asked] !~ "v2" {
    drop {}
  }
  if [url_asked] =~ "xiaomi" {
    drop {}
  }

  # Remove some unnecessary fields to make Kibana cleaner
  mutate {
    remove_field => [ "@version", "count", "fields", "input_type", "offset", "[beat][hostname]", "[beat][name]", "[beat]" ]
  }

}

#--[ OUTPUT ]---------------------------------------------------------------
output
{
  # Ship parsed ELB access logs into Elasticsearch.
  if [type] == "elb_access_log" {
    elasticsearch {
      hosts => ["172.31.3.47:9200"]
      index => "s3"
      # To pin "_id" to an external id (e.g. a MySQL id column), enable:
      # document_id => "%{id}"
      # document_type => "base"
    }
  }
}

调用

logstash -f s3.conf

赞(0) 打赏

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏