欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

实战-搭建使用日志采集分析系统ELK

程序员文章站 2022-04-28 08:59:21
...

ELK简介

ELK是三个开源软件的缩写,分别表示:Elasticsearch , Logstash, Kibana , 它们都是开源软件。新增了一个FileBeat,它是一个轻量级的日志收集处理工具(Agent),Filebeat占用资源少,适合于在各个服务器上搜集日志后传输给Logstash,官方也推荐此工具。

Elasticsearch是个开源分布式搜索引擎,提供搜集、分析、存储数据三大功能。它的特点有:分布式,零配置,自动发现,索引自动分片,索引副本机制,restful风格接口,多数据源,自动搜索负载等。

Logstash 主要是用来日志的搜集、分析、过滤日志的工具,支持大量的数据获取方式。一般工作方式为c/s架构,client端安装在需要收集日志的主机上,server端负责将收到的各节点日志进行过滤、修改等操作在一并发往elasticsearch上去。

Kibana 也是一个开源和免费的工具,Kibana可以为 Logstash 和 ElasticSearch 提供的日志分析友好的 Web 界面,可以帮助汇总、分析和搜索重要数据日志。

Filebeat隶属于Beats。目前Beats包含四种工具:

  1. Packetbeat(搜集网络流量数据)
  2. Topbeat(搜集系统、进程和文件系统级别的 CPU 和内存使用情况等数据)
  3. Filebeat(搜集文件数据)
  4. Winlogbeat(搜集 Windows 事件日志数据)

为什么用到ELK

一般我们需要进行日志分析场景:直接在日志文件中 grep、awk 就可以获得自己想要的信息。但在规模较大的场景中,此方法效率低下,面临问题包括日志量太大如何归档、文本搜索太慢怎么办、如何多维度查询。需要集中化的日志管理,所有服务器上的日志收集汇总。常见解决思路是建立集中式日志收集系统,将所有节点上的日志统一收集,管理,访问。
ELK提供了一整套解决方案,并且都是开源软件,之间互相配合使用,完美衔接,高效的满足了很多场合的应用。目前主流的一种日志系统。

ELK 常用架构

  1. 最简单架构
    在这种架构中,只有一个 Logstash、Elasticsearch 和 Kibana 实例。Logstash 通过输入插件从多种数据源(比如日志文件、标准输入 Stdin 等)获取数据,再经过滤插件加工数据,然后经 Elasticsearch 输出插件输出到 Elasticsearch,通过 Kibana 展示。
    实战-搭建使用日志采集分析系统ELK
    这种架构非常简单,使用场景也有限。初学者可以搭建这个架构,了解 ELK 如何工作

  2. 以Logstash 作为日志搜集器
    实战-搭建使用日志采集分析系统ELK
    这种结构因为需要在各个服务器上部署 Logstash,而它比较消耗 CPU 和内存资源,所以比较适合计算资源丰富的服务器,否则容易造成服务器性能下降,甚至可能导致无法正常工作。

  3. 以Beats 作为日志搜集器
    这种架构引入 Beats 作为日志搜集器。目前 Beats 包括四种:

    1. Packetbeat(搜集网络流量数据);
    2. Topbeat(搜集系统、进程和文件系统级别的 CPU 和内存使用情况等数据);
    3. Filebeat(搜集文件数据);
    4. Winlogbeat(搜集 Windows 事件日志数据)。

实战-搭建使用日志采集分析系统ELK
Beats 将搜集到的数据发送到 Logstash,经 Logstash 解析、过滤后,将其发送到 Elasticsearch 存储,并由 Kibana 呈现给用户。

基于 Filebeat+ELK架构的配置部署详解

实战-搭建使用日志采集分析系统ELK
1、Filebeat负责收集应用写到磁盘上的日志,并将日志发送给logstash
2、logstash处理来自filebeat的日志,并将处理后的日志保存elasticsearch索引库。
3、elasticsearch存储来自logstash的日志。
4、kibana从elasticsearch搜索日志,并展示到页面。

安装jdk

至少jdk 7以上。一般推荐使用 Oracle JDK 1.8 或者 OpenJDK 1.8。我们这里使用 Oracle JDK 1.8。

mkdir /usr/java
tar xf jdk-8u171-linux-x64.tar.gz
mv jdk1.8.0_171/ /usr/java/jdk1.8

#添加java环境变量

echo 'export JAVA_HOME=/usr/java/jdk1.8' >>/etc/profile
echo 'export CLASSPATH=$CLASSPATH:$JAVA_HOME/lib:$JAVA_HOME/lib/tools.jar' >>/etc/profile
echo 'export PATH=$JAVA_HOME/bin:$JAVA_HOME/jre/bin:$PATH' >>/etc/profile
source /etc/profile
java -version

安装elasticsearch

  1. 创建用户
    出于安全考虑,elasticsearch默认不允许以root账号运行。
groupadd es
useradd -g es es
echo "123456"|passwd --stdin es
  1. 调整系统参数
echo "vm.max_map_count = 655360" >> /etc/sysctl.conf
sysctl -p
vim /etc/security/limits.conf  ==》新增如下内容
* soft nofile 65536

* hard nofile 65536

* soft nproc 65536

* hard nproc 65536
  1. 安装配置es
