Flume介绍,安装及其使用
Flume介绍,安装及其使用
1、概述
Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量采集、聚合和传输的软件。
2、组件介绍
Flume中核心的角色agent,agent本身是一个Java进程,一般运行在日志收集节点。
Source:采集源,用于跟数据源对接,以获取数据
Sink:下沉地,采集数据的传送目的,用于往下一级agent传递数据或者往最终存储系统传递数据
Channel:agent内部的数据传输通道,用于source将数据传递到sink;
在整个数据的传输的过程中,流动的是event,它是Flume内部数据传输的最基本单元。
3、安装
下载地址:flume
解压后,修改conf下的flume-env.sh,并在里面配置JAVA_HOME
4、使用
官方案例:
1) 现在flume的conf目录下新建一个文件
vim netcat-logger.conf
# 定义这个agent中各组件的名字
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 描述和配置source组件:r1
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# 描述和配置sink组件:k1
a1.sinks.k1.type = logger
# 描述和配置channel组件,此处使用是内存缓存的方式
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 描述和配置source channel sink之间的连接关系
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
2) 启动agent去采集数据
bin/flume-ng agent -c conf -f conf/netcat-logger.conf -n a1 -Dflume.root.logger=INFO,console
-c conf 指定flume自身的配置文件所在目录
-f conf/netcat-logger.con 指定我们所描述的采集方案
-n a1 指定我们这个agent的名字
-Dflume.root.logger=INFO,console 开启flume日志 输出到终端
3)测试
telnet localhost 44444
测试成功!
5、其他source,sink,channel可参考官网
:flume
其他Sources:
其他Sinks:
其他Channels:
6、Flume高阶特性
1)load-balance 负载均衡
负载均衡是用于解决一台机器(一个进程)无法解决所有请求而产生的一种算法。
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2 k3
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true #如果开启,则将失败的sink放入黑名单
a1.sinkgroups.g1.processor.selector = round_robin # 另外还支持random
a1.sinkgroups.g1.processor.selector.maxTimeOut=10000 #在黑名单放置的超时时间,超时结束时,若仍然无法接收,则超时时间呈指数增长
2)failover 容错
Failover Sink Processor维护一个优先级Sink组件列表,只要有一个Sink组件可用,Event就被传递到下一个组件。故障转移机制的作用是将失败的Sink降级到一个池,在这些池中它们被分配一个冷却时间,随着故障的连续,在重试之前冷却时间增加。一旦Sink成功发送一个事件,它将恢复到活动池。 Sink具有与之相关的优先级,数量越大,优先级越高。
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2 k3
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5 #优先级值, 绝对值越大表示优先级越高
a1.sinkgroups.g1.processor.priority.k2 = 7
a1.sinkgroups.g1.processor.priority.k3 = 6
a1.sinkgroups.g1.processor.maxpenalty = 2000 #失败的Sink回切时间(millis)
7、拦截器
拦截器(interceptor)是Flume中简单的插件式组件,设置在source和channel之间。source接收到的event事件,在写入channel之前,拦截器都可以进行转换或者删除这些事件。每个拦截器只处理同一个source接收到的事件。
1)timestamp interceptor
flume中一个最经常使用的拦截器 ,该拦截器的作用是将时间戳插入到flume的事件header中。如果不使用任何拦截器,flume接受到的只有message。
a1.sources.r1.interceptors = timestamp
a1.sources.r1.interceptors.timestamp.type=timestamp
a1.sources.r1.interceptors.timestamp.preserveExisting=false
2)timestamp + host interceptor
主机拦截器插入服务器的ip地址或者主机名,agent将这些内容插入到事件的报头中。时间报头中的key使用hostHeader配置,默认是host。
a1.sources.r1.interceptors = host
a1.sources.r1.interceptors.host.type=host
a1.sources.r1.interceptors.host.useIP=false
a1.sources.r1.interceptors.timestamp.preserveExisting=true
自定义拦截器
根据实际业务的需求,为了更好的满足数据在应用层的处理,通过自定义Flume拦截器,过滤掉不需要的字段,并对指定字段加密处理,将源数据进行预处理。减少了数据的传输量,降低了存储的开销。
实现:
1)编写Java代码,定义一个类CustomParameterInterceptor实现Interceptor接口。
2)在CustomParameterInterceptor类中定义变量,这些变量是需要到 Flume的配置文件中进行配置使用的。每一行字段间的分隔符(fields_separator)、通过分隔符分隔后,所需要列字段的下标(indexs)、多个下标使用的分隔符(indexs_separator)、多个下标使用的分隔符(indexs_separator)。
3)添加CustomParameterInterceptor的有参构造方法。并对相应的变量进行处理。将配置文件中传过来的unicode编码进行转换为字符串。
4)写具体的要处理的逻辑intercept()方法,一个是单个处理的,一个是批量处理。
5)接口中定义了一个内部接口Builder,在configure方法中,进行一些参数配置。并给出,在flume的conf中没配置一些参数时,给出其默认值。通过其builder方法,返回一个CustomParameterInterceptor对象。
6)定义一个静态类,类中封装MD5加密方法
7)通过以上步骤,自定义拦截器的代码开发已完成,然后打包成jar, 放到Flume的根目录下的lib中
8)修改Flume配置信息,例:a1.sources.r1.interceptors.i1.type =com.qs.interceptor.CustomParameterInterceptor$Builder
上一篇: 张飞见面就敢和吕布叫嚣单挑 关羽为什么从来不这么做
下一篇: jQuery实现按键盘方向键翻页特效
推荐阅读
-
Python网页解析利器BeautifulSoup安装使用介绍
-
VMware Workstation/Fusion 中安装 Fedora 23/24 及其他 Linux 系统时使用 Open VM Tools 代替 VMware Tools 增强工具的方法
-
Linux环境下安装Nginx及其使用
-
2345安全卫士安装及其功能使用图解
-
zenmate怎么用?zenmate安装使用教程图文详细介绍
-
Apache加速模块mod_pagespeed安装使用详细介绍
-
闪电文字语音转换软件如何安装使用?文字转语音方法介绍
-
Google Chrome(谷歌浏览器)安装方法与使用技巧(图文介绍)
-
CuteFTP怎么用 CuteFTP安装教程及使用指南详细介绍
-
Debug卡原理及其使用样例介绍