欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

ELK日志分析系统(3)-logstash数据处理

程序员文章站 2022-03-12 19:31:38
logstash elk elk数据处理 logstash数据处理 ......

 

1. 概述

  logspout收集数据以后,就会把数据发送给logstash进行处理,本文主要讲解logstash的input, filter, output处理

2. input

  数据的输入处理

  支持tcp,udp等协议

 

  晚上找资料建议在使用 logstash::inputs::syslog 的时候走 tcp 协议来传输数据。

  因为具体实现中,udp 监听器只用了一个线程,而 tcp 监听器会在接收每个连接的时候都启动新的线程来处理后续步骤。

  如果你已经在使用 udp 监听器收集日志,用下行命令检查你的 udp 接收队列大小:# netstat -plnu | awk 'nr==1 || $4~/:514$/{print $2}'

  recv-q

  228096

  228096 是 udp 接收队列的默认最大大小,这时候 linux 内核开始丢弃数据包了!

  2.1. 语法

  基本语法如下:

input{
        tcp {
                mode => "server"
                port => 5000
                codec => json_lines
                tags => ["data-http"]
        }
}

 

  2.2. multiline

  有时候日志是这样多行显示的:

[2019-10-12 15:24:50 account 97364 4658800064 info] http_ip=127.0.0.1        http_uri=/account/v1/binding        http_method=post        http_time=182ms        http_status=401        
http_headers=content-type:application/x-www-form-urlencoded
content-length:27
accept-encoding:identity
host:localhost:8800
user-agent:python-urllib/3.6
key:424518e4d27b11e8ada274e5f95979ae
version:1.1.0
time:1570865090.412524
token:y66ahlnmrosciisownkzxosojsg=
user-id:0
connection:close        
http_kwargs={'sns_type': 'wechat', 'code': 'cg9dej', 'user_id': 0, 'language': 1}        
http_response={"code":"usr_sns_code_error","message":"\u7b2c\u4e09\u65b9sns\u5e10\u53f7code\u65e0\u6548"}

 

    默认情况下logstash会把一行日志转换成elasticsearch的一个doc,上面这个日志就会存储成15条日志。这样就不能满足我们的需求,我们只是想要一条日志

  我们可以这么配置input:

input{
        tcp {
                port => 5001
                type => syslog
                tags => ["syslog"]
                codec=>multiline{
                        pattern => "\[%{timestamp_iso8601:timestamp}"
                        negate => true
                        what => "previous"
                }
        }
}

  红色代码的作用是:匹配到以[2019-10-08 16:57:42开头的一行日志作为previous,不是以这个格式开头的将作为子行出现,然后把多行记录合并成一行记录

 

3. filter

  数据的过滤转化处理

  3.1. 语法

  基本语法如下:

filter {
  grok {
    match => { "message" => "%{syslogbase} %{data:message}" }
    overwrite => [ "message" ]
  }
}

 

  3.2. grok范式匹配

  

  grok适合用来解析syslog,apache,mysql等日志

  假如你的日志格式是这样的

[2019-10-12 15:44:52 account 1 140058162291304 warning] hashcache::_rds_get, cache not existed!!! id_ls:[]

   日志的格式是这样的:

"[%(asctime)s %(service)s %(process)d %(thread)d %(levelname)s] %(message)s"

  那么针对这样有特定格式的日志,我们要怎样提取这里面的字段呢?

    可以这么配置你的filter:

filter{
        if [type] == "syslog" {
                grok {
                        match => { "message" => "\[%{timestamp_iso8601:timestamp} %{data:service} %{data:pid} %{data:tid} %{loglevel:log-level}\] %{greedydata:msg_body}" }
                }
        }
}

    使用grok的match正则表达式匹配可以方便的从message中提取字段

  从elasticsearch可以发现增加了timestamp、server、pid、tid和log-level等字段。

  ELK日志分析系统(3)-logstash数据处理

  

  附上官网文档:

    # grok调试器

     =>debugger

    # 官方文档

    

 

  3.3. gsub字符串替换

  经过logspout处理以后,会增加一些metadata(container name, container id, etc)

  红色部分是logspout添加的:

  <14>1 2019-10-08t18:00:15z zfswalk0 mage-device-11283 16901 - - [2019-10-09 09:49:08 warning saccount c p1 t140004171454120 p1 p2 p3] start listen on http:0.0.0.0:17698, start listen on http:0.0.0.0:17698

    如何去除这部分多余的数据呢?

  logstash需要使用gsub进行字符串替换:

  

filter{
        if [type] == "syslog" {
                mutate {
                        gsub => [ "message", "<\d+>.*?- -", "" ]
                }
        }
}

 

  这个正则表达式的意义是选择从“<14>”开始到“- -”结束的子字符串,然后替换成空字符串,实现metadata的删除

 

  3.4. remove_filed删除字段

  elk是采用json字典的方式来存储数据的

  如果你有哪些字段是不需要的,可以通过remove_field来删除

  假如你不想要grop解析出来的msg_body字段和test字段,可以这么操作,那么最后存储到elasticsearch那边将不会出现这2字段

filter{
        if [type] == "syslog" {
                mutate {
                        remove_field => [ "msg_body", "test" ]
                }
        }
}

 

 

3.5. kv过滤器解析kv数据

  官方文档kv filter:https://www.elastic.co/guide/en/logstash/current/plugins-filters-kv.html

  动态的解析kv可以很方便的支持日志扩展,不需要后期去修改

 

  它会把这个字符串:ip=1.2.3.4 error=refused解析成kv字典形式:{"ip":"1.2.3.4", "error": "refused"}

  

filter{
        if [type] == "syslog" {
                kv {
                        source => "msg_body"
                        field_split => "\t\t"
                }
        }
}

    这边的配置意思是:从msg_body这个字段去解析kv字段,字段的分隔符是"\t\t"

  当然这也要求日志写入的时候需要采用"\t\t"来区分多个字段,类似这样:

[2019-10-12 15:24:50 account 97364 4658800064 info] http_ip=192.168.1.136		http_uri=/account/v1/binding		http_method=post

    http_ip=127.0.0.1、http_uri=/account/v1/binding与http_method=post这三个字段是采用'\t\t'分割的

  这样kv filter就会解析成功,并往doc里面设置http_ip, http_uri,http_method这三个值:

ELK日志分析系统(3)-logstash数据处理

 

 

4. output

  过滤转化后的数据的输出处理

  这里是把数据存储到elasticsearch的9200端口,index是"syslog-%{+yyyy.mm.dd}"

output{
    if "syslog" in [tags]{
        elasticsearch{
                hosts=>["elasticsearch:9200"]
                index => "syslog-%{+yyyy.mm.dd}"
        }
        stdout{codec => rubydebug}
    }
}

   然后elasticsearch就能得到数据了