
165. ELK in Practice: Logstash Data Processing


1. Logstash

Logstash is an open-source data processing pipeline that can ingest data from multiple sources at the same time, transform it, and then ship it to an output.

2. Logstash Architecture

The basic architecture of Logstash is a pipeline:
	input: data collection (common plugins: stdin, file, kafka, beats, http)
	filter: data parsing/transformation (common plugins: grok, date, geoip, mutate, useragent)
	output: data output (common plugins: elasticsearch)


input
stdin:  standard input
file:   read from a file
beats:  read from Filebeat
http:   read over the HTTP protocol
redis:  read data from Redis
kafka:  read data from Kafka
filter
grok:   parse unstructured text into structured fields
date:   parse a time string and set @timestamp
useragent: extract the client's browser and device
geoip:  look up a geographic location from an IP address
mutate
   remove_field: delete a field
   add_field:    add a field (similar to tagging)
   convert:      convert a field's data type
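
Putting the three stages together, a minimal pipeline looks like the sketch below. This is a hypothetical illustration (the added field name is made up), not one of the configs used later in this article: it reads lines from stdin, stamps each event with mutate, and prints the result to stdout.

input {
	stdin {}
}

filter {
	# add a static field to every event
	mutate {
		add_field => { "pipeline" => "demo" }
	}
}

output {
	stdout {
		codec => rubydebug
	}
}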

3. Logstash Input Plugins

The input plugin defines the data source. A pipeline can have multiple input plugins; we will mainly cover the following:

  • stdin

  • file

  • beats

  • kafka

4. Installing Logstash

rpm -ivh logstash-7.4.0.rpm 
# cap the JVM heap; the defaults are larger than a lab machine needs
vim /etc/logstash/jvm.options
...
-Xms512m
-Xmx512m
...
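
Before starting a pipeline, it is worth validating the config file first. Logstash ships a flag for this; a quick check against the Example 1 config below might look like:

# parse the config and exit without starting the pipeline
/usr/share/logstash/bin/logstash -f input_stdin_output_console.conf --config.test_and_exit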

Example 1: read from standard input and write to standard output

[root@logstash-node1 conf.d]# cat input_stdin_output_console.conf 
input {
	stdin {
		type => "stdin"
		tags => "tags_stdin"
	}
}

output {
	stdout {
		codec => "rubydebug"
	}
}

# start logstash
/usr/share/logstash/bin/logstash -f input_stdin_output_console.conf 

# test: type a line on stdin and Logstash prints the structured event
123
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/awesome_print-1.7.0/lib/awesome_print/formatters/base_formatter.rb:31: warning: constant ::Fixnum is deprecated
{
      "@version" => "1",
          "host" => "logstash-node1",
          "type" => "stdin",
    "@timestamp" => 2020-01-15T12:16:14.389Z,
          "tags" => [
        [0] "tags_stdin"
    ],
       "message" => "123"
}

Example 2: read from a file and write to standard output

[root@logstash-node1 conf.d]# cat input_file_output_console.conf 
input {
    file {
        path => "/var/log/cheng.log"
        type => "syslog"
        exclude => "*.gz"       # files to skip, glob syntax
        start_position => "beginning"   # where to start on the first read: beginning or end
        stat_interval => "3"    # how often to check the file for updates, default 1s
    }
}
}

output {
    stdout {
        codec => rubydebug
    }
}

# start logstash
/usr/share/logstash/bin/logstash -f input_file_output_console.conf 

# test: append a line to the watched file
echo "123" >>/var/log/cheng.log 
{
          "host" => "logstash-node1",
    "@timestamp" => 2020-01-15T12:22:41.016Z,
       "message" => "123",
          "path" => "/var/log/cheng.log",
          "type" => "syslog",
      "@version" => "1"
}

5. Logstash Filter Plugins

As data travels from source to store, Logstash filters parse each event, identify and name its structured fields, and transform them into a common format, making analysis easier, faster, and more valuable to the business.

  • Use grok to derive structure from unstructured data
  • Use geoip to derive geographic coordinates from an IP address
  • Use useragent to derive the operating system and device type from a request

5. The Grok Plugin

5.1 Why does grok exist?

# We want to parse the following unstructured data into a structured JSON document:
120.27.74.166 - - [30/Dec/2018:11:59:18 +0800] "GET / HTTP/1.1" 302 154 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_1)  Chrome/79.0.3945.88 Safari/537.36"
# Doing this by hand would require a very complex regular expression.

5.2 How does grok solve this? Grok is essentially a collection of named regular expressions, and it ships with many built-in patterns that can be used directly.

Grok syntax generator output for an nginx-style log:

%{IPORHOST:clientip} %{NGUSER:ident} %{NGUSER:auth} \[%{HTTPDATE:timestamp}\] "%{WORD:verb} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion}" %{NUMBER:response}  (?:%{NUMBER:bytes}|-) (?:"(?:%{URI:referrer}|-)"|%{QS:referrer}) %{QS:agent} %{QS:xforwardedfor} %{IPORHOST:host} %{BASE10NUM:request_duration}
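
Built-in patterns cover most cases; when they do not, the grok filter's pattern_definitions option lets you declare an ad-hoc pattern inline. A minimal sketch (the pattern name MYID and the field names are made up for illustration):

filter {
	grok {
		# define a custom pattern and use it like a built-in one
		pattern_definitions => { "MYID" => "[A-Z]{2}[0-9]{4}" }
		match => { "message" => "%{MYID:order_id} %{GREEDYDATA:rest}" }
	}
}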

5.3 Grok syntax diagram

[figure: grok syntax diagram]

5.4 Grok Example 1

[root@logstash-node1 /etc/logstash/conf.d]# cat input_filed_console.conf 
input {
	http {
		port => 7474
	}
}

filter {
	# parse the nginx log line into structured fields
	grok {
		match => { "message" => "%{COMBINEDAPACHELOG}" }
	}
}

output {
	stdout {
		codec => rubydebug
	}
}

# -r reloads the pipeline automatically when the config file changes
/usr/share/logstash/bin/logstash -f input_filed_console.conf -r


5.5 Run result

{
       "referrer" => "\"-\"",
           "verb" => "GET",
          "ident" => "-",
        "request" => "/",
       "@version" => "1",
          "agent" => "\"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_1)  Chrome/79.0.3945.88 Safari/537.36\"",
    "httpversion" => "1.1",
       "response" => "302",
       "clientip" => "120.27.74.166",
        "message" => "120.27.74.166 - - [30/Dec/2018:11:59:18 +0800] \"GET / HTTP/1.1\" 302 154 \"-\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_1)  Chrome/79.0.3945.88 Safari/537.36\"",
           "host" => "10.0.0.1",
        "headers" => {
              "http_host" => "10.0.0.151:7474",
           "request_path" => "/",
         "content_length" => "160",
         "request_method" => "GET",
        "http_user_agent" => "insomnia/7.0.6",
            "http_accept" => "*/*",
           "http_version" => "HTTP/1.1"
    },
     "@timestamp" => 2020-01-15T12:58:49.253Z,
          "bytes" => "154",
           "auth" => "-",
      "timestamp" => "30/Dec/2018:11:59:18 +0800"
}

6. The geoip Plugin

The geoip plugin looks up the location behind an IP address, such as latitude/longitude and city name, which makes geographic analysis straightforward.

6.1 Configuration file

[root@logstash-node1 /etc/logstash/conf.d]# cat input_filed_console.conf 
input {
	http {
		port => 7474
	}
}

filter {
	grok {
		match => { "message" => "%{COMBINEDAPACHELOG}" }
	}

	geoip {
		source => "clientip"
	}

}
output {
	stdout {
		codec => rubydebug
	}
}

/usr/share/logstash/bin/logstash -f  input_filed_console.conf  -r


6.2 Run result

{
       "@version" => "1",
           "verb" => "GET",
     "@timestamp" => 2020-01-15T13:23:21.685Z,
          "ident" => "-",
      "timestamp" => "30/Dec/2018:11:59:18 +0800",
        "headers" => {
         "content_length" => "160",
            "http_accept" => "*/*",
           "http_version" => "HTTP/1.1",
              "http_host" => "10.0.0.151:7474",
           "request_path" => "/",
         "request_method" => "GET",
        "http_user_agent" => "insomnia/7.0.6"
    },
    "httpversion" => "1.1",
        "message" => "120.27.74.166 - - [30/Dec/2018:11:59:18 +0800] \"GET / HTTP/1.1\" 302 154 \"-\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_1)  Chrome/79.0.3945.88 Safari/537.36\"",
           "auth" => "-",
           "host" => "10.0.0.1",
        "request" => "/",
       "referrer" => "\"-\"",
          "bytes" => "154",
          "agent" => "\"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_1)  Chrome/79.0.3945.88 Safari/537.36\"",
          "geoip" => {
              "location" => {
            "lat" => 30.294,
            "lon" => 120.1619
        },
              "timezone" => "Asia/Shanghai",
          "country_name" => "China",
         "country_code3" => "CN",
                    "ip" => "120.27.74.166",
           "region_name" => "Zhejiang",
           "region_code" => "ZJ",
              "latitude" => 30.294,
         "country_code2" => "CN",
             "longitude" => 120.1619,
             "city_name" => "Hangzhou",
        "continent_code" => "AS"
    },
       "response" => "302",
       "clientip" => "120.27.74.166"
}

7. The date Plugin

The date plugin parses a date string into a date type and uses it to replace @timestamp or another specified field.

  • match: an array naming the field to parse, followed by one or more date formats to try in order (see the sketch after this list)
  • target: a string naming the field to assign the result to; defaults to @timestamp
  • timezone: a string specifying the time zone
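
Because match is an array, several candidate formats can follow the field name, and Logstash tries them in order. A minimal sketch (the second format is a made-up example of an alternative layout):

filter {
	date {
		# first element is the source field, the rest are formats tried in order
		match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z", "yyyy-MM-dd HH:mm:ss"]
		target => "@timestamp"
		timezone => "Asia/Shanghai"
	}
}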

7.1 Configuration file

[root@logstash-node1 /etc/logstash/conf.d]# cat input_filed_console.conf 
input {
	http {
		port => 7474
	}
}

filter {
	grok {
		match => { "message" => "%{COMBINEDAPACHELOG}" }
	}

	geoip {
		source => "clientip"
	}

		
	# sample value: 30/Dec/2019:11:59:18 +0800
	date {
		match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
		target => "@timestamp"
		timezone => "Asia/Shanghai"
	}

}



output {
	stdout {
		codec => rubydebug
	}
}


7.2 Run result

{
       "@version" => "1",
           "verb" => "GET",
     "@timestamp" => 2020-01-15T13:23:21.685Z,
          "ident" => "-",
      "timestamp" => "30/Dec/2018:11:59:18 +0800",
        "headers" => {
         "content_length" => "160",
            "http_accept" => "*/*",
           "http_version" => "HTTP/1.1",
              "http_host" => "10.0.0.151:7474",
           "request_path" => "/",
         "request_method" => "GET",
        "http_user_agent" => "insomnia/7.0.6"
    },
    "httpversion" => "1.1",
        "message" => "120.27.74.166 - - [30/Dec/2018:11:59:18 +0800] \"GET / HTTP/1.1\" 302 154 \"-\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_1)  Chrome/79.0.3945.88 Safari/537.36\"",
           "auth" => "-",
           "host" => "10.0.0.1",
        "request" => "/",
       "referrer" => "\"-\"",
          "bytes" => "154",
          "agent" => "\"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_1)  Chrome/79.0.3945.88 Safari/537.36\"",
          "geoip" => {
              "location" => {
            "lat" => 30.294,
            "lon" => 120.1619
        },
              "timezone" => "Asia/Shanghai",
          "country_name" => "China",
         "country_code3" => "CN",
                    "ip" => "120.27.74.166",
           "region_name" => "Zhejiang",
           "region_code" => "ZJ",
              "latitude" => 30.294,
         "country_code2" => "CN",
             "longitude" => 120.1619,
             "city_name" => "Hangzhou",
        "continent_code" => "AS"
    },
       "response" => "302",
       "clientip" => "120.27.74.166"
}

8. The useragent Plugin

The useragent plugin parses the request's user-agent field into browser, device, and operating system information.

8.1 useragent example: configuration file

[root@logstash-node1 /etc/logstash/conf.d]# cat input_filed_console.conf 
input {
	http {
		port => 7474
	}
}

filter {
	grok {
		match => { "message" => "%{COMBINEDAPACHELOG}" }
	}

	geoip {
		source => "clientip"
	}

		
	# sample value: 30/Dec/2019:11:59:18 +0800
	date {
		match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
		target => "@timestamp"
		timezone => "Asia/Shanghai"
	}

	useragent {
		source => "agent" # parse the raw agent string
		target => "agent" # overwrite agent with the parsed result
	}


}



output {
	stdout {
		codec => rubydebug
	}
}

8.2 Run result


{
    "httpversion" => "1.1",
        "request" => "/",
          "bytes" => "154",
        "headers" => {
            "http_accept" => "*/*",
           "request_path" => "/",
        "http_user_agent" => "insomnia/7.0.6",
         "content_length" => "160",
         "request_method" => "GET",
           "http_version" => "HTTP/1.1",
              "http_host" => "10.0.0.151:7474"
    },
       "referrer" => "\"-\"",
        "message" => "120.27.74.166 - - [30/Dec/2018:11:59:18 +0800] \"GET / HTTP/1.1\" 302 154 \"-\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_1)  Chrome/79.0.3945.88 Safari/537.36\"",
           "host" => "10.0.0.1",
       "clientip" => "120.27.74.166",
     "@timestamp" => 2018-12-30T03:59:18.000Z,
           "verb" => "GET",
          "geoip" => {
        "continent_code" => "AS",
         "country_code3" => "CN",
          "country_name" => "China",
           "region_name" => "Zhejiang",
             "longitude" => 120.1619,
              "latitude" => 30.294,
         "country_code2" => "CN",
             "city_name" => "Hangzhou",
           "region_code" => "ZJ",
              "timezone" => "Asia/Shanghai",
                    "ip" => "120.27.74.166",
              "location" => {
            "lat" => 30.294,
            "lon" => 120.1619
        }
    },
          "agent" => {
           "minor" => "0",
              "os" => "Mac OS X",
           "patch" => "3945",
        "os_major" => "10",
           "major" => "79",
         "os_name" => "Mac OS X",
            "name" => "Chrome",
           "build" => "",
        "os_minor" => "14",
          "device" => "Other"
    },
       "@version" => "1",
       "response" => "302",
      "timestamp" => "30/Dec/2018:11:59:18 +0800",
          "ident" => "-",
           "auth" => "-"
}

9. The mutate Plugin

mutate performs field-level operations such as type conversion, deletion, replacement, and renaming

  • remove_field: delete a field
  • split: split a string on a delimiter
  • add_field: add a field
  • convert: convert a data type
  • gsub: replace substrings in a string
  • rename: rename a field
Use mutate remove_field to drop useless fields, e.g. headers, message, agent.

Use mutate split to cut a string; here | is the delimiter.

Use mutate add_field to turn the split pieces into newly named fields, making later statistics and analysis easier.

mutate convert supports conversion to integer, float, string, and boolean.
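
gsub and rename are not exercised in the configuration below, so here is a minimal standalone sketch of both (the field names are hypothetical):

filter {
	mutate {
		# gsub: in the "path" field, replace every "/" with "_"
		gsub => ["path", "/", "_"]
		# rename: rename the "host" field to "hostname"
		rename => { "host" => "hostname" }
	}
}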

9.1 Configuration file

[root@logstash-01 conf.d]# cat input_grok_output_conaole.conf 

input {
	http {
		port => 7474
	}
}

filter {
#	grok {
#		match => { "message" => "%{COMBINEDAPACHELOG}" }
#	}
#
#	geoip {
#		source => "clientip"
#	}
#
#		
#	#30/Dec/2019:11:59:18 +0800
#	date {
#		match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
#		target => "@timestamp"
#		timezone => "Asia/Shanghai"
#	}
#
#	useragent {
#		source => "agent"
#		target => "agent"
#	}
	
#	mutate {
#		remove_field => [ "message","headers","timestamp" ]
#	}

	mutate {
		split => { "message" =>  "|" }
	}

	mutate {
		add_field => {
			"userID" => "%{[message][0]}"
			"Action" => "%{[message][1]}"
			"Date"   => "%{[message][2]}"
		}
		remove_field => ["message","headers"]

		convert => {
            		"userID" => "integer"
            		"Action" => "string"
            		"Date"   => "string"
        	}		
	}
}

output {
	stdout {
		codec => rubydebug
	}

	elasticsearch {
		hosts => ["10.0.0.161:9200","10.0.0.162:9200","10.0.0.163:9200"]
		index => "app-%{+YYYY.MM.dd}"     #索引名称
		template_overwrite => true
	}
}

9.2 Run result

[figures: run result screenshots]

10. Logstash Output Plugins

The output stage ships Logstash events to their destination. Common plugins:

  • stdout
  • file
  • elasticsearch

10.1 Output to the Linux terminal, useful for debugging

output {
  stdout {
    codec => rubydebug
  }
}

10.2 Output to a file. This consolidates files scattered across machines, for example collecting every web server's logs into one file for convenient review.

output {
	file {
		path => "/var/log/web.log"
	}
}

10.3 Output to Elasticsearch, the most commonly used output plugin, implemented over the HTTP protocol.

output {
elasticsearch {
		hosts => ["10.0.0.161:9200","10.0.0.162:9200","10.0.0.163:9200"]
		index => "app-%{+YYYY.MM.dd}"     #索引名称
		template_overwrite => true
	}
}

11. Logstash in Practice: Analyzing nginx Logs ***

The nginx log line to analyze:

66.249.73.135 - - [20/May/2015:21:05:11 +0000] "GET /blog/tags/xsendevent HTTP/1.1" 200 10049 "-" "Mozilla/5.0 (iPhone; CPU iPhone OS 6_0 like Mac OS X) AppleWebKit/536.26 (KHTML, like Gecko) Version/6.0 Mobile/10A5376e Safari/8536.25 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)"

11.1 The filebeat configuration file

filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/nginx/access.log
  tags: ["nginx-access"]

- type: log
  enabled: true
  paths:
    - /var/log/nginx/error.log
  tags: ["nginx-error"]

output.logstash:
  hosts: ["10.0.0.151:5044"]

# start filebeat
systemctl restart filebeat

11.2 The logstash configuration file

input {
	beats {
		port => 5044
	}
}

filter {

if "nginx-access" in [tags][0] {
	grok {
		match => { "message" => "%{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] \"(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})\" %{NUMBER:response} (?:%{NUMBER:bytes}|-) %{QS:referrer} %{QS:useragent}" }
	}

	date {
		match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
		target => "@timestamp"
		timezone => "Asia/Shanghai"
	}

	geoip {
		source => "clientip"
	}

	useragent {
		source => "useragent"
		target => "useragent"
	}

	mutate {
		rename => { "[host][name]" => "hostname" }
		convert => [ "bytes", "integer" ]
		remove_field => [ "message", "agent" , "input","ecs" ]
		add_field => { "target_index" => "logstash-nginx-access-%{+YYYY.MM.dd}" }
	}
}  else if "nginx-error" in [tags][0] {
	mutate {
		add_field => { "target_index" => "logstash-nginx-error-%{+YYYY.MM.dd}" }
	}
   }

}

output {
	elasticsearch {
		hosts => ["10.0.0.161:9200","10.0.0.162:9200","10.0.0.163:9200"]
		index => "%{[target_index]}"
	}
}

# start logstash
systemctl restart logstash

11.3 Run result

[figures: run result screenshots]

12. Collecting MySQL Slow Query Logs

12.1 What is the MySQL slow query log?

When a SQL statement's execution time exceeds the configured threshold, it is recorded in a designated log file; these records are the slow query log.

12.2 Why collect the MySQL slow query log?

While a database is running, some SQL statements may execute slowly. How do we quickly locate and analyze which statements need optimizing, and which ones are hurting the business system?

With centralized collection and analysis, each statement's execution time and exact text are visible at a glance.

12.3 How do we collect the MySQL slow query log?

  • Install MySQL
  • Enable MySQL slow query logging (a sketch follows this list)
  • Use filebeat to collect the local slow query log file
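
The server-side switch is not shown above, so here is a minimal my.cnf sketch of enabling the slow log on MariaDB/MySQL, assuming the log path used by the filebeat config below (the 3-second threshold is an arbitrary example; option names can vary between versions):

[mysqld]
slow_query_log = 1                              # turn slow query logging on
slow_query_log_file = /var/log/mariadb/slow.log # where to write it
long_query_time = 3                             # log statements slower than 3s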

12.4 The filebeat configuration file

filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/mariadb/slow.log
  exclude_lines: ['^\# Time']   # drop the "# Time" header lines
  multiline.pattern: '^\#User'  # a new event begins at the user header
  multiline.negate: true
  multiline.match: after        # fold the following lines into the same event
  multiline.max_lines: 10000
  tags: ["mysql-slow"]


output.logstash:
  hosts: ["10.0.0.151:5044"]

12.5 The logstash configuration file

input {
	beats {
		port => 5044
	}
}


filter {

	mutate {
		gsub => ["message","\n"," "]   # flatten the multiline event into a single line
	}
	grok {
	
		match => {
		"message" => "(?m)^# aaa@qq.com: %{USER:User}\[%{USER-2:User}\] @ (?:(?<Clienthost>\S*) )?\[(?:%{IP:Client_IP})?\] # Thread_id: %{NUMBER:Thread_id:integer}\s+ Schema: (?:(?<DBname>\S*) )\s+QC_hit: (?:(?<QC_hit>\S*) )# Query_time: %{NUMBER:Query_Time}\s+ Lock_time: %{NUMBER:Lock_Time}\s+ Rows_sent: %{NUMBER:Rows_Sent:integer}\s+Rows_examined: %{NUMBER:Rows_Examined:integer} SET timestamp=%{NUMBER:timestamp}; \s*(?<Query>(?<Action>\w+)\s+.*)"
		}
	}

    date {
        match => ["timestamp","UNIX", "YYYY-MM-dd HH:mm:ss"]
        target => "@timestamp"
        timezone => "Asia/Shanghai"
    }
    mutate {
        remove_field => ["message","input","timestamp","agent","ecs","log"]
        convert => ["Lock_Time","float"]
        convert => ["Query_Time","float"]
        add_field => { "target_index" => "logstash-mysql-slow-%{+YYYY.MM.dd}" }
    }
}

output {
	elasticsearch {
		hosts => ["10.0.0.161:9200"]
		index => "%{[target_index]}"
	}
	stdout {
		codec => "rubydebug"
	}
}

12.6 Run result

[figures: run result screenshots]
