欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Flume理论基础

程序员文章站 2023-12-23 09:52:58
...

Flume理论基础

1、什么是Flume

Apache Flume是Apache Software Foundation的*项目。是一个分布式,可靠且可用的系统,用于高效地收集,聚合大量日志数据并将其从许多不同的源移动到集中式数据存储中。但是Apache Flume的使用不仅限于日志数据聚合。由于数据源是可定制的,因此Flume可用于传输大量事件数据,包括但不限于网络流量数据,社交媒体生成的数据,电子邮件消息以及几乎所有可能的数据源。

2、Flume数据流模型

Flume理论基础
在一个Flume数据流模型中,Flume组件Agent扮演着数据收集和传递的角色,Agent是Flume中的核心组件,用来收集数据。一个Agent就是一个JVM进程,它是Flume中最小的独立运行的单元。一个Flume系统中可以有多个Agent。

2.1 Agent组件

  • Source:Source从数据源接收数据,并将接收的数据以Flume的event格式传递给一个或者多个通道channel,Flume提供多种数据接收的方式,比如Avro,Thrift,twitter。

  • Channel:channel是一种短暂的存储容器,它将从source处接收到的event格式的数据缓存起来,直到它们被sinks消费掉,它在source和sink间起着桥梁的作用,channel是一个完整的事务,这一点保证了数据在收发的时候的一致性。并且它可以和任意数量的source和sink链接. 支持的类型有: JDBC channel,File System channel,Memory channel等。

  • Sink:sink从 Channel中取数据,并将数据存储到集中存储器比如Hbase和HDFS,它从channels消费数据(events)并将其传递给目标地. 目标地可能是另一个sink,也可能HDFS,HBase。

3、Flume的特性

  • Flume可以将应用产生的数据存储到任何集中存储器中,比如HDFS,HBase。
  • 除了日志信息,Flume同时也可以用来接入收集规模宏大的社交网络节点事件数据,比如facebook,twitter,电商网站如亚马逊,flipkart等。
  • 支持各种接入资源数据的类型以及接出数据类型。
  • 支持多路径流量,多管道接入流量,多管道接出流量,上下文路由等。
  • Flume的管道是基于事务,保证了数据在传送和接收时的一致性。

4、Flume配置

4.1 JDK下载及配置

JDK1.8下载,然后在Linux上配置好环境变量

4.2 下载和配置单Agent Flume

(1) 下载Flume1.6.0,将其放入任意一个目录并解压,以目录/apps/flume为例,此时Flume的路径为:

/apps/flume/bin

(2)修改etc/profile文件,添加环境变量

vi /etc/profile
export FLUME_HOME= /apps/flume
PATH=$FLUME_HOME/bin:$PATH

//别忘记source
source /etc/profile

(3)修改flume安装目录下的配置文件

cd $FLUME_HOME
cp /conf/flume-env.sh.template flume-env.sh
vi flume-env.sh
//配置JAVA_HOME为你的JDK的路径
export JAVA_HOME= /usr/java/jdk

到这里flume就可以正常启动了。

4.3测试Flume

(1)我们使用官网提供的一个例子来测试Flume,先在安装了Flume的那台主机上新建一个文件夹,如/flumedir,创建一个新文件option,文件包含以下内容。

# option文件的内容
# example.conf: A single-node Flume configuration

Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

#  Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

(2)进入/flumedir目录,执行以下语句:

flume-ng agent --conf conf --conf-file option --name a1 -Dflume.root.logger=INFO,console

(3)在另一台主机上通过telnet来连接这台主机(如果未安装telnet,可以执行下列语句来安装telnet:yum install telnet -y),输入任意字符串,

$ telnet localhost 44444
Trying 127.0.0.1...
Connected to localhost.localdomain (127.0.0.1).
Escape character is '^]'.
Hello world! <ENTER>
OK

(4)观察flume主机上是否有输出。

12/06/19 15:32:19 INFO source.NetcatSource: Source starting
12/06/19 15:32:19 INFO source.NetcatSource: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:44444]
12/06/19 15:32:34 INFO sink.LoggerSink: Event: { headers:{} body: 48 65 6C 6C 6F 20 77 6F 72 6C 64 21 0D Hello world!. }

可以看到,flume主机上显示了我们的输入信息,说明flume已安装成功。退出telnet(cltr+],再输入quit)。

4.4 配置多Agent Flume流

Flume理论基础
(1)进入node01的/flumedir目录,新建一个文件option2。文件内容如下:

 # option2文件的内容
 # example.conf: A single-node Flume configuration

 Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

#  Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444


# Describe the sink
# 需要修改sink,因为要下沉到第二个Agent中。
a1.sinks.k1.type = avro
# 设置需要下沉的目标主机
a1.sinks.k1.hostname = node03
a1.sinks.k1.port = 4545

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

(2)进入node02的/flumedir目录,新建一个文件option3。文件内容如下:

 # option3文件的内容
 # example.conf: A single-node Flume configuration

 Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

#  Describe/configure the source
# 需要修改source,因为源是上一个Agent
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
//注意,这里绑定的是自己主机的端口,所以绑定node03
a1.sources.r1.bind = node03
a1.sources.r1.port = 4545


# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

(3)先启动node03的flume。

flume-ng agent --conf conf --conf-file option3 --name a1 -Dflume.root.logger=INFO,console

(4)启动node02的flume。

flume-ng agent --conf conf --conf-file option2 --name a1 -Dflume.root.logger=INFO,console

(5)向node02发送消息。

$ telnet localhost 44444
Trying 127.0.0.1...
Connected to localhost.localdomain (127.0.0.1).
Escape character is '^]'.
Hello world! <ENTER>
OK

(6)node02无输出,node03会显示输出。

12/06/19 15:32:19 INFO source.NetcatSource: Source starting
12/06/19 15:32:19 INFO source.NetcatSource: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:44444]
12/06/19 15:32:34 INFO sink.LoggerSink: Event: { headers:{} body: 48 65 6C 6C 6F 20 77 6F 72 6C 64 21 0D Hello world!. }

上一篇:

下一篇: