欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Flume安装配置

程序员文章站 2022-05-01 18:57:55
...

一、简介
Apache Flume是一个分布式的、可靠的、可用的系统,可以有效地收集、聚合和移动大量的日志数据,从许多不同的数据源到一个集中的数据存储。
Apache Flume的使用不仅限于日志数据聚合。由于数据源是可定制的,所以可以使用Flume来传输大量的事件数据,包括但不限于网络流量数据、社交媒体生成的数据、电子邮件消息和几乎所有可能的数据源。

一个Flume事件被定义为一个具有字节有效负载和一组可选的字符串属性的数据流单元。一个Flume代理是一个(JVM)进程,它承载组件,通过它,事件从外部源流到下一个目的地(hop)。
代理组件图
Flume安装配置
一个Flume源会使用像web服务器这样的外部源发送给它的事件。外部源以一种被目标Flume源识别的格式将事件发送到Flume。例如,Avro Flume源可以用来从Avro客户端或其他在发送的流中接收Avro事件。

日志收集中一个非常常见的场景是大量的日志生成客户端将数据发送给附加到存储子系统的一些消费者代理。例如,从数百个web服务器收集的日志,发送给数十个写入HDFS集群的代理。
Flume安装配置
以上翻译自官网。

Flume主要由3个重要的组件构成:
Source:完成对日志数据的收集,分成transtion 和 event 打入到channel之中。
Channel:主要提供一个队列的功能,对source提供中的数据进行简单的缓存。
Sink:取出Channel中的数据,进行相应的存储文件系统,数据库,或者提交到远程服务器。
Flume逻辑上分三层架构:agent,collector,storage
Flume安装配置
agent用于采集数据,agent是flume中产生数据流的地方,同时,agent会将产生的数据流传输到collector。
collector的作用是将多个agent的数据汇总后,加载到storage中。
storage是存储系统,可以是一个普通file,也可以是HDFS,HIVE,HBase等。

Flume的架构主要有一下几个核心概念:

Event:一个数据单元,带有一个可选的消息头
Flow:Event从源点到达目的点的迁移的抽象
Client:操作位于源点处的Event,将其发送到Flume Agent
Agent:一个独立的Flume进程,包含组件Source、Channel、Sink
Source:用来消费传递到该组件的Event
Channel:中转Event的一个临时存储,保存有Source组件传递过来的Event
Sink:从Channel中读取并移除Event,将Event传递到Flow Pipeline中的下一个Agent(如果有的话)
关于Flume更多内容,可以参考网络文献:Flume的原理和使用
二、安装
官网下载flume,http://www.apache.org/dyn/closer.lua/flume/1.7.0/apache-flume-1.7.0-bin.tar.gz
1、解压至/app/flume下,如

[hadoop@master app]$ ls
dataset  elasticsearch  flume  hadoop  hbase  hive  java  kafka  redis  scala  spark  sqoop  tgz  zookeeper

2、配置环境变量

[hadoop@master app]$ vim ~/.bash_profile 

如下配置

export JAVA_HOME=/app/java/jdk1.8.0_141
export HADOOP_HOME=/app/hadoop/hadoop-2.7.3
export SCALA_HOME=/app/scala/scala-2.11.8
export SPARK_HOME=/app/spark/spark-2.1.1
export ZOOKEEPER_HOME=/app/zookeeper/zookeeper-3.4.6
export KAFKA_HOME=/app/kafka/kafka_2.10-0.9.0.0
export HIVE_HOME=/app/hive/apache-hive-2.1.1-bin
export HBASE_HOME=/app/hbase/hbase-1.2.6
export SQOOP_HOME=/app/sqoop/sqoop-1.99.7-bin-hadoop200
export SQOOP_SERVER_EXTRA_LIB=$SQOOP_HOME/extra
export FLUME_HOME=/app/flume/apache-flume-1.7.0-bin
export FLUME_CONF_DIR=/app/flume/apache-flume-1.7.0-bin/confPATH=$PATH:$HOME/bin:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$SCALA_HOME/bin:$SPARK_HOME/bin:$SPARK_HOME/sbin:$ZOOKEEPER_HOME/bin:$KAFKA_HOME/bin:$HIVE_HOME/bin:$HBASE_HOME/bin:$SQOOP_HOME/bin:$FLUME_HOME/bin

