欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  科技

ELK 经典用法—企业自定义日志收集切割和mysql模块

程序员文章站 2022-07-02 17:31:08
本文收录在Linux运维企业架构实战系列 一、收集切割公司自定义的日志 很多公司的日志并不是和服务默认的日志格式一致,因此,就需要我们来进行切割了。 1、需切割的日志示例 2018-02-24 11:19:23,532 [143] DEBUG performanceTrace 1145 http:/ ......

本文收录在Linux运维企业架构实战系列

一、收集切割公司自定义的日志

很多公司的日志并不是和服务默认的日志格式一致,因此,就需要我们来进行切割了。

1、需切割的日志示例

2018-02-24 11:19:23,532 [143] DEBUG performanceTrace 1145 http://api.114995.com:8082/api/Carpool/QueryMatchRoutes 183.205.134.240 null 972533 310000 TITTL00 HUAWEI 860485038452951 3.1.146 HUAWEI 5.1 113.552344 33.332737 发送响应完成 Exception:(null)

 

2、切割的配置

在logstash 上,使用fifter 的grok 插件进行切割

input {
        beats {
                port => "5044"
        }
}

filter {
    grok {
        match => {
            "message" => "%{TIMESTAMP_ISO8601:timestamp} \[%{NUMBER:thread:int}\] %{DATA:level} (?<logger>[a-zA-Z]+) %{NUMBER:executeTime:int} %{URI:url} %{IP:clientip} %{USERNAME:UserName} %{NUMBER:userid:int} %{NUMBER:AreaCode:int} (?<Board>[0-9a-zA-Z]+[-]?[0-9a-zA-Z]+) (?<Brand>[0-9a-zA-Z]+[-]?[0-9a-zA-Z]+) %{NUMBER:DeviceId:int} (?<TerminalSourceVersion>[0-9a-z\.]+) %{NUMBER:Sdk:float} %{NUMBER:Lng:float} %{NUMBER:Lat:float} (?<Exception>.*)"
        }
        remove_field => "message"
    }
    date {
                   match => ["timestamp","dd/MMM/YYYY:H:m:s Z"]
        remove_field => "timestamp"
           }
    geoip {
        source => "clientip"
        target => "geoip"
        database => "/etc/logstash/maxmind/GeoLite2-City.mmdb"
    }
}

output {
    elasticsearch {
        hosts => ["http://192.168.10.101:9200/"]
        index => "logstash-%{+YYYY.MM.dd}"
        document_type => "apache_logs"
    }
}

 

3、切割解析后效果

ELK 经典用法—企业自定义日志收集切割和mysql模块

 

4、最终kibana 展示效果

① top10 clientip

ELK 经典用法—企业自定义日志收集切割和mysql模块

 

② top5 url

ELK 经典用法—企业自定义日志收集切割和mysql模块

 

③ 根据ip 显示地理位置

ELK 经典用法—企业自定义日志收集切割和mysql模块

 

⑤ top10 executeTime

ELK 经典用法—企业自定义日志收集切割和mysql模块

 

⑥ 其他字段都可进行设置,多种图案,也可将多个图形放在一起展示

ELK 经典用法—企业自定义日志收集切割和mysql模块

 

 

二、grok 用法详解

1、简介

  Grok是迄今为止使蹩脚的、无结构的日志结构化和可查询的最好方式。Grok在解析 syslog logs、apache and other webserver logs、mysql logs等任意格式的文件上表现完美。

  Grok内置了120多种的正则表达式库,地址:https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns。

 

2、入门例子

① 示例

55.3.244.1 GET /index.html 15824 0.043

 

② 分析

  这条日志可切分为5个部分,IP(55.3.244.1)、方法(GET)、请求文件路径(/index.html)、字节数(15824)、访问时长(0.043),对这条日志的解析模式(正则表达式匹配)如下:

%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}

 

③ 写到filter中

