165. ELK in Practice: Logstash Data Processing
1. Logstash
Logstash is an open-source data processing pipeline that can ingest data from multiple sources simultaneously, transform it, and then ship it to an output.
2. Logstash Architecture
The basic Logstash architecture is a pipeline with three stages (a minimal skeleton appears after the plugin glossary below):
input: collect data (common plugins: stdin, file, kafka, beats, http)
filter: parse/transform data (common plugins: grok, date, geoip, mutate, useragent)
output: emit data (common plugins: elasticsearch)
input
stdin: read from standard input
file: read from a file
beats: receive events from Filebeat
http: receive events over HTTP
redis: read from Redis
kafka: read from Kafka
filter
grok: parse unstructured text into structured fields
date: parse a date string to set @timestamp
useragent: extract the client's browser, device, and OS from the user-agent string
geoip: look up geographic location from an IP address
mutate
remove_field: delete a field
add_field: add a field (similar to tagging)
convert: convert a field's data type
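Putting the three stages together: every pipeline configuration file has the same input/filter/output shape. The sketch below is a minimal illustration only; the file path and the added field are placeholders, not part of the original text:
input {
file {
path => "/var/log/messages" # placeholder path
}
}
filter {
mutate {
add_field => { "collected_by" => "logstash" } # any filters go here
}
}
output {
stdout {
codec => rubydebug # pretty-print events to the terminal
}
}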
3. Logstash Input Plugins
An input plugin specifies the data source. A single pipeline can use multiple input plugins; we will focus on the following:
- stdin
- file
- beats
- kafka
4. Installing Logstash
# install from the RPM package
rpm -ivh logstash-7.4.0.rpm
# lower the JVM heap, a modest size for a test node
vim /etc/logstash/jvm.options
...
-Xms512m
-Xmx512m
...
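Before building real pipelines it is worth confirming the install works. A hedged example using two standard Logstash flags (-e runs an inline config, -t only syntax-checks a config file and exits); the file name is whatever pipeline you want to validate:
# run a throwaway pipeline straight from the command line
/usr/share/logstash/bin/logstash -e 'input { stdin {} } output { stdout {} }'
# syntax-check a config file without starting it (-t = --config.test_and_exit)
/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/some_pipeline.conf -t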
Example 1: read from standard input and write to standard output
[root@logstash-node1 conf.d]# cat input_stdin_output_console.conf
input {
stdin {
type => stdin
tags => "tags_stdin"
}
}
output {
stdout {
codec => "rubydebug"
}
}
# start logstash
/usr/share/logstash/bin/logstash -f input_stdin_output_console.conf
# test: type a line and press Enter
123
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/awesome_print-1.7.0/lib/awesome_print/formatters/base_formatter.rb:31: warning: constant ::Fixnum is deprecated
{
"@version" => "1",
"host" => "logstash-node1",
"type" => "stdin",
"@timestamp" => 2020-01-15T12:16:14.389Z,
"tags" => [
[0] "tags_stdin"
],
"message" => "123"
}
Example 2: read from a file and write to standard output
[root@logstash-node1 conf.d]# cat input_file_output_console.conf
input {
file {
path => "/var/log/cheng.log"
type => syslog
exclude => "*.gz" #files to skip, glob-match syntax
start_position => "beginning" #where to start reading a new file: beginning or end
stat_interval => "3" #how often to check files for updates, default 1s
}
}
output {
stdout {
codec => rubydebug
}
}
# start logstash
/usr/share/logstash/bin/logstash -f input_file_output_console.conf
# test
echo "123" >>/var/log/cheng.log
{
"host" => "logstash-node1",
"@timestamp" => 2020-01-15T12:22:41.016Z,
"message" => "123",
"path" => "/var/log/cheng.log",
"type" => "syslog",
"@version" => "1"
}
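One caveat when repeating this test: the file input records its read position in a sincedb file, so start_position => "beginning" only applies the first time Logstash sees a file. For repeatable experiments, a common trick (an addition to the config above, using the file input's documented sincedb_path option) is to discard the position data:
input {
file {
path => "/var/log/cheng.log"
start_position => "beginning"
sincedb_path => "/dev/null" # forget read positions, re-read from the top on every run
}
}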
5. Logstash Filter Plugins
As data travels from source to store, Logstash filters parse each event, identify named fields to build structure, and transform the event into a common format, making analysis easier and faster and the data more valuable. For example:
- use grok to derive structure from unstructured data
- use geoip to derive geographic coordinates from an IP address
- use useragent to extract the operating system and device type from a request
- …
6. The Grok Plugin
6.1 Why does grok exist?
# We want to parse unstructured data like the line below into a structured, JSON-like format
120.27.74.166 - - [30/Dec/2018:11:59:18 +0800] "GET / HTTP/1.1" 302 154 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_1) Chrome/79.0.3945.88 Safari/537.36"
# Doing that with a hand-written regular expression is very complex
6.2 How does grok solve this? Grok is essentially a collection of named regular expressions, and it ships with many built-in patterns that can be used directly. For example:
%{IPORHOST:clientip} %{NGUSER:ident} %{NGUSER:auth} \[%{HTTPDATE:timestamp}\] "%{WORD:verb} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion}" %{NUMBER:response} (?:%{NUMBER:bytes}|-) (?:"(?:%{URI:referrer}|-)"|%{QS:referrer}) %{QS:agent} %{QS:xforwardedfor} %{IPORHOST:host} %{BASE10NUM:request_duration}
6.3 Grok syntax
The basic syntax is %{SYNTAX:SEMANTIC}: SYNTAX is the name of a built-in (or custom) pattern, and SEMANTIC is the field name the matched text is stored under.
6.4 Grok example
[root@logstash-node1 /etc/logstash/conf.d]# cat input_filed_console.conf
input {
http {
port => 7474
}
}
filter {
#parse the nginx access log (Apache combined format) into structured fields
grok {
match => { "message" => "%{COMBINEDAPACHELOG}" }
}
}
output {
stdout {
codec => rubydebug
}
}
# start logstash; -r reloads the pipeline automatically when the config changes
/usr/share/logstash/bin/logstash -f input_filed_console.conf -r
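The headers in the result below show the author sent the request with the Insomnia REST client; any HTTP client works. As a hedged equivalent, a curl command along these lines (host and port taken from the config above) POSTs the sample nginx line to the http input, which stores the request body in the message field:
curl -s -X POST 'http://10.0.0.151:7474' \
-d '120.27.74.166 - - [30/Dec/2018:11:59:18 +0800] "GET / HTTP/1.1" 302 154 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_1) Chrome/79.0.3945.88 Safari/537.36"'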
6.5 Result
{
"referrer" => "\"-\"",
"verb" => "GET",
"ident" => "-",
"request" => "/",
"@version" => "1",
"agent" => "\"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_1) Chrome/79.0.3945.88 Safari/537.36\"",
"httpversion" => "1.1",
"response" => "302",
"clientip" => "120.27.74.166",
"message" => "120.27.74.166 - - [30/Dec/2018:11:59:18 +0800] \"GET / HTTP/1.1\" 302 154 \"-\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_1) Chrome/79.0.3945.88 Safari/537.36\"",
"host" => "10.0.0.1",
"headers" => {
"http_host" => "10.0.0.151:7474",
"request_path" => "/",
"content_length" => "160",
"request_method" => "GET",
"http_user_agent" => "insomnia/7.0.6",
"http_accept" => "*/*",
"http_version" => "HTTP/1.1"
},
"@timestamp" => 2020-01-15T12:58:49.253Z,
"bytes" => "154",
"auth" => "-",
"timestamp" => "30/Dec/2018:11:59:18 +0800"
}
7. The GeoIP Plugin
The geoip plugin enriches an event with geographic information looked up from its IP address, such as latitude/longitude and city name, which makes geographic data analysis easy.
7.1 Configuration
[root@logstash-node1 /etc/logstash/conf.d]# cat input_filed_console.conf
input {
http {
port => 7474
}
}
filter {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}" }
}
geoip {
source => "clientip"
}
}
output {
stdout {
codec => rubydebug
}
}
/usr/share/logstash/bin/logstash -f input_filed_console.conf -r
7.2 Result
{
"@version" => "1",
"verb" => "GET",
"@timestamp" => 2020-01-15T13:23:21.685Z,
"ident" => "-",
"timestamp" => "30/Dec/2018:11:59:18 +0800",
"headers" => {
"content_length" => "160",
"http_accept" => "*/*",
"http_version" => "HTTP/1.1",
"http_host" => "10.0.0.151:7474",
"request_path" => "/",
"request_method" => "GET",
"http_user_agent" => "insomnia/7.0.6"
},
"httpversion" => "1.1",
"message" => "120.27.74.166 - - [30/Dec/2018:11:59:18 +0800] \"GET / HTTP/1.1\" 302 154 \"-\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_1) Chrome/79.0.3945.88 Safari/537.36\"",
"auth" => "-",
"host" => "10.0.0.1",
"request" => "/",
"referrer" => "\"-\"",
"bytes" => "154",
"agent" => "\"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_1) Chrome/79.0.3945.88 Safari/537.36\"",
"geoip" => {
"location" => {
"lat" => 30.294,
"lon" => 120.1619
},
"timezone" => "Asia/Shanghai",
"country_name" => "China",
"country_code3" => "CN",
"ip" => "120.27.74.166",
"region_name" => "Zhejiang",
"region_code" => "ZJ",
"latitude" => 30.294,
"country_code2" => "CN",
"longitude" => 120.1619,
"city_name" => "Hangzhou",
"continent_code" => "AS"
},
"response" => "302",
"clientip" => "120.27.74.166"
}
8. The Date Plugin
The date plugin parses a date string into a date type and uses it to replace the @timestamp field (or another specified field).
- match: array; the first element is the source field, followed by one or more date formats to try
- target: string; the field to assign the parsed date to, @timestamp by default
- timezone: string; the time zone to assume when parsing
8.1 Configuration
[root@logstash-node1 /etc/logstash/conf.d]# cat input_filed_console.conf
input {
http {
port => 7474
}
}
filter {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}" }
}
geoip {
source => "clientip"
}
#sample timestamp to parse: 30/Dec/2019:11:59:18 +0800
date {
match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
target => "@timestamp"
timezone => "Asia/Shanghai"
}
}
output {
stdout {
codec => rubydebug
}
}
8.2 Result
{
"@version" => "1",
"verb" => "GET",
"@timestamp" => 2020-01-15T13:23:21.685Z,
"ident" => "-",
"timestamp" => "30/Dec/2018:11:59:18 +0800",
"headers" => {
"content_length" => "160",
"http_accept" => "*/*",
"http_version" => "HTTP/1.1",
"http_host" => "10.0.0.151:7474",
"request_path" => "/",
"request_method" => "GET",
"http_user_agent" => "insomnia/7.0.6"
},
"httpversion" => "1.1",
"message" => "120.27.74.166 - - [30/Dec/2018:11:59:18 +0800] \"GET / HTTP/1.1\" 302 154 \"-\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_1) Chrome/79.0.3945.88 Safari/537.36\"",
"auth" => "-",
"host" => "10.0.0.1",
"request" => "/",
"referrer" => "\"-\"",
"bytes" => "154",
"agent" => "\"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_1) Chrome/79.0.3945.88 Safari/537.36\"",
"geoip" => {
"location" => {
"lat" => 30.294,
"lon" => 120.1619
},
"timezone" => "Asia/Shanghai",
"country_name" => "China",
"country_code3" => "CN",
"ip" => "120.27.74.166",
"region_name" => "Zhejiang",
"region_code" => "ZJ",
"latitude" => 30.294,
"country_code2" => "CN",
"longitude" => 120.1619,
"city_name" => "Hangzhou",
"continent_code" => "AS"
},
"response" => "302",
"clientip" => "120.27.74.166"
}
9. The Useragent Plugin
The useragent plugin parses a request's user-agent field into browser, device, and operating-system details.
9.1 Useragent example: configuration
[root@logstash-node1 /etc/logstash/conf.d]# cat input_filed_console.conf
input {
http {
port => 7474
}
}
filter {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}" }
}
geoip {
source => "clientip"
}
#sample timestamp to parse: 30/Dec/2019:11:59:18 +0800
date {
match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
target => "@timestamp"
timezone => "Asia/Shanghai"
}
useragent {
source => "agent" #read the raw user-agent string from the agent field
target => "agent" #overwrite agent with the parsed result
}
}
output {
stdout {
codec => rubydebug
}
}
9.2 Result
{
"httpversion" => "1.1",
"request" => "/",
"bytes" => "154",
"headers" => {
"http_accept" => "*/*",
"request_path" => "/",
"http_user_agent" => "insomnia/7.0.6",
"content_length" => "160",
"request_method" => "GET",
"http_version" => "HTTP/1.1",
"http_host" => "10.0.0.151:7474"
},
"referrer" => "\"-\"",
"message" => "120.27.74.166 - - [30/Dec/2018:11:59:18 +0800] \"GET / HTTP/1.1\" 302 154 \"-\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_1) Chrome/79.0.3945.88 Safari/537.36\"",
"host" => "10.0.0.1",
"clientip" => "120.27.74.166",
"@timestamp" => 2018-12-30T03:59:18.000Z,
"verb" => "GET",
"geoip" => {
"continent_code" => "AS",
"country_code3" => "CN",
"country_name" => "China",
"region_name" => "Zhejiang",
"longitude" => 120.1619,
"latitude" => 30.294,
"country_code2" => "CN",
"city_name" => "Hangzhou",
"region_code" => "ZJ",
"timezone" => "Asia/Shanghai",
"ip" => "120.27.74.166",
"location" => {
"lat" => 30.294,
"lon" => 120.1619
}
},
"agent" => {
"minor" => "0",
"os" => "Mac OS X",
"patch" => "3945",
"os_major" => "10",
"major" => "79",
"os_name" => "Mac OS X",
"name" => "Chrome",
"build" => "",
"os_minor" => "14",
"device" => "Other"
},
"@version" => "1",
"response" => "302",
"timestamp" => "30/Dec/2018:11:59:18 +0800",
"ident" => "-",
"auth" => "-"
}
10. The Mutate Plugin
The mutate plugin performs field operations: type conversion, deletion, replacement, renaming, and more.
- remove_field: delete a field
- split: split a string
- add_field: add a field
- convert: convert a field's type
- gsub: substitute text within a string
- rename: rename a field
The configuration below chains several of these:
mutate remove_field drops unneeded fields such as headers, message, and agent.
mutate split cuts the message string using | as the delimiter.
mutate add_field turns the split pieces into named fields, which makes later statistics and analysis easier.
mutate convert changes field types; integer, float, string, and boolean are supported.
10.1 Configuration
[root@logstash-01 conf.d]# cat input_grok_output_console.conf
input {
http {
port => 7474
}
}
filter {
# grok {
# match => { "message" => "%{COMBINEDAPACHELOG}" }
# }
#
# geoip {
# source => "clientip"
# }
#
#
# #30/Dec/2019:11:59:18 +0800
# date {
# match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
# target => "@timestamp"
# timezone => "Asia/Shanghai"
# }
#
# useragent {
# source => "agent"
# target => "agent"
# }
# mutate {
# remove_field => [ "message","headers","timestamp" ]
# }
mutate {
split => { "message" => "|" }
}
mutate {
add_field => {
"userID" => "%{[message][0]}"
"Action" => "%{[message][1]}"
"Date" => "%{[message][2]}"
}
remove_field => ["message","headers"]
convert => {
"userID" => "integer"
"Action" => "string"
"Date" => "string"
}
}
}
output {
stdout {
codec => rubydebug
}
elasticsearch {
hosts => ["10.0.0.161:9200","10.0.0.162:9200","10.0.0.163:9200"]
index => "app-%{+YYYY.MM.dd}" #index name, one index per day
template_overwrite => true
}
}
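To exercise the split/add_field/convert chain, POST a |-delimited message to the http input. The payload below is a hypothetical sample (substitute your Logstash host for 10.0.0.151):
curl -s -X POST 'http://10.0.0.151:7474' -d '1001|login|2020-01-15'
# expected fields: userID => 1001 (integer), Action => "login", Date => "2020-01-15"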
10.2 Result
Each posted message is split on |, the userID, Action, and Date fields are added with the declared types, and the event is written both to stdout and to the daily app-* index.
11. Logstash Output Plugins
Output plugins ship processed Logstash events to their destination. Common plugins:
- stdout
- file
- elasticsearch
11.1 Output to the Linux terminal, convenient for debugging
output {
stdout{
codec => rubydebug
}
}
11.2 Output to a file. This consolidates files scattered across many machines, for example collecting every web server's logs into a single file for convenient review:
output {
file {
path => "/var/log/web.log"
}
}
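The path option supports sprintf references, so the collected logs can also be fanned out by day or by any event field. A hedged variant of the block above:
output {
file {
path => "/var/log/web-%{+YYYY-MM-dd}.log" # one consolidated file per day
}
}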
11.3 Output to Elasticsearch, the most commonly used output plugin, implemented over HTTP:
output {
elasticsearch {
hosts => ["10.0.0.161:9200","10.0.0.162:9200","10.0.0.163:9200"]
index => "app-%{+YYYY.MM.dd}" #index name, one index per day
template_overwrite => true
}
}
12. Hands-on: Analyzing nginx Logs with Logstash
A sample nginx access-log line to parse:
66.249.73.135 - - [20/May/2015:21:05:11 +0000] "GET /blog/tags/xsendevent HTTP/1.1" 200 10049 "-" "Mozilla/5.0 (iPhone; CPU iPhone OS 6_0 like Mac OS X) AppleWebKit/536.26 (KHTML, like Gecko) Version/6.0 Mobile/10A5376e Safari/8536.25 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)"
12.1 Filebeat configuration
filebeat.inputs:
- type: log
enabled: true
paths:
- /var/log/nginx/access.log
tags: ["nginx-access"]
- type: log
enabled: true
paths:
- /var/log/nginx/error.log
tags: ["nginx-error"]
output.logstash:
hosts: ["10.0.0.151:5044"]
#start filebeat
systemctl restart filebeat
12.2 Logstash configuration
input {
beats {
port => 5044
}
}
filter {
if "nginx-access" in [tags][0] {
grok {
match => { "message" => "%{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] \"(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})\" %{NUMBER:response} (?:%{NUMBER:bytes}|-) %{QS:referrer} %{QS:useragent}" }
}
date {
match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
target => "@timestamp"
timezone => "Asia/Shanghai"
}
geoip {
source => "clientip"
}
useragent {
source => "useragent"
target => "useragent"
}
mutate {
rename => { "[host][name]" => "hostname" } #move the beats host.name into a flat field
convert => [ "bytes", "integer" ]
remove_field => [ "message", "agent" , "input","ecs" ]
add_field => { "target_index" => "logstash-nginx-access-%{+YYYY.MM.dd}" }
}
} else if "nginx-error" in [tags][0] {
mutate {
add_field => { "target_index" => "logstash-nginx-error-%{+YYYY.MM.dd}" }
}
}
}
output {
elasticsearch {
hosts => ["10.0.0.161:9200","10.0.0.162:9200","10.0.0.163:9200"]
index => "%{[target_index]}"
}
}
#start logstash
systemctl restart logstash
12.3 Result
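One way to verify the pipeline end to end is to ask Elasticsearch which daily indices were created (standard _cat API; hosts taken from the output block above):
curl -s '10.0.0.161:9200/_cat/indices/logstash-nginx-*?v'
# expect logstash-nginx-access-YYYY.MM.dd and logstash-nginx-error-YYYY.MM.dd entries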
13. Collecting MySQL Slow-Query Logs
13.1 What is the MySQL slow query log?
When a SQL statement takes longer than a configured threshold to execute, it is recorded in a designated log file; those records are the slow query log.
13.2 Why collect the MySQL slow query log?
While a database is running, some SQL statements may execute slowly. How do we quickly locate and analyze which statements need optimizing, and which are dragging down the business system?
With centralized collection and analysis, each statement's execution time and its exact text are visible at a glance.
13.3 How do we collect it?
- Install MySQL (or MariaDB)
- Enable slow-query logging (a my.cnf sketch follows this list)
- Use Filebeat to ship the local slow-log file
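For the second step, slow-query logging must be switched on in the server configuration. A minimal my.cnf sketch for MariaDB follows; the 1-second threshold and the log path (chosen to match the Filebeat path below) are assumptions:
[mysqld]
slow_query_log = 1 # enable slow-query logging
slow_query_log_file = /var/log/mariadb/slow.log # where Filebeat will read from
long_query_time = 1 # log statements slower than 1 second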
13.4 Filebeat configuration
filebeat.inputs:
- type: log
enabled: true
paths:
- /var/log/mariadb/slow.log
exclude_lines: ['^\# Time'] # drop the "# Time:" header lines
multiline.pattern: '^\# User' # a new slow-log event starts at the "# User@Host:" line
multiline.negate: true
multiline.match: after
multiline.max_lines: 10000
tags: ["mysql-slow"]
output.logstash:
hosts: ["10.0.0.151:5044"]
13.5 Logstash configuration
input {
beats {
port => 5044
}
}
filter {
mutate {
gsub => ["message","\n"," "] #flatten the multiline event into a single line
}
grok {
match => {
"message" => "(?m)^# aaa@qq.com: %{USER:User}\[%{USER-2:User}\] @ (?:(?<Clienthost>\S*) )?\[(?:%{IP:Client_IP})?\] # Thread_id: %{NUMBER:Thread_id:integer}\s+ Schema: (?:(?<DBname>\S*) )\s+QC_hit: (?:(?<QC_hit>\S*) )# Query_time: %{NUMBER:Query_Time}\s+ Lock_time: %{NUMBER:Lock_Time}\s+ Rows_sent: %{NUMBER:Rows_Sent:integer}\s+Rows_examined: %{NUMBER:Rows_Examined:integer} SET timestamp=%{NUMBER:timestamp}; \s*(?<Query>(?<Action>\w+)\s+.*)"
}
}
date {
match => ["timestamp","UNIX", "YYYY-MM-dd HH:mm:ss"]
target => "@timestamp"
timezone => "Asia/Shanghai"
}
mutate {
remove_field => ["message","input","timestamp","agent","ecs","log"]
convert => ["Lock_Time","float"]
convert => ["Query_Time","float"]
add_field => { "target_index" => "logstash-mysql-slow-%{+YYYY.MM.dd}" }
}
}
output {
elasticsearch {
hosts => ["10.0.0.161:9200"]
index => "%{[target_index]}"
}
stdout {
codec => "rubydebug"
}
}
13.6 Result
Each slow-query event is printed to stdout (rubydebug) and indexed into the daily logstash-mysql-slow-* index.