您现在的位置是: 首页


程序员文章站 2022-03-14 13:44:45



  • Flume的安装非常简单,只需要上传解压即可 tar -zxvf apache-flume-1.8.0-bin.tar.gz
  • 进入 flume 的目录,修改 conf 下的 flume-env.sh,在里面配置 JAVA_HOME


2.1采集目录到 HDFS


- 配置文件编写:spooldir-hdfs.properties

agent1.sources = source1 agent1.sinks = sink1 agent1.channels = channel1  
#   配置 source 组件 
agent1.sources.source1.type = spooldir 
agent1.sources.source1.spoolDir = /home/hadoop/logs/ 
agent1.sources.source1.fileHeader = false  

#  配置拦截器 
agent1.sources.source1.interceptors = i1 
agent1.sources.source1.interceptors.i1.type = host 
agent1.sources.source1.interceptors.i1.hostHeader = hostname 

#  配置 sink 组件 
agent1.sinks.sink1.type = hdfs 
agent1.sinks.sink1.hdfs.filePrefix = events 
agent1.sinks.sink1.hdfs.maxOpenFiles = 5000 
agent1.sinks.sink1.hdfs.batchSize= 100 
agent1.sinks.sink1.hdfs.fileType = DataStream 
agent1.sinks.sink1.hdfs.writeFormat =Text 
agent1.sinks.sink1.hdfs.rollSize = 102400 
agent1.sinks.sink1.hdfs.rollCount = 1000000 
agent1.sinks.sink1.hdfs.rollInterval = 60 
#agent1.sinks.sink1.hdfs.round = true 
#agent1.sinks.sink1.hdfs.roundValue = 10 
#agent1.sinks.sink1.hdfs.roundUnit = minute 
agent1.sinks.sink1.hdfs.useLocalTimeStamp = true 
# Use a channel which buffers events in memory 
agent1.channels.channel1.type = memory 
agent1.channels.channel1.keep-alive = 120 
agent1.channels.channel1.capacity = 500000 
agent1.channels.channel1.transactionCapacity = 600 

# Bind the source and sink to the channel 
agent1.sources.source1.channels = channel1 
agent1.sinks.sink1.channel = channel1 

- 启动 bin/flume-ng agent -c conf -f agentconf/spooldir-hdfs.properties -n agent1

2.2 采集文件到 HDFS

  • 配置文件编写:tail-hdfs.properties
agent1.sources = source1 
agent1.sinks = sink1 
agent1.channels = channel1 

# Describe/configure tail -F source1 
agent1.sources.source1.type = exec 
agent1.sources.source1.command = tail -F /home/hadoop/logs/catalina.out 
agent1.sources.source1.channels = channel1 

#configure host for source 
agent1.sources.source1.interceptors = i1 
agent1.sources.source1.interceptors.i1.type = host 
agent1.sources.source1.interceptors.i1.hostHeader = hostname 

# Describe sink1 
agent1.sinks.sink1.type = hdfs 
#a1.sinks.k1.channel = c1 
agent1.sinks.sink1.hdfs.path =hdfs://myha01/weblog/flume-event/%y-%m-%d/%H-%M 
agent1.sinks.sink1.hdfs.filePrefix = tomcat_ 
agent1.sinks.sink1.hdfs.maxOpenFiles = 5000 
agent1.sinks.sink1.hdfs.batchSize= 100 
agent1.sinks.sink1.hdfs.fileType = DataStream 
agent1.sinks.sink1.hdfs.writeFormat =Text 
agent1.sinks.sink1.hdfs.rollSize = 102400 
agent1.sinks.sink1.hdfs.rollCount = 1000000 
agent1.sinks.sink1.hdfs.rollInterval = 60 
agent1.sinks.sink1.hdfs.round = true 
agent1.sinks.sink1.hdfs.roundValue = 10 
agent1.sinks.sink1.hdfs.roundUnit = minute 
agent1.sinks.sink1.hdfs.useLocalTimeStamp = true 

# Use a channel which buffers events in memory 
agent1.channels.channel1.type = memory 
agent1.channels.channel1.keep-alive = 120 
agent1.channels.channel1.capacity = 500000 
agent1.channels.channel1.transactionCapacity = 600 

# Bind the source and sink to the channel 
agent1.sources.source1.channels = channel1 
agent1.sinks.sink1.channel = channel1 