使其生效

[hadoop@master app]$ source ~/.bash_profile 

3、修改flume-env.sh配置文件

[aaa@qq.com conf]$ cp flume-env.sh.template flume-env.sh

配置上JAVA_HOME

export JAVA_HOME=/app/java/jdk1.8.0_141

4、查看flume版本信息

Error: Could not find or load main class org.apache.flume.tools.GetJavaProperty
Flume 1.7.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: 511d868555dd4d16e6ce4fedc72c2d1454546707
Compiled by bessbd on Wed Oct 12 20:51:10 CEST 2016
From source with checksum 0d21b3ffdc55a07e1d08875872c00523

安装成功
注意:如果系统里安装了hbase,会出现错误: 找不到或无法加载主类 org.apache.flume.tools.GetJavaProperty。如果没有安装hbase,这一步可以略过。
解决办法:

[hadoop@master conf]$ cd /app/hbase/hbase-1.2.6/conf/
  #1、将hbase的hbase.env.sh的这一行配置注释掉,即在export前加一个#
  #export HBASE_CLASSPATH=/home/hadoop/hbase/conf
  #2、或者将HBASE_CLASSPATH改为JAVA_CLASSPATH,配置如下
  export JAVA_CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
  #这里使用第一种方法

解决后再查看flume版本

[aaa@qq.com conf]$ flume-ng version
Flume 1.7.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: 511d868555dd4d16e6ce4fedc72c2d1454546707
Compiled by bessbd on Wed Oct 12 20:51:10 CEST 2016
From source with checksum 0d21b3ffdc55a07e1d08875872c00523

没有报错信息。
三、测试flume
1.案例1:Avro source
    Avro可以发送一个给定的文件给Flume,Avro 源使用AVRO RPC机制。
a) 创建agent配置文件

[hadoop@master conf]$ vim avro.conf

然后,我们在avro.conf写入以下内容

a1.sources=r1
a1.sinks=k1
a1.channels=c1

#Describe/configure the source
a1.sources.r1.type=avro
a1.sources.r1.channels=c1
a1.sources.bind=0.0.0.0
a1.sources.port=4141

#Describe/configure the sink
a1.sink.k1.type=logger

#Use a channel which buffers events in memory
a1.channels.c1.type=memory
a1.channels.capacity=1000
a1.channels.transactionCapacity=100

#Bind the source and sink to the channel
a1.sources.r1.channels=c1
a1.sinks.k1.channels=c1

上面Avro Source参数说明如下:
Avro Source的别名是avro,你同样也可以在这里使用完整类别名称org.apache.flume.source.AvroSource,因此,上面有一行设置是a1.sources.r1.type = avro,表示数据源的类型是avro,亦可以是kafka。
bind绑定的ip地址或主机名,使用0.0.0.0表示绑定机器所有的接口。a1.sources.r1.bind = 0.0.0.0,就表示绑定机器所有的接口。
port表示绑定的端口。a1.sources.r1.port = 4141,表示绑定的端口是4141。
a1.sinks.k1.type = logger,表示sinks的类型是logger。

b) 启动flume agent a1

[aaa@qq.com conf]$ flume-ng agent -c . -f avro.conf -n a1 -Dflume.root.logger=INFO,console

这里我们把这个窗口称为agent窗口。

