使用Logstash收集Nginx日志
尽量少使用grok进行过滤可以减轻logstash的负担,提高处理效率。
首先将Nginx日志格式作调整
log_format json '{"@timestamp":"$time_iso8601",'
'"@source":"$server_addr",'
'"@nginx_fields":{'
'"client":"$remote_addr",'
'"size":$body_bytes_sent,'
'"responsetime":"$request_time",'
'"upstreamtime":"$upstream_response_time",'
'"upstreamaddr":"$upstream_addr",'
'"request_method": "$request_method",'
'"domain":"$host",'
'"url":"$uri",'
'"http_user_agent": "$http_user_agent",'
'"status":$status,'
'"x_forwarded_for":"$http_x_forwarded_for"}}';
access_log /data/app_data/nginx/logs/access.log json;
这里需要特别注意的是,要分析的日志字段类型是字符串还是数字。如果是日志格式是JSON格式的,那么数字类型的字段显示成为 - ,JSON格式将出错。
例如在JSON中如果responsetime格式是字符串,那么"-"可以解析,如果是数字,JSON将不能正确解析。
"responsetime": "-",
"responsetime": -,
如果JSON格式出错,那么Logstash解析JSON就会出错,相应的JSON字段将不会再Kibana显示。
这里要特别注意索引字段的类型,Kibana会默认根据首次接收到的数据创建索引类型。如果后期更好了数据来源的类型,需要更改Elasticsearch的mapping。
例如原来Nginx的 reponsetime设置的是数字类型,但是访问一些错误URL时,reponsetime字段的值将会显示 - ,这时JSON格式将出错,所以决定把Nginx日志JSON格式的所有数字类型都用""括起来。但是由于之前一直是数字类型,所以更改JSON格式后,有些Nginx日志无法在Kibana页面进行搜索。这时就需要更改mapping。
nginx access log相关的logstash配置如下:
file {
type => "nginx_access"
path => ["/data/app_data/nginx/logs/*.log"]
exclude => ["*.gz","error.log"]
sincedb_path => "/dev/null"
codec => "json"
}
解析error log
Nginx错误日志经常有一行信息显示成为多行的情况,例如PHP程序的一些报错会分成多行显示,这样Logstash默认会把一条Nginx错误日志当成多行处理。这种情况下就需要使用logstash的multiline对多行进行合并。
如果不清楚应该怎么样去合并多行,可以使用logstash的 rubydebug输出日志看看logstash是怎么输出的,然后根据相应的规则匹配进行合并。
multiline可以在codec和filter中使用,合并后的日志会比同一时间的其他日志慢点显示。
例如这条错误日志:
2015/04/17 18:16:43 [error] 32320#0: *7664344 FastCGI sent in stderr: "PHP message: PHP Fatal error: Uncaught exception 'Yaf_Exception' with message 'There is no section 'game' in 'xxxxxxxxxxxxxxxxxxxxxxx'' in router.php:13
Stack trace:
#0 router.php(13): Yaf_Application->__construct('/data/...', 'xx')
#1 {main}
Next exception 'Yaf_Exception_StartupError' with message 'Initialization of application config failed' in router.php:13
Stack trace:
#0 router.php(0): Yaf_Application::__construct()
#1 {main}
thrown in router.php on line 13" while reading response header from upstream, client: 180.150.179.184, server: xxxx.com, request: "GET /router.php HTTP/1.1", upstream: "fastcgi://127.0.0.1:9000", host: "xxx.com"
这个是PHP使用Yaf框架产生的错误日志,Nginx分成了多行显示。这里就需要对多行日志进行匹配
匹配 #,换行符,Next,Stack和空白开头的行
file {
type => "nginx_error"
path => "/data/app_data/nginx/logs/error.log"
exclude => ["*.gz"]
sincedb_path => "/dev/null"
}
filter {
if [type] == "nginx_error" {
multiline {
pattern => "^(Stack trace:|#|Next|\\n|$|\s)"
what => "previous"
}
}
}
可以使用logstash的geoip过滤处理Nginx的来源IP然后通过Kibana在页面进行地图展示
if [type] == "nginx_access" {
if [@nginx_fields][x_forwarded_for] != "-" {
geoip {
source => "[@nginx_fields][x_forwarded_for]"
target => "geoip"
database => "/data/app_platform/logstash/vendor/geoip/GeoLiteCity.dat"
add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
}
mutate {
convert => [ "[geoip][coordinates]", "float" ]
}
}
else if [@nginx_fields][client] != "-" {
geoip {
source => "[@nginx_fields][client]"
target => "geoip"
database => "/data/app_platform/logstash/vendor/geoip/GeoLiteCity.dat"
add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
}
mutate {
convert => [ "[geoip][coordinates]", "float" ]
}
}
这里有两个字段都可能含有外网IP,client和x_forwarded_for,这里需要注意一下这里的写法,@nginx_fields下面的client需要写成[@nginx_fields][client] 而不能写成@nginx_fields.client
还有GeoIP的IP库可能会与真实的IP数据有出入,可以定期下载最新的IP库更新。
参考文档:
https://blog.pkhamre.com/logging-to-logstash-json-format-in-nginx/
http://xylil.com/2012/07/14/nginx-logs-in-json-format/
https://community.ulyaoth.net/threads/how-to-create-a-logstash-geoip-based-dashboard-in-kibana-3.29/
http://blog.51yip.com/apachenginx/1277.html
http://www.logstash.net/docs/1.4.2/filters/multiline
http://www.505forensics.com/who-have-your-logs-been-talking-to/
转载于:https://blog.51cto.com/john88wang/1632190
下一篇: 腾讯云服务器django项目部署随笔
推荐阅读
-
logstash,nginx日志,grok pattern调试
-
SpringBoot使用Graylog日志收集的实现示例
-
Spring Boot 使用 logback、logstash、ELK 记录日志文件的方法
-
SpringBoot继承LogStash实现日志收集的方法示例
-
实现Nginx中使用PHP-FPM时记录PHP错误日志的配置方法
-
ELK快速入门(二)通过logstash收集日志
-
使用shell脚本对Nginx日志进行切分的示例代码
-
开源日志收集Exceptionless简单使用
-
在.NET Core中使用Exceptionless分布式日志收集框架
-
使用MongoDB分析Nginx日志的方法详解