欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

【Flume】Flume入门解析(二)

程序员文章站 2022-06-14 21:11:20
...

【Flume】Flume入门解析(二)【Flume】Flume入门解析(二)【Flume】Flume入门解析(二)

(图片来源于网络,侵删)


一、案例实战

【1】Flume故障转移(failover)

Failover Sink Processor能够实现failover功能,具体流程类似load balance,但是内部处理机制与load balance完全不同
Failover Sink Processor维护一个优先级Sink组件列表,只要有一个Sink组件可用,Event就被传递到下一个组件。故障转移机制的作用是将失败的Sink降级到一个池,在这些池中它们被分配一个冷却时间,随着故障的连续,在重试之前冷却时间增加。一旦Sink成功发送一个事件,它将恢复到活动池。 Sink具有与之相关的优先级,数量越大,优先级越高
例如,具有优先级为100的sink在优先级为80的Sink之前被**。如果在发送事件时汇聚失败,则接下来将尝试下一个具有最高优先级的Sink发送事件。如果没有指定优先级,则根据在配置中指定Sink的顺序来确定优先级

整体步骤:

  • 1.根据环境编写 flume配置文件
    节点1如下配置????
    【Flume】Flume入门解析(二)
    上述代码????
#a1 name
a1.channels = c1
a1.sources = r1
a1.sinks = k1 k2
#
##set gruop
a1.sinkgroups = g1
##set sink group
a1.sinkgroups.g1.sinks = k1 k2

#
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /export/servers/apache-flume-1.8.0-bin/upload

#
##set channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
## set sink1
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = node02
a1.sinks.k1.port = 52020
#
## set sink2
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = node03
a1.sinks.k2.port = 52020
#
##set failover
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 2
a1.sinkgroups.g1.processor.priority.k2 = 1
a1.sinkgroups.g1.processor.maxpenalty = 10000
#
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1

节点2如下配置(节点3和节点2配置一样,就绑定的主机名不一致,修改一下即可)
【Flume】Flume入门解析(二)
上述代码????

#set Agent name
a1.sources = r1
a1.channels = c1
a1.sinks = k1

## other node,nna to nns
a1.sources.r1.type = avro
a1.sources.r1.bind = node02
a1.sources.r1.port = 52020

##set channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

#
##set sink to hdfs
a1.sinks.k1.type=hdfs
a1.sinks.k1.hdfs.path= hdfs://node01:8020/flume/HA
a1.sinks.k1.hdfs.fileType = DataStream

a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
  • 2.启动
先在节点2/3根目录下启动:
flume-ng agent -c conf/ -f job/HA-avro-flume-hdfs.conf -n a1

再先在节点1根目录下启动:
flume-ng agent -c conf/ -f job/HA-file-flume-hdfs.conf -n a1
  • 3.将节点2杀死 ,然后往被监控目录添加文件
    【Flume】Flume入门解析(二)

  • 4.发现HDFS上产生了文件,表示节点2被杀死,节点3接替进行工作成功
    【Flume】Flume入门解析(二)

  • 5.再次启动节点2,发现节点2接替节点3,继续接收消息


【2】Flume故障转移(failover)

负载均衡是用于解决一台机器(一个进程)无法解决所有请求而产生的一种算法。Load balancing Sink Processor能够实现load balance功能,如下图Agent1是一个路由节点,负责将Channel暂存的Event均衡到对应的多个Sink组件上,而每个Sink组件分别连接到一个独立的Agent上,示例配置,如下所示:
【Flume】Flume入门解析(二)

该方式会随机将消息发到不同的Sink端中,实现负载均衡的目的

整体步骤:

  • 1.根据环境编写 flume配置文件
    节点1如下配置????
    【Flume】Flume入门解析(二)
    上述代码????
#a1 name
a1.channels = c1
a1.sources = r1
a1.sinks = k1 k2
#
##set gruop
a1.sinkgroups = g1
##set sink group
a1.sinkgroups.g1.sinks = k1 k2

#
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /export/servers/apache-flume-1.8.0-bin/upload

#
##set channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
## set sink1
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = node02
a1.sinks.k1.port = 52021
#
## set sink2
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = node03
a1.sinks.k2.port = 52021
#
##set failover
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true  #如果开启,则将失败的sink放入黑名单
# 另外还支持random
a1.sinkgroups.g1.processor.selector = round_robin
#在黑名单放置的超时时间,超时结束时,若仍然无法接收,则超时时间呈指数增长  
a1.sinkgroups.g1.processor.selector.maxTimeOut=10000  

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1