# 启动   bin/flume-ng agent -c conf -f agentconf/tail-hdfs.properties -n agent1

2.3 多路复用采集

# 这里监听hadoop01的日志文件,一份发送给hdfs,一份通过端口发送给hadoop02再存入hdfs
vim 01.properties
agent1.sources = source1
agent1.sinks = sink1 sink2
agent1.channels = channel1 channel2
# 配置 source 组件

# 监控文件
agent1.sources.source1.type = exec
agent1.sources.source1.command = tail -F /home/hadoop/testlog/date.log

agent1.sources.source1.type = spooldir
agent1.sources.source1.spoolDir = /home/hadoop/logs/
agent1.sources.source1.channels = channel1 channel2

# 配置 sink1 组件
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.hdfs.filePrefix = events
agent1.sinks.sink1.hdfs.maxOpenFiles = 5000
agent1.sinks.sink1.hdfs.batchSize= 100
agent1.sinks.sink1.hdfs.fileType = DataStream
agent1.sinks.sink1.hdfs.writeFormat =Text
agent1.sinks.sink1.hdfs.rollSize = 102400
agent1.sinks.sink1.hdfs.rollCount = 1000000
agent1.sinks.sink1.hdfs.rollInterval = 60
agent1.sinks.sink1.hdfs.useLocalTimeStamp = true

# 配置channel1
agent1.channels.channel1.type = memory
agent1.channels.channel1.keep-alive = 120
agent1.channels.channel1.capacity = 500000
agent1.channels.channel1.transactionCapacity = 600

agent1.sinks.sink2.type = avro
agent1.sinks.sink2.channel = channel2
agent1.sinks.sink2.hostname = hadoop02
agent1.sinks.sink2.port = 4141
agent1.sinks.sink2.batch-size = 2

agent1.channels.channel2.type = memory
agent1.channels.channel2.capacity = 1000
agent1.channels.channel2.transactionCapacity = 100

# 建立联系
agent1.sources.source1.channels = channel1 channel2
agent1.sinks.sink1.channel = channel1
agent1.sinks.sink2.channel = channel2

#启动 bin/flume-ng agent -c conf -n agent1 -f agentconf/01.properties

vim 02.properties

a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind =
a1.sources.r1.port = 4141
# Describe k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path =hdfs://bd1804/fuyong1/%y-%m-%d/%H-%M
a1.sinks.k1.hdfs.filePrefix = date_
a1.sinks.k1.hdfs.maxOpenFiles = 5000
a1.sinks.k1.hdfs.batchSize= 100
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat =Text
a1.sinks.k1.hdfs.rollSize = 102400
a1.sinks.k1.hdfs.rollCount = 1000000
a1.sinks.k1.hdfs.rollInterval = 60
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

#启动 bin/flume-ng agent -c conf -n a1 -f agentconf/02.properties

2.4 多agent串联采集

4.1在 IP 为 的 hadoop04 上的 agentconf 下创建一个 tail-avro.properties
a1.sources = r1 
a1.sinks = k1 
a1.channels = c1 
# Describe/configure the source 
a1.sources.r1.type = exec 
a1.sources.r1.command = tail -F /home/hadoop/testlog/date.log 
a1.sources.r1.channels = c1 

# Describe the sink 
a1.sinks.k1.type = avro 
a1.sinks.k1.channel = c1 
a1.sinks.k1.hostname = hadoop05 
a1.sinks.k1.port = 4141 
a1.sinks.k1.batch-size = 2 

# Use a channel which buffers events in memory 
a1.channels.c1.type = memory 
a1.channels.c1.capacity = 1000 
a1.channels.c1.transactionCapacity = 100 

# Bind the source and sink to the channel 
a1.sources.r1.channels = c1 
a1.sinks.k1.channel = c1

在 IP 为 的 hadoop05 机器上配置采集方案 avro-hdfs.properties
a1.sources = r1 
a1.sinks = k1 
a1.channels = c1 

# Describe/configure the source 
a1.sources.r1.type = avro 
a1.sources.r1.channels = c1 
a1.sources.r1.bind = 
a1.sources.r1.port = 4141 