2018-06-19 17:19:20,138 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:144)] Starting Channel c1
2018-06-19 17:19:20,262 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
2018-06-19 17:19:20,266 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: CHANNEL, name: c1 started
2018-06-19 17:19:20,277 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:171)] Starting Sink k1
2018-06-19 17:19:20,280 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:182)] Starting Source r1
2018-06-19 17:19:20,284 (lifecycleSupervisor-1-3) [INFO - org.apache.flume.source.AvroSource.start(AvroSource.java:234)] Starting Avro source r1: { bindAddress: 0.0.0.0, port: 4141 }...
2018-06-19 17:19:20,876 (lifecycleSupervisor-1-3) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
2018-06-19 17:19:20,879 (lifecycleSupervisor-1-3) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SOURCE, name: r1 started
2018-06-19 17:19:20,883 (lifecycleSupervisor-1-3) [INFO - org.apache.flume.source.AvroSource.start(AvroSource.java:259)] Avro source r1 started.

c) 创建指定文件
先打开另外一个终端,在/app/flume/apache-flume-1.7.0-bin/下写入一个文件log.00,内容为hello,world

我们再打开另外一个终端,执行:

[hadoop@master bin]$ ./flume-ng avro-client --conf conf -H localhost -p 4141 -F ../log.00 

此时我们可以看到第一个终端(agent窗口)下的显示,也就是在日志控制台,就会把log.00文件的内容打印出来:

2018-06-19 17:32:14,471 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 68 65 6C 6C 6F 20 77 6F 72 6C 64                hello world }

avro source执行成功!案例一over!
2.案例2:netcatsource
a) 创建agent配置文件

[hadoop@master conf]$ vim example.conf 

在example.conf里写入以下内容:

  #example.conf: A single-node Flume configuration  

    # Name the components on this agent  
    a1.sources = r1  
    a1.sinks = k1  
    a1.channels = c1  

    # Describe/configure the source  
    a1.sources.r1.type = netcat  
    a1.sources.r1.bind = localhost  
    a1.sources.r1.port = 44444 
        #同上,记住该端口名

    # Describe the sink  
    a1.sinks.k1.type = logger  

    # Use a channel which buffers events in memory  
    a1.channels.c1.type = memory  
    a1.channels.c1.capacity = 1000  
    a1.channels.c1.transactionCapacity = 100  

    # Bind the source and sink to the channel  
    a1.sources.r1.channels = c1  
    a1.sinks.k1.channel = c1 
2018-06-19 21:33:23,224 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:137)] Starting new configuration:{ sourceRunners:{r1=EventDrivenSourceRunner: { source:org.apache.flume.source.NetcatSource{name:r1,state:IDLE} }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@50d30f50 counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }
2018-06-19 21:33:23,265 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:144)] Starting Channel c1
2018-06-19 21:33:23,468 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
2018-06-19 21:33:23,469 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: CHANNEL, name: c1 started
2018-06-19 21:33:23,480 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:171)] Starting Sink k1
2018-06-19 21:33:23,480 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:182)] Starting Source r1
2018-06-19 21:33:23,489 (lifecycleSupervisor-1-3) [INFO - org.apache.flume.source.NetcatSource.start(NetcatSource.java:155)] Source starting
2018-06-19 21:33:23,511 (lifecycleSupervisor-1-3) [INFO - org.apache.flume.source.NetcatSource.start(NetcatSource.java:169)] Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:44444]

再打开一个终端,输入命令:telnet localhost 44444
如果没安装则安装telnet

rpm -qa|grep telnet

什么也不显示则未安装

sudo yum install xinetd
sudo yum install telnet
sudo yum install telnet-server
sudo vim /etc/xinetd.d/telnet
修改disable =no
sudo service xinetd restart

安装启动完成
再次输入命令:telnet localhost 44444

[aaa@qq.com apache-flume-1.7.0-bin]$ telnet localhost 44444
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.

然后我们可以在终端下输入任何字符,第一个终端的日志控制台也会有相应的显示,如我们输入”hello,world”,得出

hello world
OK

查看第一个终端

2018-06-19 22:00:09,382 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 68 65 6C 6C 6F 20 77 6F 72 6C 64 0D             hello world. }

netcatsource运行成功
这里补充一点,flume只能传递英文和字符,不能用中文。
至此Flume安装配置完成。

相关标签: Flume安装配置