filter { grok { match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}"} } }

 

④ 解析后效果

client: 55.3.244.1
method: GET
request: /index.html
bytes: 15824
duration: 0.043

 

3、解析任意格式日志

(1)解析任意格式日志的步骤:

① 先确定日志的切分原则,也就是一条日志切分成几个部分。

② 对每一块进行分析,如果Grok中正则满足需求,直接拿来用。如果Grok中没用现成的,采用自定义模式。

③ 学会在Grok Debugger中调试。

 

(2)grok 的分类

  • 满足自带的grok 正则 grok_pattern

① 可以查询

# less /usr/share/logstash/vendor/bundle/jruby/1.9/gems/logstash-patterns-core-4.1.1/patterns/grok-patterns

ELK 经典用法—企业自定义日志收集切割和mysql模块

 

② 使用格式

grok_pattern 由零个或多个 %{SYNTAX:SEMANTIC}组成

例: %{IP:clientip}

  其中SYNTAX 是表达式的名字,是由grok提供的:例如数字表达式的名字是NUMBER,IP地址表达式的名字是IP

  SEMANTIC 表示解析出来的这个字符的名字,由自己定义,例如IP字段的名字可以是 client

 

  • 自定义SYNTAX

使用格式:(?<field_name>the pattern here)

例:(?<Board>[0-9a-zA-Z]+[-]?[0-9a-zA-Z]+)

 

(3)正则解析容易出错,强烈建议使用Grok Debugger调试,姿势如下(我打开这个网页不能用)

ELK 经典用法—企业自定义日志收集切割和mysql模块

 

三、使用mysql 模块,收集mysql 日志

1、官方文档使用介绍

https://www.elastic.co/guide/en/beats/filebeat/current/filebeat-module-mysql.html

 

2、配置filebeat ,使用mysql 模块收集mysql 的慢查询

# vim filebeat.yml

#=========================== Filebeat prospectors =============================
filebeat.modules:
- module: mysql
  error:
    enabled: true
    var.paths: ["/var/log/mariadb/mariadb.log"]

  slowlog:
    enabled: true
    var.paths: ["/var/log/mariadb/mysql-slow.log"]
#----------------------------- Redis output --------------------------------
output.redis:
  hosts: ["192.168.10.102"]
  password: "ilinux.io"
  key: "httpdlogs"
  datatype: "list"
  db: 0
  timeout: 5

 

3、elk—logstash 切割mysql 的慢查询日志

① 切割配置

# vim mysqllogs.conf

input {
        redis {
                host => "192.168.10.102"
                port => "6379"
                password => "ilinux.io"
                data_type => "list"
                key => "httpdlogs"
                threads => 2
        }
}

filter {
        grok {
                match => { "message" => "(?m)^#\s+User@Host:\s+%{USER:user}\[[^\]]+\]\s+@\s+(?:(?<clienthost>\S*) )?\[(?:%{IPV4:clientip})?\]\s+Id:\s+%{NUMBER:row_id:int}\n#\s+Query_time:\s+%{NUMBER:query_time:float}\s+Lock_time:\s+%{NUMBER:lock_time:float}\s+Rows_sent:\s+%{NUMBER:rows_sent:int}\s+Rows_examined:\s+%{NUMBER:rows_examined:int}\n\s*(?:use %{DATA:database};\s*\n)?SET\s+timestamp=%{NUMBER:timestamp};\n\s*(?<sql>(?<action>\w+)\b.*;)\s*(?:\n#\s+Time)?.*$" }
        }
        date {
                match => ["timestamp","dd/MMM/YYYY:H:m:s Z"]
                remove_field => "timestamp"
        }
}

output {
        elasticsearch {
                hosts => ["http://192.168.10.101:9200/"]
                index => "logstash-%{+YYYY.MM.dd}"
                document_type => "mysql_logs"
        }
} 

② 切割后显示结果

ELK 经典用法—企业自定义日志收集切割和mysql模块

 

 

4、kibana 最终显示效果

① 哪几个的数据库最多,例:top2 库

表无法显示,因为有些语句不涉及表,切割不出来

ELK 经典用法—企业自定义日志收集切割和mysql模块

 

② 哪几个sql语句出现的最多,例:top5 sql语句

ELK 经典用法—企业自定义日志收集切割和mysql模块

 

③ 哪几个sql语句出现的最多,例:top5 sql语句

ELK 经典用法—企业自定义日志收集切割和mysql模块

 

④ 哪几台服务器慢查询日志生成的最多,例:top5 服务器

ELK 经典用法—企业自定义日志收集切割和mysql模块

 

⑤ 哪几个用户慢查询日志生成的最多,例:top2 用户

ELK 经典用法—企业自定义日志收集切割和mysql模块

 

可以合并显示

ELK 经典用法—企业自定义日志收集切割和mysql模块

 

5、使用mysql 模块收集mysql 的慢查询

(1)filebeat 配置和上边一样

 

(2)elk—logstash 切割mysql 的错误日志

# vim mysqllogs.conf

filter {
        grok {
                match => { "message" => "(?<timestamp>\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2}) %{NUMBER:pid:int} \[%{DATA:level}\] (?<content>.*)" }
        }
        date {
                match => ["timestamp","dd/MMM/YYYY:H:m:s Z"]
                remove_field => "timestamp"
        }
}

 

(3)就不在展示结果了

 

四、ELK 收集多实例日志

很多情况下,公司资金不足,不会一对一收集日志;因此,一台logstash 使用多实例收集处理多台agent 的日志很有必要。

1、filebeat 的配置

主要是output 的配置,只需不同agent 指向不同的端口即可

① agent 1 配置指向5044 端口

#----------------------------- Logstash output --------------------------------
output.logstash:
  # The Logstash hosts
  hosts: ["192.168.10.107:5044"]

② agent 2 配置指向5045 端口

#----------------------------- Logstash output --------------------------------
output.logstash:
  # The Logstash hosts
  hosts: ["192.168.10.107:5045"]

 

2、logstash 的配置

针对不同的agent ,input 指定对应的端口

① agent 1

input {
        beats {
                port => "5044"
        }
}
output {   #可以在output 加以区分
        elasticsearch {
                hosts => ["http://192.168.10.107:9200/"]
                index => "logstash-apache1-%{+YYYY.MM.dd}"
                document_type => "apache1_logs"
        }
}

② agent 1

input {
        beats {
                port => "5045"
        }
}
output {   #可以在output 加以区分
        elasticsearch {
                hosts => ["http://192.168.10.107:9200/"]
                index => "logstash-apache2-%{+YYYY.MM.dd}"
                document_type => "apache2_logs"
        }
}

开启对应的服务就ok 了。