# Describe k1 
a1.sinks.k1.type = hdfs 
a1.sinks.k1.hdfs.path =hdfs://myha01/testlog/flume-event/%y-%m-%d/%H-%M 
a1.sinks.k1.hdfs.filePrefix = date_ 
a1.sinks.k1.hdfs.maxOpenFiles = 5000 
a1.sinks.k1.hdfs.batchSize= 100 
a1.sinks.k1.hdfs.fileType = DataStream 
a1.sinks.k1.hdfs.writeFormat =Text 
a1.sinks.k1.hdfs.rollSize = 102400 
a1.sinks.k1.hdfs.rollCount = 1000000 
a1.sinks.k1.hdfs.rollInterval = 60 
a1.sinks.k1.hdfs.round = true 
a1.sinks.k1.hdfs.roundValue = 10 
a1.sinks.k1.hdfs.roundUnit = minute 
a1.sinks.k1.hdfs.useLocalTimeStamp = true

# Use a channel which buffers events in memory 
a1.channels.c1.type = memory 
a1.channels.c1.capacity = 1000 
a1.channels.c1.transactionCapacity = 100 

# Bind the source and sink to the channel 
a1.sources.r1.channels = c1 
a1.sinks.k1.channel = c1

先启动 bin/flume-ng agent -c conf -n a1 -f agentconf/avro-hdfs.properties -Dflume.root.logger=INFO,console
再启动 bin/flume-ng agent -c conf -n a1 -f agentconf/tail-avro.properties -Dflume.root.logger=INFO,console 

2.5 高可用

- 第一步节点分配
- 第二步配置文件
在下面单点 Flume 中,基本配置都完成了,我们只需要新添加两个配置文件,它们是ha_agent.properties 和 ha_collector.properties,其配置内容如下所示:
ha_agent.properties 配置:

#agent name: agent1 
agent1.channels = c1 
agent1.sources = r1 
agent1.sinks = k1 k2 

#set gruop 
agent1.sinkgroups = g1 

#set channel 
agent1.channels.c1.type = memory 
agent1.channels.c1.capacity = 1000 
agent1.channels.c1.transactionCapacity = 100 

agent1.sources.r1.channels = c1 
agent1.sources.r1.type = exec 
agent1.sources.r1.command = tail -F /home/hadoop/testlog/testha.log 

agent1.sources.r1.interceptors = i1 i2 
agent1.sources.r1.interceptors.i1.type = static 
agent1.sources.r1.interceptors.i1.key = Type 
agent1.sources.r1.interceptors.i1.value = LOGIN 
agent1.sources.r1.interceptors.i2.type = timestamp 

# set sink1 
agent1.sinks.k1.channel = c1 
agent1.sinks.k1.type = avro 
agent1.sinks.k1.hostname = hadoop04 
agent1.sinks.k1.port = 52020 

# set sink2 
agent1.sinks.k2.channel = c1 
agent1.sinks.k2.type = avro 
agent1.sinks.k2.hostname = hadoop05 
agent1.sinks.k2.port = 52020 

#set sink group 
agent1.sinkgroups.g1.sinks = k1 k2 

#set failover 
agent1.sinkgroups.g1.processor.type = failover 
agent1.sinkgroups.g1.processor.priority.k1 = 10 
agent1.sinkgroups.g1.processor.priority.k2 = 1 
agent1.sinkgroups.g1.processor.maxpenalty = 10000 

ha_collector.properties 配置:

#set agent name 
a1.sources = r1 
a1.channels = c1 
a1.sinks = k1 

#set channel 
a1.channels.c1.type = memory 
a1.channels.c1.capacity = 1000 
a1.channels.c1.transactionCapacity = 100 

# other node,nna to nns 
a1.sources.r1.type = avro 
## 当前主机为什么,就修改成什么主机名 
a1.sources.r1.bind = hadoop04 
a1.sources.r1.port = 52020 
a1.sources.r1.interceptors = i1 
a1.sources.r1.interceptors.i1.type = static 
a1.sources.r1.interceptors.i1.key = Collector 
## 当前主机为什么,就修改成什么主机名 
a1.sources.r1.interceptors.i1.value = hadoop04 
a1.sources.r1.channels = c1 

#set sink to hdfs 
a1.sinks.k1.hdfs.path= hdfs://myha01/flume_ha/loghdfs 

先启动 hadoop04 和 hadoop05 上的 collector 角色:
bin/flume-ng agent -c conf -f agentconf/ha_collector.properties -n a1 -

然后启动 hadoop02,hadoop03 上的 agent 角色:
bin/flume-ng agent -c conf -f agentconf/ha_agent.properties -n agent1 -

相关标签: Flume