实战-搭建使用日志采集分析系统ELK
ELK简介
ELK是三个开源软件的缩写,分别表示:Elasticsearch , Logstash, Kibana , 它们都是开源软件。新增了一个FileBeat,它是一个轻量级的日志收集处理工具(Agent),Filebeat占用资源少,适合于在各个服务器上搜集日志后传输给Logstash,官方也推荐此工具。
Elasticsearch是个开源分布式搜索引擎,提供搜集、分析、存储数据三大功能。它的特点有:分布式,零配置,自动发现,索引自动分片,索引副本机制,restful风格接口,多数据源,自动搜索负载等。
Logstash 主要是用来日志的搜集、分析、过滤日志的工具,支持大量的数据获取方式。一般工作方式为c/s架构,client端安装在需要收集日志的主机上,server端负责将收到的各节点日志进行过滤、修改等操作在一并发往elasticsearch上去。
Kibana 也是一个开源和免费的工具,Kibana可以为 Logstash 和 ElasticSearch 提供的日志分析友好的 Web 界面,可以帮助汇总、分析和搜索重要数据日志。
Filebeat隶属于Beats。目前Beats包含四种工具:
- Packetbeat(搜集网络流量数据)
- Topbeat(搜集系统、进程和文件系统级别的 CPU 和内存使用情况等数据)
- Filebeat(搜集文件数据)
- Winlogbeat(搜集 Windows 事件日志数据)
为什么用到ELK
一般我们需要进行日志分析场景:直接在日志文件中 grep、awk 就可以获得自己想要的信息。但在规模较大的场景中,此方法效率低下,面临问题包括日志量太大如何归档、文本搜索太慢怎么办、如何多维度查询。需要集中化的日志管理,所有服务器上的日志收集汇总。常见解决思路是建立集中式日志收集系统,将所有节点上的日志统一收集,管理,访问。
ELK提供了一整套解决方案,并且都是开源软件,之间互相配合使用,完美衔接,高效的满足了很多场合的应用。目前主流的一种日志系统。
ELK 常用架构
-
最简单架构
在这种架构中,只有一个 Logstash、Elasticsearch 和 Kibana 实例。Logstash 通过输入插件从多种数据源(比如日志文件、标准输入 Stdin 等)获取数据,再经过滤插件加工数据,然后经 Elasticsearch 输出插件输出到 Elasticsearch,通过 Kibana 展示。
这种架构非常简单,使用场景也有限。初学者可以搭建这个架构,了解 ELK 如何工作 -
以Logstash 作为日志搜集器
这种结构因为需要在各个服务器上部署 Logstash,而它比较消耗 CPU 和内存资源,所以比较适合计算资源丰富的服务器,否则容易造成服务器性能下降,甚至可能导致无法正常工作。 -
以Beats 作为日志搜集器
这种架构引入 Beats 作为日志搜集器。目前 Beats 包括四种:- Packetbeat(搜集网络流量数据);
- Topbeat(搜集系统、进程和文件系统级别的 CPU 和内存使用情况等数据);
- Filebeat(搜集文件数据);
- Winlogbeat(搜集 Windows 事件日志数据)。
Beats 将搜集到的数据发送到 Logstash,经 Logstash 解析、过滤后,将其发送到 Elasticsearch 存储,并由 Kibana 呈现给用户。
基于 Filebeat+ELK架构的配置部署详解
1、Filebeat负责收集应用写到磁盘上的日志,并将日志发送给logstash
2、logstash处理来自filebeat的日志,并将处理后的日志保存elasticsearch索引库。
3、elasticsearch存储来自logstash的日志。
4、kibana从elasticsearch搜索日志,并展示到页面。
安装jdk
至少jdk 7以上。一般推荐使用 Oracle JDK 1.8 或者 OpenJDK 1.8。我们这里使用 Oracle JDK 1.8。
mkdir /usr/java
tar xf jdk-8u171-linux-x64.tar.gz
mv jdk1.8.0_171/ /usr/java/jdk1.8
#添加java环境变量
echo 'export JAVA_HOME=/usr/java/jdk1.8' >>/etc/profile
echo 'export CLASSPATH=$CLASSPATH:$JAVA_HOME/lib:$JAVA_HOME/lib/tools.jar' >>/etc/profile
echo 'export PATH=$JAVA_HOME/bin:$JAVA_HOME/jre/bin:$PATH' >>/etc/profile
source /etc/profile
java -version
安装elasticsearch
- 创建用户
出于安全考虑,elasticsearch默认不允许以root账号运行。
groupadd es
useradd -g es es
echo "123456"|passwd --stdin es
- 调整系统参数
echo "vm.max_map_count = 655360" >> /etc/sysctl.conf
sysctl -p
vim /etc/security/limits.conf ==》新增如下内容
* soft nofile 65536
* hard nofile 65536
* soft nproc 65536
* hard nproc 65536
- 安装配置es
[aaa@qq.com ~]# tar xf elasticsearch-6.8.1.tar.gz
[aaa@qq.com ~]# mv elasticsearch-6.8.1 /usr/local/elasticsearch
[aaa@qq.com ~]# cd /usr/local/elasticsearch/config/
[aaa@qq.com config]# cp elasticsearch.yml elasticsearch.yml.bak
[aaa@qq.com config]# > elasticsearch.yml
[aaa@qq.com config]# vim elasticsearch.yml #配置文件修改成如下
cluster.name: "elasticsearch_petition"
node.name: node-01
transport.host: 0.0.0.0
transport.publish_host: 0.0.0.0
transport.bind_host: 0.0.0.0
network.host: 0.0.0.0
http.port: 9200
path.data: /usr/local/elasticsearch/data
path.logs: /usr/local/elasticsearch/logs
http.cors.enabled: true
http.cors.allow-origin: "*"
[aaa@qq.com config]# mkdir /usr/local/elasticsearch/data
[aaa@qq.com config]# mkdir /usr/local/elasticsearch/logs
[aaa@qq.com ~]# chown -R es.es /usr/local/elasticsearch/
-
启动
su - es /usr/local/elasticsearch/bin/elasticsearch &
#启动比较慢,稍等几分钟 -
验证 elasticsearch
复制代码
[aaa@qq.com ~]$ curl 127.0.0.1:9200 #出来以下内容,安装成功
{
"name" : "node-01",
"cluster_name" : "elasticsearch_petition",
"cluster_uuid" : "jPmcOZu5Rfi5JOq1wsWUSQ",
"version" : {
"number" : "6.8.1",
"build_flavor" : "default",
"build_type" : "tar",
"build_hash" : "1fad4e1",
"build_date" : "2019-06-18T13:16:52.517138Z",
"build_snapshot" : false,
"lucene_version" : "7.7.0",
"minimum_wire_compatibility_version" : "5.6.0",
"minimum_index_compatibility_version" : "5.0.0"
},
"tagline" : "You Know, for Search"
}
-
安装elasticsearch-head插件
安装docker镜像或者通过github下载elasticsearch-head项目都是可以的,1或者2两种方式选择一种安装使用即可:
a. 使用docker的集成好的elasticsearch-headdocker run -p 9100:9100 mobz/elasticsearch-head:5 docker容器下载成功并启动以后,运行浏览器打开http://localhost:9100/
b. 使用git安装elasticsearch-head
yum install -y npm git clone git://github.com/mobz/elasticsearch-head.git cd elasticsearch-head npm install npm run start 检查端口是否起来 netstat -antp |grep 9100 浏览器访问测试是否正常 http://IP:9100/
安装filebeat
下载地址:https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-5.6.4-linux-x86_64.tar.gz
- 解压tar -zxvf filebeat-5.6.4-linux-x86_64.tar.gz
- 修改filebeat配置文件
vi filebeat.ymll
#配置输入文件地址,我这边是配置了三个文件
#配置输出logstash地址,因为我是集群,所以有多个
- 启动filebeat
./filebeat -e -c ./filebeat.yml
安装LogStash
下载地址:https://www.elastic.co/downloads/past-releases/logstash-5-4-1
- 解压unzip logstash-5.4.1
- 修改配置文件
vi config/conf.d/logstash.conf
## 端口
input {
beats {
port => 5044
}
}
## 分析日志数据,对数据格式进行处理
filter {
if "app" in [tags]{
ruby {
## 指定rb脚本处理
path => "/home/logstash/config/conf.d/ap_packet.rb"
}
mutate {
remove_field => ["message", "source", "beat", "prospector", "offset", "host", "@version"]
}
}
if "nginx" in [tags]{
if([message]=~ /GET/){
drop{}
}else{
## 指定rb脚本处理
ruby {
path => "/home/logstash/config/conf.d/nginx_success.rb"
}
mutate {
remove_field => ["message", "source", "beat", "prospector", "offset", "host", "@version"]
}
}
}
}
### 输出目标
output {
if "app" in [tags]{
elasticsearch {
hosts => ["10.10.10.46:9205","10.10.10.47:9205","10.10.10.48:9205"]
index => "ap_pack_history_%{+YYYYMMdd}"
document_type => "log"
}
}
if "nginx" in [tags]{
stdout {
codec => rubydebug #控制台输出处理过后的数据
}
elasticsearch {
hosts => ["10.10.10.46:9205","10.10.10.47:9205","10.10.10.48:9205"]
index => "nginx_success_%{+YYYYMMdd}"
document_type => "log"
}
}
}
rb脚本切割文件
def filter(event)
message = event.get("message")
tvalue = message[1,message.length-2]
values = tvalue.split(",")
time = Time.new
time1=time.strftime("%Y-%m-%d %H:%M:%S")
event.set("updatetime",time1)
clientIp= values[0]
event.set("clientIp",clientIp)
status= values[4]
event.set("status",status)
content=values[3]
arr = content.split("&")
index = 0
for a in arr do
if a =~ /Change/
event.set("Change",a)
elsif a =~ /Counter/
event.set("Counter",a)
elsif a =~ /MAC/
mac=a.split("=")
event.set("MAC",mac[1])
elsif a =~ /SSID/
event.set("SSID",a)
end
index = index + 1
end
return [event]
end
最后,可以通过Elasticsearch-head查看日志收集结果