[aaa@qq.com ~]# tar xf elasticsearch-6.8.1.tar.gz
[aaa@qq.com ~]# mv elasticsearch-6.8.1 /usr/local/elasticsearch
[aaa@qq.com ~]# cd /usr/local/elasticsearch/config/
[aaa@qq.com config]# cp elasticsearch.yml elasticsearch.yml.bak
[aaa@qq.com config]# > elasticsearch.yml
[aaa@qq.com config]# vim elasticsearch.yml   #配置文件修改成如下
cluster.name: "elasticsearch_petition"
node.name: node-01
transport.host: 0.0.0.0
transport.publish_host: 0.0.0.0
transport.bind_host: 0.0.0.0
network.host: 0.0.0.0
http.port: 9200
path.data: /usr/local/elasticsearch/data
path.logs: /usr/local/elasticsearch/logs
http.cors.enabled: true
http.cors.allow-origin: "*"
[aaa@qq.com config]# mkdir /usr/local/elasticsearch/data
[aaa@qq.com config]# mkdir /usr/local/elasticsearch/logs
[aaa@qq.com ~]# chown -R es.es /usr/local/elasticsearch/
  1. 启动
    su - es /usr/local/elasticsearch/bin/elasticsearch &
    #启动比较慢,稍等几分钟

  2. 验证 elasticsearch
    复制代码
    [aaa@qq.com ~]$ curl 127.0.0.1:9200 #出来以下内容,安装成功

{
  "name" : "node-01",
  "cluster_name" : "elasticsearch_petition",
  "cluster_uuid" : "jPmcOZu5Rfi5JOq1wsWUSQ",
  "version" : {
    "number" : "6.8.1",
    "build_flavor" : "default",
    "build_type" : "tar",
    "build_hash" : "1fad4e1",
    "build_date" : "2019-06-18T13:16:52.517138Z",
    "build_snapshot" : false,
    "lucene_version" : "7.7.0",
    "minimum_wire_compatibility_version" : "5.6.0",
    "minimum_index_compatibility_version" : "5.0.0"
  },
  "tagline" : "You Know, for Search"
}
  1. 安装elasticsearch-head插件
    安装docker镜像或者通过github下载elasticsearch-head项目都是可以的,1或者2两种方式选择一种安装使用即可:
    a. 使用docker的集成好的elasticsearch-head

     docker run -p 9100:9100 mobz/elasticsearch-head:5
     docker容器下载成功并启动以后,运行浏览器打开http://localhost:9100/
    

    b. 使用git安装elasticsearch-head

     yum install -y npm
     git clone git://github.com/mobz/elasticsearch-head.git
     cd elasticsearch-head
     npm install
     npm run start
     检查端口是否起来
     netstat -antp |grep 9100
     浏览器访问测试是否正常
     http://IP:9100/
    

安装filebeat

下载地址:https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-5.6.4-linux-x86_64.tar.gz

  1. 解压tar -zxvf filebeat-5.6.4-linux-x86_64.tar.gz
  2. 修改filebeat配置文件
    vi filebeat.ymll
    #配置输入文件地址,我这边是配置了三个文件 实战-搭建使用日志采集分析系统ELK
    #配置输出logstash地址,因为我是集群,所以有多个
    实战-搭建使用日志采集分析系统ELK
  3. 启动filebeat
    ./filebeat -e -c ./filebeat.yml

安装LogStash

下载地址:https://www.elastic.co/downloads/past-releases/logstash-5-4-1

  1. 解压unzip logstash-5.4.1
  2. 修改配置文件
    vi config/conf.d/logstash.conf
## 端口
input {
  beats {
    port => 5044
  }
}

## 分析日志数据,对数据格式进行处理
filter {
   if "app" in [tags]{
        ruby {
        	## 指定rb脚本处理
                path => "/home/logstash/config/conf.d/ap_packet.rb"
        }
        mutate {
                remove_field => ["message", "source", "beat", "prospector", "offset", "host", "@version"]
        }
  }

  if "nginx" in [tags]{
        if([message]=~ /GET/){
           drop{}
        }else{
        	## 指定rb脚本处理
           ruby {
               path => "/home/logstash/config/conf.d/nginx_success.rb"
           }
           mutate {
                remove_field => ["message", "source", "beat", "prospector", "offset", "host", "@version"]
           }
        }
  }
}

### 输出目标
output {
   if "app" in [tags]{
    elasticsearch {
       hosts => ["10.10.10.46:9205","10.10.10.47:9205","10.10.10.48:9205"]
       index => "ap_pack_history_%{+YYYYMMdd}"
       document_type => "log"
       }
   }

  if "nginx" in [tags]{
    stdout {
         codec => rubydebug #控制台输出处理过后的数据
   }
    elasticsearch {
       hosts => ["10.10.10.46:9205","10.10.10.47:9205","10.10.10.48:9205"]
       index => "nginx_success_%{+YYYYMMdd}"
       document_type => "log"
     }
   }
}

rb脚本切割文件

def filter(event)
    message = event.get("message")
    tvalue = message[1,message.length-2]
    values = tvalue.split(",")
    time = Time.new
    time1=time.strftime("%Y-%m-%d %H:%M:%S")
    event.set("updatetime",time1)
    clientIp= values[0]
    event.set("clientIp",clientIp)
    status= values[4]
    event.set("status",status)
    content=values[3]
    arr = content.split("&")

    index = 0
    for a in arr do
        if a =~ /Change/
           event.set("Change",a)
        elsif a =~ /Counter/
           event.set("Counter",a)
        elsif a =~ /MAC/
           mac=a.split("=")
           event.set("MAC",mac[1])
        elsif a =~ /SSID/
           event.set("SSID",a)
        end
        index = index + 1
    end
    return [event]
end

最后,可以通过Elasticsearch-head查看日志收集结果
实战-搭建使用日志采集分析系统ELK