节点2/3如下配置????
【Flume】Flume入门解析(二)
上述代码????

#set Agent name
a1.sources = r1
a1.channels = c1
a1.sinks = k1

## other node,nna to nns
a1.sources.r1.type = avro
a1.sources.r1.bind = node02
a1.sources.r1.port = 52021

##set channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

#
##set sink to hdfs
a1.sinks.k1.type=hdfs
a1.sinks.k1.hdfs.path= hdfs://node01:8020/flume/load_banlancer
a1.sinks.k1.hdfs.fileType = DataStream

a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
  • 2.启动
先在节点2/3根目录下启动:
flume-ng agent -c conf/ -f job/load_banlancer_server.conf -n a1

再先在节点1根目录下启动:
flume-ng agent -c conf/ -f job/load_banlancer_client.conf -n a1
  • 3.往被监控目录添加文件
    【Flume】Flume入门解析(二)
  • 4.查看HDFS上的文件
    【Flume】Flume入门解析(二)

二、过滤器

【1】Flume(interceptor)

整体步骤:

  • 1.根据环境编写 flume配置文件
    节点1/2如下配置????
    【Flume】Flume入门解析(二)
    上述代码????
# Name the components on this agent
a1.sources = r1 r2 r3
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /export/servers/apache-flume-1.8.0-bin/upload/access/access.log
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
##  static拦截器的功能就是往采集到的数据的header中插入自己定## 义的key-value对
a1.sources.r1.interceptors.i1.key = type
a1.sources.r1.interceptors.i1.value = access

a1.sources.r2.type = exec
a1.sources.r2.command = tail -F /export/servers/apache-flume-1.8.0-bin/upload/nginx/nginx.log
a1.sources.r2.interceptors = i2
a1.sources.r2.interceptors.i2.type = static
a1.sources.r2.interceptors.i2.key = type
a1.sources.r2.interceptors.i2.value = nginx

a1.sources.r3.type = exec
a1.sources.r3.command = tail -F /export/servers/apache-flume-1.8.0-bin/upload/web/web.log
a1.sources.r3.interceptors = i3
a1.sources.r3.interceptors.i3.type = static
a1.sources.r3.interceptors.i3.key = type
a1.sources.r3.interceptors.i3.value = web

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 20000
a1.channels.c1.transactionCapacity = 10000

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = node03
a1.sinks.k1.port = 41414

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sources.r2.channels = c1
a1.sources.r3.channels = c1
a1.sinks.k1.channel = c1

节点3如下配置????
【Flume】Flume入门解析(二)
上述代码????

a1.sources = r1
a1.sinks = k1
a1.channels = c1
#定义source
a1.sources.r1.type = avro
a1.sources.r1.bind = node03
a1.sources.r1.port =41414

#添加时间拦截器
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.TimestampInterceptor$Builder

#定义channels
a1.channels.c1.type = memory
a1.channels.c1.capacity = 20000
a1.channels.c1.transactionCapacity = 10000

#定义sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path=hdfs://node01:8020/flume/interceptors/%{type}/%Y%m%d
a1.sinks.k1.hdfs.filePrefix =events
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
#时间类型
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#生成的文件不按条数生成
a1.sinks.k1.hdfs.rollCount = 0
#生成的文件按时间生成
a1.sinks.k1.hdfs.rollInterval = 30
#生成的文件按大小生成
a1.sinks.k1.hdfs.rollSize  = 10485760
#批量写入hdfs的个数
a1.sinks.k1.hdfs.batchSize = 10000
#flume操作hdfs的线程数(包括新建,写入等)
a1.sinks.k1.hdfs.threadsPoolSize=10
#操作hdfs超时时间
a1.sinks.k1.hdfs.callTimeout=30000


#组装source、channel、sink
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
说明:sources需要添加过滤器,类型必须是org.apache.flume.interceptor.TimestampInterceptor$Builder
​	  写入数据时获取前面设置的key ,使用%{type}**
  • 2.启动
先在节点3根目录下启动:
flume-ng agent -c conf/ -f job/interceptor2.conf -n a1

