【Flume】Flume入门解析(二)
(图片来源于网络,侵删)
一、案例实战
【1】Flume故障转移(failover)
Failover Sink Processor能够实现failover功能,具体流程类似load balance
,但是内部处理机制与load balance
完全不同
Failover Sink Processor维护一个优先级Sink组件列表,只要有一个Sink组件可用,Event就被传递到下一个组件。故障转移机制的作用是将失败的Sink降级到一个池,在这些池中它们被分配一个冷却时间,随着故障的连续,在重试之前冷却时间增加。一旦Sink成功发送一个事件,它将恢复到活动池。 Sink具有与之相关的优先级,数量越大,优先级越高
例如,具有优先级为100的sink在优先级为80的Sink之前被**。如果在发送事件时汇聚失败,则接下来将尝试下一个具有最高优先级的Sink发送事件。如果没有指定优先级,则根据在配置中指定Sink的顺序来确定优先级
整体步骤:
-
1.根据环境编写 flume配置文件
节点1如下配置????
上述代码????
#a1 name
a1.channels = c1
a1.sources = r1
a1.sinks = k1 k2
#
##set gruop
a1.sinkgroups = g1
##set sink group
a1.sinkgroups.g1.sinks = k1 k2
#
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /export/servers/apache-flume-1.8.0-bin/upload
#
##set channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
## set sink1
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = node02
a1.sinks.k1.port = 52020
#
## set sink2
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = node03
a1.sinks.k2.port = 52020
#
##set failover
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 2
a1.sinkgroups.g1.processor.priority.k2 = 1
a1.sinkgroups.g1.processor.maxpenalty = 10000
#
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
节点2如下配置(节点3和节点2配置一样,就绑定的主机名不一致,修改一下即可)
:
上述代码????
#set Agent name
a1.sources = r1
a1.channels = c1
a1.sinks = k1
## other node,nna to nns
a1.sources.r1.type = avro
a1.sources.r1.bind = node02
a1.sources.r1.port = 52020
##set channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
#
##set sink to hdfs
a1.sinks.k1.type=hdfs
a1.sinks.k1.hdfs.path= hdfs://node01:8020/flume/HA
a1.sinks.k1.hdfs.fileType = DataStream
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
- 2.启动
先在节点2/3根目录下启动:
flume-ng agent -c conf/ -f job/HA-avro-flume-hdfs.conf -n a1
再先在节点1根目录下启动:
flume-ng agent -c conf/ -f job/HA-file-flume-hdfs.conf -n a1
-
3.将节点2杀死 ,然后往被监控目录添加文件
-
4.发现HDFS上产生了文件,表示节点2被杀死,节点3接替进行工作成功
-
5.再次启动节点2,发现节点2接替节点3,继续接收消息
【2】Flume故障转移(failover)
负载均衡是用于解决一台机器(一个进程)无法解决所有请求而产生的一种算法。Load balancing Sink Processor能够实现load balance功能,如下图Agent1是一个路由节点,负责将Channel暂存的Event均衡到对应的多个Sink组件上,而每个Sink组件分别连接到一个独立的Agent上,示例配置,如下所示:
该方式会随机将消息发到不同的Sink端中,实现负载均衡的目的
整体步骤:
-
1.根据环境编写 flume配置文件
节点1如下配置????
上述代码????
#a1 name
a1.channels = c1
a1.sources = r1
a1.sinks = k1 k2
#
##set gruop
a1.sinkgroups = g1
##set sink group
a1.sinkgroups.g1.sinks = k1 k2
#
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /export/servers/apache-flume-1.8.0-bin/upload
#
##set channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
## set sink1
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = node02
a1.sinks.k1.port = 52021
#
## set sink2
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = node03
a1.sinks.k2.port = 52021
#
##set failover
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true #如果开启,则将失败的sink放入黑名单
# 另外还支持random
a1.sinkgroups.g1.processor.selector = round_robin
#在黑名单放置的超时时间,超时结束时,若仍然无法接收,则超时时间呈指数增长
a1.sinkgroups.g1.processor.selector.maxTimeOut=10000
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
节点2/3如下配置????
上述代码????
#set Agent name
a1.sources = r1
a1.channels = c1
a1.sinks = k1
## other node,nna to nns
a1.sources.r1.type = avro
a1.sources.r1.bind = node02
a1.sources.r1.port = 52021
##set channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
#
##set sink to hdfs
a1.sinks.k1.type=hdfs
a1.sinks.k1.hdfs.path= hdfs://node01:8020/flume/load_banlancer
a1.sinks.k1.hdfs.fileType = DataStream
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
- 2.启动
先在节点2/3根目录下启动:
flume-ng agent -c conf/ -f job/load_banlancer_server.conf -n a1
再先在节点1根目录下启动:
flume-ng agent -c conf/ -f job/load_banlancer_client.conf -n a1
-
3.往被监控目录添加文件
-
4.查看HDFS上的文件
二、过滤器
【1】Flume(interceptor)
整体步骤:
-
1.根据环境编写 flume配置文件
节点1/2如下配置????
上述代码????
# Name the components on this agent
a1.sources = r1 r2 r3
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /export/servers/apache-flume-1.8.0-bin/upload/access/access.log
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
## static拦截器的功能就是往采集到的数据的header中插入自己定## 义的key-value对
a1.sources.r1.interceptors.i1.key = type
a1.sources.r1.interceptors.i1.value = access
a1.sources.r2.type = exec
a1.sources.r2.command = tail -F /export/servers/apache-flume-1.8.0-bin/upload/nginx/nginx.log
a1.sources.r2.interceptors = i2
a1.sources.r2.interceptors.i2.type = static
a1.sources.r2.interceptors.i2.key = type
a1.sources.r2.interceptors.i2.value = nginx
a1.sources.r3.type = exec
a1.sources.r3.command = tail -F /export/servers/apache-flume-1.8.0-bin/upload/web/web.log
a1.sources.r3.interceptors = i3
a1.sources.r3.interceptors.i3.type = static
a1.sources.r3.interceptors.i3.key = type
a1.sources.r3.interceptors.i3.value = web
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 20000
a1.channels.c1.transactionCapacity = 10000
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = node03
a1.sinks.k1.port = 41414
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sources.r2.channels = c1
a1.sources.r3.channels = c1
a1.sinks.k1.channel = c1
节点3如下配置????
上述代码????
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#定义source
a1.sources.r1.type = avro
a1.sources.r1.bind = node03
a1.sources.r1.port =41414
#添加时间拦截器
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
#定义channels
a1.channels.c1.type = memory
a1.channels.c1.capacity = 20000
a1.channels.c1.transactionCapacity = 10000
#定义sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path=hdfs://node01:8020/flume/interceptors/%{type}/%Y%m%d
a1.sinks.k1.hdfs.filePrefix =events
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
#时间类型
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#生成的文件不按条数生成
a1.sinks.k1.hdfs.rollCount = 0
#生成的文件按时间生成
a1.sinks.k1.hdfs.rollInterval = 30
#生成的文件按大小生成
a1.sinks.k1.hdfs.rollSize = 10485760
#批量写入hdfs的个数
a1.sinks.k1.hdfs.batchSize = 10000
#flume操作hdfs的线程数(包括新建,写入等)
a1.sinks.k1.hdfs.threadsPoolSize=10
#操作hdfs超时时间
a1.sinks.k1.hdfs.callTimeout=30000
#组装source、channel、sink
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
说明:sources需要添加过滤器,类型必须是org.apache.flume.interceptor.TimestampInterceptor$Builder
写入数据时获取前面设置的key ,使用%{type}**
- 2.启动
先在节点3根目录下启动:
flume-ng agent -c conf/ -f job/interceptor2.conf -n a1
再先在节点1/2根目录下启动:
flume-ng agent -c conf/ -f job/interceptor1.conf -n a1
-
3.往被监控目录添加文件
-
4.查看HDFS上的文件
【2】Flume自定义拦截器
1.介绍
Flume是Cloudera提供的一个高可用的
,高可靠的
,分布式的
海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。Flume有各种自带的拦截器
,比如:TimestampInterceptor
、HostInterceptor
、RegexExtractorInterceptor
等,通过使用不同的拦截器,实现不同的功能
但是以上的这些拦截器,不能改变原有日志数据的内容或者对日志信息添加一定的处理逻辑,当一条日志信息有几十个甚至上百个字段的时候,在传统的Flume处理下,收集到的日志还是会有对应这么多的字段,也不能对你想要的字段进行对应的处理
2.自定义拦截器
根据实际业务的需求,为了更好的满足数据在应用层的处理,通过自定义Flume拦截器,过滤掉不需要的字段,并对指定字段加密处理,将源数据进行预处理。减少了数据的传输量,降低了存储的开销
3.功能实现
【1】编写Java代码,自定义拦截器
- 1.定义一个类CustomParameterInterceptor实现
Interceptor
接口。 - 2.在CustomParameterInterceptor类中
定义变量
,这些变量是需要到 Flume的配置文件中进行配置使用的。每一行字段
间的分隔符(fields_separator)
、通过分隔符分隔后,所需要列字段的下标(indexs)
、多个下标使用的分隔符(indexs_separator)
- 3.添加CustomParameterInterceptor的
有参构造方法
。并对相应的变量进行处理。将配置文件中传过来的unicode编码进行转换为字符串
。 - 4.写具体的要处理的逻辑
intercept()
方法,一个是单个处理
的,一个是批量处理
的。 - 5.接口中定义了一个内部接口
Builder
,在configure
方法中,进行一些参数配置
。并给出,在flume的conf中没配置一些参数时,给出其默认值。通过其builder方法,返回一个CustomParameterInterceptor对象。 - 6.定义一个静态类,类中封装MD5加密方法
-
- 通过以上步骤,自定义拦截器的代码开发已完成,然后
打包成jar
, 放到Flume的根目录下的lib
中
- 通过以上步骤,自定义拦截器的代码开发已完成,然后
【2】修改Flume的配置信息????
a1.channels = c1
a1.sources = r1
a1.sinks = s1
#channel
a1.channels.c1.type = memory
a1.channels.c1.capacity=100000
a1.channels.c1.transactionCapacity=50000
#source
a1.sources.r1.channels = c1
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /root/data/
a1.sources.r1.batchSize= 50
a1.sources.r1.inputCharset = UTF-8
a1.sources.r1.interceptors =i1 i2
a1.sources.r1.interceptors.i1.type =cn.itcast.interceptor.CustomParameterInterceptor$Builder
a1.sources.r1.interceptors.i1.fields_separator=\\u0009
a1.sources.r1.interceptors.i1.indexs =0,1,3,5,6
a1.sources.r1.interceptors.i1.indexs_separator =\\u002c
a1.sources.r1.interceptors.i1.encrypted_field_index =0
a1.sources.r1.interceptors.i2.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
#sink
a1.sinks.s1.channel = c1
a1.sinks.s1.type = hdfs
a1.sinks.s1.hdfs.path =hdfs://192.168.100.111:8020/flume/%Y%m%d
a1.sinks.s1.hdfs.filePrefix = event
a1.sinks.s1.hdfs.fileSuffix = .log
a1.sinks.s1.hdfs.rollSize = 10485760
a1.sinks.s1.hdfs.rollInterval =20
a1.sinks.s1.hdfs.rollCount = 0
a1.sinks.s1.hdfs.batchSize = 1500
a1.sinks.s1.hdfs.round = true
a1.sinks.s1.hdfs.roundUnit = minute
a1.sinks.s1.hdfs.threadsPoolSize = 25
a1.sinks.s1.hdfs.useLocalTimeStamp = true
a1.sinks.s1.hdfs.minBlockReplicas = 1
a1.sinks.s1.hdfs.fileType =DataStream
a1.sinks.s1.hdfs.writeFormat = Text
a1.sinks.s1.hdfs.callTimeout = 60000
a1.sinks.s1.hdfs.idleTimeout =60