Five Major Tool Components: Installing and Deploying the Flume Data Collection Component

1. Installing Flume

  • Installing Flume is very simple: just upload the tarball and extract it with tar -zxvf apache-flume-1.8.0-bin.tar.gz
  • Enter the Flume directory and edit flume-env.sh under conf, setting JAVA_HOME in it (a minimal sketch follows below).
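
A minimal install sketch, assuming the tarball was uploaded to the user's home directory and the JDK lives under /usr/local/jdk1.8.0_73 (both paths are examples; adjust them to your environment):

# extract the uploaded tarball and enter the Flume directory
tar -zxvf apache-flume-1.8.0-bin.tar.gz
cd apache-flume-1.8.0-bin

# create flume-env.sh from the shipped template and point it at the local JDK
cp conf/flume-env.sh.template conf/flume-env.sh
echo 'export JAVA_HOME=/usr/local/jdk1.8.0_73' >> conf/flume-env.sh   # example JDK path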

2. Classic Flume deployment examples

2.1 Collecting a directory into HDFS


- Configuration file: spooldir-hdfs.properties

# Define the names of the three components
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1

# Configure the source component
agent1.sources.source1.type = spooldir 
agent1.sources.source1.spoolDir = /home/hadoop/logs/ 
agent1.sources.source1.fileHeader = false  

# Configure the interceptor (adds the host name to the event headers)
agent1.sources.source1.interceptors = i1 
agent1.sources.source1.interceptors.i1.type = host 
agent1.sources.source1.interceptors.i1.hostHeader = hostname 

# Configure the sink component
agent1.sinks.sink1.type = hdfs 
agent1.sinks.sink1.hdfs.path=hdfs://myha01/flume_log/%y-%m-%d/%H-%M 
agent1.sinks.sink1.hdfs.filePrefix = events 
agent1.sinks.sink1.hdfs.maxOpenFiles = 5000 
agent1.sinks.sink1.hdfs.batchSize= 100 
agent1.sinks.sink1.hdfs.fileType = DataStream 
agent1.sinks.sink1.hdfs.writeFormat =Text 
agent1.sinks.sink1.hdfs.rollSize = 102400 
agent1.sinks.sink1.hdfs.rollCount = 1000000 
agent1.sinks.sink1.hdfs.rollInterval = 60 
#agent1.sinks.sink1.hdfs.round = true 
#agent1.sinks.sink1.hdfs.roundValue = 10 
#agent1.sinks.sink1.hdfs.roundUnit = minute 
agent1.sinks.sink1.hdfs.useLocalTimeStamp = true 
# Use a channel which buffers events in memory 
agent1.channels.channel1.type = memory 
agent1.channels.channel1.keep-alive = 120 
agent1.channels.channel1.capacity = 500000 
agent1.channels.channel1.transactionCapacity = 600 

# Bind the source and sink to the channel 
agent1.sources.source1.channels = channel1 
agent1.sinks.sink1.channel = channel1 

- Start: bin/flume-ng agent -c conf -f agentconf/spooldir-hdfs.properties -n agent1
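
To verify the agent, a rough sketch (assuming the agent is running and HDFS is reachable; file names are only illustrative) is to write a file outside the spool directory, move it in, and check HDFS:

# write a test file, then move it into the spool directory (spooldir expects complete, immutable files)
echo "hello flume" > /home/hadoop/access.log
mv /home/hadoop/access.log /home/hadoop/logs/
# the ingested file is renamed access.log.COMPLETED locally; a dated directory with events-* files should appear on HDFS
hdfs dfs -ls /flume_log/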

2.2 Collecting a file into HDFS

  • Configuration file: tail-hdfs.properties
agent1.sources = source1 
agent1.sinks = sink1 
agent1.channels = channel1 

# Describe/configure tail -F source1 
agent1.sources.source1.type = exec 
agent1.sources.source1.command = tail -F /home/hadoop/logs/catalina.out 
agent1.sources.source1.channels = channel1 

#configure host for source 
agent1.sources.source1.interceptors = i1 
agent1.sources.source1.interceptors.i1.type = host 
agent1.sources.source1.interceptors.i1.hostHeader = hostname 

# Describe sink1 
agent1.sinks.sink1.type = hdfs 
#a1.sinks.k1.channel = c1 
agent1.sinks.sink1.hdfs.path =hdfs://myha01/weblog/flume-event/%y-%m-%d/%H-%M 
agent1.sinks.sink1.hdfs.filePrefix = tomcat_ 
agent1.sinks.sink1.hdfs.maxOpenFiles = 5000 
agent1.sinks.sink1.hdfs.batchSize= 100 
agent1.sinks.sink1.hdfs.fileType = DataStream 
agent1.sinks.sink1.hdfs.writeFormat =Text 
agent1.sinks.sink1.hdfs.rollSize = 102400 
agent1.sinks.sink1.hdfs.rollCount = 1000000 
agent1.sinks.sink1.hdfs.rollInterval = 60 
agent1.sinks.sink1.hdfs.round = true 
agent1.sinks.sink1.hdfs.roundValue = 10 
agent1.sinks.sink1.hdfs.roundUnit = minute 
agent1.sinks.sink1.hdfs.useLocalTimeStamp = true 

# Use a channel which buffers events in memory 
agent1.channels.channel1.type = memory 
agent1.channels.channel1.keep-alive = 120 
agent1.channels.channel1.capacity = 500000 
agent1.channels.channel1.transactionCapacity = 600 

# Bind the source and sink to the channel 
agent1.sources.source1.channels = channel1 
agent1.sinks.sink1.channel = channel1 

# Start: bin/flume-ng agent -c conf -f agentconf/tail-hdfs.properties -n agent1
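
To test the exec source, one can keep appending to the tailed file and watch the HDFS path fill up; a rough sketch, assuming /home/hadoop/logs/catalina.out exists:

# append one line per second to the tailed file (Ctrl+C to stop)
while true; do echo "test $(date)" >> /home/hadoop/logs/catalina.out; sleep 1; done
# in another terminal, dated directories with tomcat_* files should appear
hdfs dfs -ls /weblog/flume-event/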

2.3 Multiplexed collection (one source, two sinks)

# Here we monitor a log file on hadoop01; one copy of each event is written to HDFS directly, and another copy is sent over a port to hadoop02, which writes it to HDFS as well (the default replicating channel selector sends every event to both channels).
vim 01.properties
agent1.sources = source1
agent1.sinks = sink1 sink2
agent1.channels = channel1 channel2
# Configure the source component

# Option A: monitor a single file (exec source)
agent1.sources.source1.type = exec
agent1.sources.source1.command = tail -F /home/hadoop/testlog/date.log

# Option B: monitor a directory (spooldir source) -- enable either A or B, not both
#agent1.sources.source1.type = spooldir
#agent1.sources.source1.spoolDir = /home/hadoop/logs/

agent1.sources.source1.channels = channel1 channel2

# Configure the sink1 component
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.hdfs.path=hdfs://bd1804/fuyong/%y-%m-%d/%H-%M
agent1.sinks.sink1.hdfs.filePrefix = events
agent1.sinks.sink1.hdfs.maxOpenFiles = 5000
agent1.sinks.sink1.hdfs.batchSize= 100
agent1.sinks.sink1.hdfs.fileType = DataStream
agent1.sinks.sink1.hdfs.writeFormat =Text
agent1.sinks.sink1.hdfs.rollSize = 102400
agent1.sinks.sink1.hdfs.rollCount = 1000000
agent1.sinks.sink1.hdfs.rollInterval = 60
agent1.sinks.sink1.hdfs.useLocalTimeStamp = true

# Configure channel1
agent1.channels.channel1.type = memory
agent1.channels.channel1.keep-alive = 120
agent1.channels.channel1.capacity = 500000
agent1.channels.channel1.transactionCapacity = 600

# Configure the sink2 component
agent1.sinks.sink2.type = avro
agent1.sinks.sink2.channel = channel2
agent1.sinks.sink2.hostname = hadoop02
agent1.sinks.sink2.port = 4141
agent1.sinks.sink2.batch-size = 2

# Configure channel2
agent1.channels.channel2.type = memory
agent1.channels.channel2.capacity = 1000
agent1.channels.channel2.transactionCapacity = 100

# Bind the source and sinks to the channels
agent1.sources.source1.channels = channel1 channel2
agent1.sinks.sink1.channel = channel1
agent1.sinks.sink2.channel = channel2


# Start: bin/flume-ng agent -c conf -n agent1 -f agentconf/01.properties


vim 02.properties    # on hadoop02, which receives the avro stream

a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141
# Describe k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path =hdfs://bd1804/fuyong1/%y-%m-%d/%H-%M
a1.sinks.k1.hdfs.filePrefix = date_
a1.sinks.k1.hdfs.maxOpenFiles = 5000
a1.sinks.k1.hdfs.batchSize= 100
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat =Text
a1.sinks.k1.hdfs.rollSize = 102400
a1.sinks.k1.hdfs.rollCount = 1000000
a1.sinks.k1.hdfs.rollInterval = 60
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

# Start: bin/flume-ng agent -c conf -n a1 -f agentconf/02.properties
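
With both agents running (start 02.properties on hadoop02 before 01.properties on hadoop01, so the avro receiver is up first), each event should land in both HDFS paths. A rough check, assuming the exec source variant of 01.properties is active:

# on hadoop01: generate a few events on the tailed file
echo "multiplex test $(date)" >> /home/hadoop/testlog/date.log
# the same events should appear under both directories
hdfs dfs -ls /fuyong/       # written directly by sink1 on hadoop01
hdfs dfs -ls /fuyong1/      # relayed to hadoop02 via the avro sink, then written by its hdfs sink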

2.4 Multi-agent chained collection

- On hadoop04 (IP 192.168.123.104), create tail-avro.properties under agentconf:
a1.sources = r1 
a1.sinks = k1 
a1.channels = c1 
# Describe/configure the source 
a1.sources.r1.type = exec 
a1.sources.r1.command = tail -F /home/hadoop/testlog/date.log 
a1.sources.r1.channels = c1 

# Describe the sink 
a1.sinks.k1.type = avro 
a1.sinks.k1.channel = c1 
a1.sinks.k1.hostname = hadoop05 
a1.sinks.k1.port = 4141 
a1.sinks.k1.batch-size = 2 

# Use a channel which buffers events in memory 
a1.channels.c1.type = memory 
a1.channels.c1.capacity = 1000 
a1.channels.c1.transactionCapacity = 100 

# Bind the source and sink to the channel 
a1.sources.r1.channels = c1 
a1.sinks.k1.channel = c1

- On hadoop05 (IP 192.168.123.105), configure the collection plan avro-hdfs.properties:
a1.sources = r1 
a1.sinks = k1 
a1.channels = c1 

# Describe/configure the source 
a1.sources.r1.type = avro 
a1.sources.r1.channels = c1 
a1.sources.r1.bind = 0.0.0.0 
a1.sources.r1.port = 4141 

# Describe k1 
a1.sinks.k1.type = hdfs 
a1.sinks.k1.hdfs.path =hdfs://myha01/testlog/flume-event/%y-%m-%d/%H-%M 
a1.sinks.k1.hdfs.filePrefix = date_ 
a1.sinks.k1.hdfs.maxOpenFiles = 5000 
a1.sinks.k1.hdfs.batchSize= 100 
a1.sinks.k1.hdfs.fileType = DataStream 
a1.sinks.k1.hdfs.writeFormat =Text 
a1.sinks.k1.hdfs.rollSize = 102400 
a1.sinks.k1.hdfs.rollCount = 1000000 
a1.sinks.k1.hdfs.rollInterval = 60 
a1.sinks.k1.hdfs.round = true 
a1.sinks.k1.hdfs.roundValue = 10 
a1.sinks.k1.hdfs.roundUnit = minute 
a1.sinks.k1.hdfs.useLocalTimeStamp = true

# Use a channel which buffers events in memory 
a1.channels.c1.type = memory 
a1.channels.c1.capacity = 1000 
a1.channels.c1.transactionCapacity = 100 

# Bind the source and sink to the channel 
a1.sources.r1.channels = c1 
a1.sinks.k1.channel = c1

Start the receiving agent on hadoop05 first: bin/flume-ng agent -c conf -n a1 -f agentconf/avro-hdfs.properties -Dflume.root.logger=INFO,console
Then start the sending agent on hadoop04: bin/flume-ng agent -c conf -n a1 -f agentconf/tail-avro.properties -Dflume.root.logger=INFO,console
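
To exercise the chain, one can generate data on hadoop04 and confirm it reaches HDFS through hadoop05; a rough sketch using the file path from the config above:

# on hadoop04: keep appending to the tailed file (Ctrl+C to stop)
while true; do echo "chained $(date)" >> /home/hadoop/testlog/date.log; sleep 1; done
# on any node with HDFS access: the relayed events should appear under the hadoop05 sink's path
hdfs dfs -ls /testlog/flume-event/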

2.5 High availability

- Step 1: node assignment. hadoop02 and hadoop03 run the agent role, while hadoop04 and hadoop05 run the collector role.
- Step 2: configuration files.
With the basic single-agent configurations above already in place, we only need to add two new configuration files, ha_agent.properties and ha_collector.properties, with the following contents:
ha_agent.properties configuration:

#agent name: agent1 
agent1.channels = c1 
agent1.sources = r1 
agent1.sinks = k1 k2 

#set group
agent1.sinkgroups = g1 

#set channel 
agent1.channels.c1.type = memory 
agent1.channels.c1.capacity = 1000 
agent1.channels.c1.transactionCapacity = 100 

agent1.sources.r1.channels = c1 
agent1.sources.r1.type = exec 
agent1.sources.r1.command = tail -F /home/hadoop/testlog/testha.log 

agent1.sources.r1.interceptors = i1 i2 
agent1.sources.r1.interceptors.i1.type = static 
agent1.sources.r1.interceptors.i1.key = Type 
agent1.sources.r1.interceptors.i1.value = LOGIN 
agent1.sources.r1.interceptors.i2.type = timestamp 

# set sink1 
agent1.sinks.k1.channel = c1 
agent1.sinks.k1.type = avro 
agent1.sinks.k1.hostname = hadoop04 
agent1.sinks.k1.port = 52020 

# set sink2 
agent1.sinks.k2.channel = c1 
agent1.sinks.k2.type = avro 
agent1.sinks.k2.hostname = hadoop05 
agent1.sinks.k2.port = 52020 

#set sink group 
agent1.sinkgroups.g1.sinks = k1 k2 

#set failover 
agent1.sinkgroups.g1.processor.type = failover 
agent1.sinkgroups.g1.processor.priority.k1 = 10 
agent1.sinkgroups.g1.processor.priority.k2 = 1 
agent1.sinkgroups.g1.processor.maxpenalty = 10000 

ha_collector.properties configuration:

#set agent name 
a1.sources = r1 
a1.channels = c1 
a1.sinks = k1 

#set channel 
a1.channels.c1.type = memory 
a1.channels.c1.capacity = 1000 
a1.channels.c1.transactionCapacity = 100 

# avro source: receives events from the agent nodes
a1.sources.r1.type = avro
## set the bind address to the hostname of the machine this collector runs on
a1.sources.r1.bind = hadoop04
a1.sources.r1.port = 52020
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = Collector
## set this value to the hostname of the machine this collector runs on
a1.sources.r1.interceptors.i1.value = hadoop04
a1.sources.r1.channels = c1 

#set sink to hdfs 
a1.sinks.k1.type=hdfs 
a1.sinks.k1.hdfs.path= hdfs://myha01/flume_ha/loghdfs 
a1.sinks.k1.hdfs.fileType=DataStream 
a1.sinks.k1.hdfs.writeFormat=TEXT 
a1.sinks.k1.hdfs.rollInterval=10 
a1.sinks.k1.channel=c1 
a1.sinks.k1.hdfs.filePrefix=%Y-%m-%d

Start the collector role on hadoop04 and hadoop05 first:
bin/flume-ng agent -c conf -f agentconf/ha_collector.properties -n a1 -Dflume.root.logger=INFO,console

Then start the agent role on hadoop02 and hadoop03:
bin/flume-ng agent -c conf -f agentconf/ha_agent.properties -n agent1 -Dflume.root.logger=INFO,console
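
To exercise the failover, one can stop the higher-priority collector (k1 on hadoop04, priority 10) and confirm events keep flowing through the hadoop05 collector (k2, priority 1); a rough sketch:

# on hadoop02 or hadoop03: generate traffic on the tailed file
echo "ha test $(date)" >> /home/hadoop/testlog/testha.log
# on hadoop04: stop the collector (Ctrl+C in its console), then generate more traffic
echo "ha test after failover $(date)" >> /home/hadoop/testlog/testha.log
# new events should now be written to HDFS by the hadoop05 collector
hdfs dfs -cat /flume_ha/loghdfs/* | tail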