再先在节点1/2根目录下启动:
flume-ng agent -c conf/ -f job/interceptor1.conf -n a1
  • 3.往被监控目录添加文件
    【Flume】Flume入门解析(二)
  • 4.查看HDFS上的文件
    【Flume】Flume入门解析(二)
    【Flume】Flume入门解析(二)
    【Flume】Flume入门解析(二)

【2】Flume自定义拦截器

1.介绍

Flume是Cloudera提供的一个高可用的高可靠的分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。Flume有各种自带的拦截器,比如:TimestampInterceptorHostInterceptorRegexExtractorInterceptor等,通过使用不同的拦截器,实现不同的功能

但是以上的这些拦截器,不能改变原有日志数据的内容或者对日志信息添加一定的处理逻辑,当一条日志信息有几十个甚至上百个字段的时候,在传统的Flume处理下,收集到的日志还是会有对应这么多的字段,也不能对你想要的字段进行对应的处理

2.自定义拦截器

根据实际业务的需求,为了更好的满足数据在应用层的处理,通过自定义Flume拦截器,过滤掉不需要的字段,并对指定字段加密处理,将源数据进行预处理。减少了数据的传输量,降低了存储的开销

3.功能实现

【1】编写Java代码,自定义拦截器

  • 1.定义一个类CustomParameterInterceptor实现Interceptor接口。
  • 2.在CustomParameterInterceptor类中定义变量,这些变量是需要到 Flume的配置文件中进行配置使用的。每一行字段间的分隔符(fields_separator)、通过分隔符分隔后,所需要列字段的下标(indexs)、多个下标使用的分隔符(indexs_separator)
  • 3.添加CustomParameterInterceptor的有参构造方法。并对相应的变量进行处理。将配置文件中传过来的unicode编码进行转换为字符串
  • 4.写具体的要处理的逻辑intercept()方法,一个是单个处理的,一个是批量处理的。
  • 5.接口中定义了一个内部接口Builder,在configure方法中,进行一些参数配置。并给出,在flume的conf中没配置一些参数时,给出其默认值。通过其builder方法,返回一个CustomParameterInterceptor对象。
  • 6.定义一个静态类,类中封装MD5加密方法
    【Flume】Flume入门解析(二)
    1. 通过以上步骤,自定义拦截器的代码开发已完成,然后打包成jar, 放到Flume的根目录下的lib

【2】修改Flume的配置信息????

a1.channels = c1
a1.sources = r1
a1.sinks = s1

#channel
a1.channels.c1.type = memory
a1.channels.c1.capacity=100000
a1.channels.c1.transactionCapacity=50000

#source
a1.sources.r1.channels = c1
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /root/data/
a1.sources.r1.batchSize= 50
a1.sources.r1.inputCharset = UTF-8

a1.sources.r1.interceptors =i1 i2
a1.sources.r1.interceptors.i1.type =cn.itcast.interceptor.CustomParameterInterceptor$Builder
a1.sources.r1.interceptors.i1.fields_separator=\\u0009
a1.sources.r1.interceptors.i1.indexs =0,1,3,5,6
a1.sources.r1.interceptors.i1.indexs_separator =\\u002c
a1.sources.r1.interceptors.i1.encrypted_field_index =0

a1.sources.r1.interceptors.i2.type = org.apache.flume.interceptor.TimestampInterceptor$Builder

#sink
a1.sinks.s1.channel = c1
a1.sinks.s1.type = hdfs
a1.sinks.s1.hdfs.path =hdfs://192.168.100.111:8020/flume/%Y%m%d
a1.sinks.s1.hdfs.filePrefix = event
a1.sinks.s1.hdfs.fileSuffix = .log
a1.sinks.s1.hdfs.rollSize = 10485760
a1.sinks.s1.hdfs.rollInterval =20
a1.sinks.s1.hdfs.rollCount = 0
a1.sinks.s1.hdfs.batchSize = 1500
a1.sinks.s1.hdfs.round = true
a1.sinks.s1.hdfs.roundUnit = minute
a1.sinks.s1.hdfs.threadsPoolSize = 25
a1.sinks.s1.hdfs.useLocalTimeStamp = true
a1.sinks.s1.hdfs.minBlockReplicas = 1
a1.sinks.s1.hdfs.fileType =DataStream
a1.sinks.s1.hdfs.writeFormat = Text
a1.sinks.s1.hdfs.callTimeout = 60000
a1.sinks.s1.hdfs.idleTimeout =60

都看到这里了,点赞评论一下吧!!!

【Flume】Flume入门解析(二)

完结撒花!!!