欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Flume介绍,安装及其使用

程序员文章站 2022-03-08 14:25:10
...

Flume介绍,安装及其使用

1、概述

	Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量采集、聚合和传输的软件。

2、组件介绍

Flume介绍,安装及其使用
Flume中核心的角色agent,agent本身是一个Java进程,一般运行在日志收集节点。
Source:采集源,用于跟数据源对接,以获取数据
Sink:下沉地,采集数据的传送目的,用于往下一级agent传递数据或者往最终存储系统传递数据
Channel:agent内部的数据传输通道,用于source将数据传递到sink;
在整个数据的传输的过程中,流动的是event,它是Flume内部数据传输的最基本单元。

3、安装

下载地址:flume
解压后,修改conf下的flume-env.sh,并在里面配置JAVA_HOME
Flume介绍,安装及其使用

4、使用

官方案例:
1) 现在flume的conf目录下新建一个文件
vim netcat-logger.conf

# 定义这个agent中各组件的名字
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 描述和配置source组件:r1
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# 描述和配置sink组件:k1
a1.sinks.k1.type = logger

# 描述和配置channel组件,此处使用是内存缓存的方式
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 描述和配置source  channel   sink之间的连接关系
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

2) 启动agent去采集数据

bin/flume-ng agent -c conf -f conf/netcat-logger.conf -n a1  -Dflume.root.logger=INFO,console

-c conf 指定flume自身的配置文件所在目录
-f conf/netcat-logger.con 指定我们所描述的采集方案
-n a1 指定我们这个agent的名字
-Dflume.root.logger=INFO,console 开启flume日志 输出到终端

3)测试
telnet localhost 44444
Flume介绍,安装及其使用
Flume介绍,安装及其使用
测试成功!

5、其他source,sink,channel可参考官网

flume
其他Sources
Flume介绍,安装及其使用
其他Sinks
Flume介绍,安装及其使用
其他Channels
Flume介绍,安装及其使用

6、Flume高阶特性

1)load-balance 负载均衡
负载均衡是用于解决一台机器(一个进程)无法解决所有请求而产生的一种算法。

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2 k3
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true  #如果开启,则将失败的sink放入黑名单
a1.sinkgroups.g1.processor.selector = round_robin  # 另外还支持random
a1.sinkgroups.g1.processor.selector.maxTimeOut=10000 #在黑名单放置的超时时间,超时结束时,若仍然无法接收,则超时时间呈指数增长

2)failover 容错
Failover Sink Processor维护一个优先级Sink组件列表,只要有一个Sink组件可用,Event就被传递到下一个组件。故障转移机制的作用是将失败的Sink降级到一个池,在这些池中它们被分配一个冷却时间,随着故障的连续,在重试之前冷却时间增加。一旦Sink成功发送一个事件,它将恢复到活动池。 Sink具有与之相关的优先级,数量越大,优先级越高。

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2 k3
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5  #优先级值, 绝对值越大表示优先级越高
a1.sinkgroups.g1.processor.priority.k2 = 7
a1.sinkgroups.g1.processor.priority.k3 = 6
a1.sinkgroups.g1.processor.maxpenalty = 2000  #失败的Sink回切时间(millis)

7、拦截器

拦截器(interceptor)是Flume中简单的插件式组件,设置在source和channel之间。source接收到的event事件,在写入channel之前,拦截器都可以进行转换或者删除这些事件。每个拦截器只处理同一个source接收到的事件。

1)timestamp interceptor
flume中一个最经常使用的拦截器 ,该拦截器的作用是将时间戳插入到flume的事件header中。如果不使用任何拦截器,flume接受到的只有message。

a1.sources.r1.interceptors = timestamp 
a1.sources.r1.interceptors.timestamp.type=timestamp 
a1.sources.r1.interceptors.timestamp.preserveExisting=false

2)timestamp + host interceptor

主机拦截器插入服务器的ip地址或者主机名,agent将这些内容插入到事件的报头中。时间报头中的key使用hostHeader配置,默认是host。

a1.sources.r1.interceptors = host 
a1.sources.r1.interceptors.host.type=host 
a1.sources.r1.interceptors.host.useIP=false 
a1.sources.r1.interceptors.timestamp.preserveExisting=true

自定义拦截器

根据实际业务的需求,为了更好的满足数据在应用层的处理,通过自定义Flume拦截器,过滤掉不需要的字段,并对指定字段加密处理,将源数据进行预处理。减少了数据的传输量,降低了存储的开销。
实现
1)编写Java代码,定义一个类CustomParameterInterceptor实现Interceptor接口。
2)在CustomParameterInterceptor类中定义变量,这些变量是需要到 Flume的配置文件中进行配置使用的。每一行字段间的分隔符(fields_separator)、通过分隔符分隔后,所需要列字段的下标(indexs)、多个下标使用的分隔符(indexs_separator)、多个下标使用的分隔符(indexs_separator)。
3)添加CustomParameterInterceptor的有参构造方法。并对相应的变量进行处理。将配置文件中传过来的unicode编码进行转换为字符串。
4)写具体的要处理的逻辑intercept()方法,一个是单个处理的,一个是批量处理。
5)接口中定义了一个内部接口Builder,在configure方法中,进行一些参数配置。并给出,在flume的conf中没配置一些参数时,给出其默认值。通过其builder方法,返回一个CustomParameterInterceptor对象。
6)定义一个静态类,类中封装MD5加密方法
7)通过以上步骤,自定义拦截器的代码开发已完成,然后打包成jar, 放到Flume的根目录下的lib中
8)修改Flume配置信息,例:a1.sources.r1.interceptors.i1.type =com.qs.interceptor.CustomParameterInterceptor$Builder

相关标签: 大数据