Flume学习笔记
Flume学习笔记
1.Flume概述
Flume NG是一个分布式,高可用,可靠的系统,它能将不同的海量数据收集,移动并存储到一个数据存储系统中。轻量,配置简单,适用于各种日志收集,并支持Failover和负载均衡。并且它拥有非常丰富的组件。Flume NG采用的是三层架构:Agent层,Collector层和Store层,每一层均可水平拓展。其中Agent包含Source,Channel和Sink,三者组建了一个Agent。三者的职责如下所示:
- Source:用来消费(收集)数据源到Channel组件中
- Channel:中转临时存储,保存所有Source组件信息
- Sink:从Channel中读取,读取成功后会删除Channel中的信息
下图是Flume NG的架构图:
2.Flume介绍
Flume用户手册:http://flume.apache.org/FlumeUserGuide.html
Flume开发者手册:http://flume.apache.org/FlumeDeveloperGuide.html
2.1.Flume采集系统结构图
- 简单结构:单个agent采集数据
- 复杂结构:多级agent之间串联
- 高可用Flume NG集群 :
Flume的存储可以支持多种,这里只列举了HDFS和Kafka(如:存储最新的一周日志,并给Storm系统提供实时日志流)
2.2.Flume Sources
参考:
https://www.cnblogs.com/swordfall/p/8254271.html
http://flume.apache.org/FlumeUserGuide.html
2.2.1.Flume Sources 类型
官网文档上要求的属性是粗体字。
-
1.Avro Source:
监听Avro端口,从Avro client streams接收events。
-
2.Thrift Source:
监听Thrift端口和从外部Thrift client streams接收events。
-
3.Exec Source:
Exec Source在启动时运行一个Unix命令行,并期望这过程在标准输出上连续生产数据。
-
4.JMS Source:
JMS Source从JMS目标(如队列或者主题)读取消息。JMS应用程序应该可以与任何JMS提供程序一起工作,但是只能使用ActiveMQ进行测试。
-
5.Spooling Directory Source:
该source让你通过放置被提取文件在磁盘”spooling“目录下这一方式,提取数据。该source将会监控指定目录的新增文件,当新文件出现时解析event。event解析逻辑是可插入的。当一个给定文件被全部读取进channel之后,它被重命名,以标识为已完成(或者可选择deleted)。
-
6.Taildir Source:
注意:该source不能用于windows。
7.Twitter 1% firehose Source(试验):
-
8.Kafka Source:
Kafka Source是Apache Kafka消费者,从Kfaka topics读取消息。如果你有多个Kafka source在跑,你可以配置它们在相同的Consumer Group,以使它们每个读取topics独特的分区。
-
9.NetCat TCP Source:
netcat source监听一个给定的端口,然后把text文件的每一行转换成一个event。要求属性是粗体字。
-
10.NetCat UDP Source:
netcat source监听一个给定的端口,然后把text文件的每一行转换成一个event。
-
11.Sequence Generator Source:
一个简单的序列生成器可以不断生成events,带有counter计数器,从0开始,以1递增,在totalEvents停止。当不能发送events到channels时会不断尝试。
-
12.Syslog Sources:
读取系统日志,并生成Flume events。UDP source以整条消息作为一个简单event。TCP source以新一行”n“分割的字符串作为一个新的event。
-
Syslog TCP Source :
原始的,可靠的Syslog TCP source。
-
Multiport Syslog TCP Source:
这是一个新的,更快的,多端口的Syslog TCP source版本。注意ports配置替代port。
- Syslog UDP Source:
-
Syslog TCP Source :
-
13.HTTP Source:
source 通过HTTP POST 和 GET,接收Flume events。GET只能用于试验。HTTP requests通过 必须实现 HTTPSourceHandler接口的 ”handler“ 转换成flume events。该handler获取HttpServletRequest,然后返回一系列的flume events。
-
14.Stress Source:
StressSource 是内部负载生成source的实现,这对于压力测试是非常有用的。它允许用户配置Event有效载荷的大小。
-
15.Legacy Sources:
Legacy sources允许Flume 1.x agent接收来自Flume 0.9.4 agents的events。legacy source 支持Avro和Thrift RPC 连接。为了使用两个Flume 版本搭建的桥梁,你需要开始一个带有avroLegacy或者thriftLegacy source的Flume 1.x agent。0.9.4agent应该有agent Sink指向1.x agent的host/port。
- Avro Legacy Source
- Thrift Legacy Source
-
16.Custom Source(自定义Source):
自定义Source是你实现Source接口。当启动Flume agent时,一个自定义source类和它依赖项必须在agent的classpath中。
-
17.Scrible Source:
Scribe是另一种类型的提取系统。采用现有的Scribe提取系统,Flume应该使用基于Thrift的兼容传输协议的ScribeSource。
2.3.Flume Sinks
参考:
http://www.cnblogs.com/swordfall/p/8157766.html
http://flume.apache.org/FlumeUserGuide.html
2.3.1.Flume Sinks 类型
-
HDFS Sink:
该sink把events写进Hadoop分布式文件系统(HDFS)。它目前支持创建文本和序列文件。它支持在两种文件类型压缩。文件可以基于数据的经过时间或者大小或者事件的数量周期性地滚动。它还通过属性(如时间戳或发生事件的机器)把数据划分为桶或区。
-
Hive Sink:
该sink streams 将包含分割文本或者JSON数据的events直接传送到Hive表或分区中。使用Hive 事务写events。当一系列events提交到Hive时,它们马上可以被Hive查询到。
-
Logger Sink:
Logs event 在INFO 水平。典型用法是测试或者调试。
-
Avro Sink:
Flume events发送到sink,转换为Avro events,并发送到配置好的hostname/port。从配置好的channel按照配置好的批量大小批量获取events。
-
Thrift Sink:
Flume events发送到sink,转换为Thrift events,并发送到配置好的hostname/port。从配置好的channel按照配置好的批量大小批量获取events。
-
IRC Sink:
IRC sink从链接的channel获取消息和推送消息到配置的IRC目的地。
-
File Roll Sink:
在本地文件系统存储events。
-
Null Sink:
当接收到channel时丢弃所有events。
-
HBaseSinks:
该sink写数据到HBase。
-
AsyncHBaseSink:
该sink采用异步模式写数据到HBase。
-
MorphlineSolrSink:
该sink从Flume events提取数据并转换,在Apache Solr 服务端实时加载,Apache Solr servers为最终用户或者搜索应用程序提供查询服务。
-
ElasticSearchSink:
该sink写数据到elasticsearch集群。
-
Kite Dataset Sink:
试验sink写event到Kite Dataset。
-
Kafka Sink:
Flume Sink实现可以导出数据到一个Kafka topic。
-
HTTP Sink:
该sink将会从channel获取events,并使用HTTP POST请求发送这些events到远程服务。event 内容作为POST body发送。
-
Custom Sink:
自定义sink是你实现Sink接口。当启动Flume agent时,一个自定义sink类和它依赖项必须在agent的classpath中。
2.3.2.Flume Sink Processors
Sinks groups 允许用户把多个sinks分组汇入到一个实体中。Sink processors可以用于在组内所有sinks提供负载平衡,或者在暂时失败的情况下实现从一个sink到另一个sink的故障转移。
-
Default Sink Processor
默认sink processor只接收一个简单sink。用户没有强制去为单个sinks创建processor(sink group)。相反,用户可以按照用户指南上解释的source - channel - sink 模式。
-
Failover Sink Processor
Failover Sink Processor 维护sinks的优先列表,保证当有可用的events将会被处理。
-
Load balancing Sink Processor
Load balancing sink processor 提供了对多个sinks进行负载平衡的能力。
2.3.3.Event Serializers
file_roll sink和hdfs sink都支持EventSerializer接口。下面提供了Flume附带的EventSerializers的细节。
-
Body Text Serializer
别名:text。拦截器将event的主体写入输出流,而没进行任何的转换或者修改。event header被忽略。配置选项:
-
“Flume Event” Avro Event Serializer
别名:avro_event。
拦截器将Flume events序列化成一个Avro容器文件。所使用的模式与Avro RPC机制中用于Flume events的模式相同。
该serializer继承自AbstractAvroEventSerializer类。
-
Avro Event Serializer
别名:该serializer没有别名,必须指定使用的类名。
2.4. Flume Channel
参考:
http://www.cnblogs.com/swordfall/p/8169554.html
http://flume.apache.org/FlumeUserGuide.html
2.4.1. Flume Channel 类型
-
Memory Channel(内存Channels)
events存储在配置最大大小的内存队列中。对于流量较高和由于agent故障而准备丢失数据的流程来说,这是一个理想的选择。
-
JDBC Channel
events存储在持久化存储库中(其背后是一个数据库)。JDBC channel目前支持嵌入式Derby。这是一个持续的channel,对于可恢复性非常重要的流程来说是理想的选择。
-
Kafka Channel
events存储在Kafka集群中。Kafka提供高可用性和高可靠性,所以当agent或者kafka broker 崩溃时,events能马上被其他sinks可用。
Kafka channel可以被多个场景使用:
- Flume source和sink - 它为events提供可靠和高可用的channel
- Flume source和interceptor,但是没sink - 它允许写Flume evnets到Kafka topic
- Flume sink,但是没source - 这是一种低延迟,容错的方式从Kafka发送events到Flume sinks 例如 HDFS, HBase或者Solr
File Channel
-
Spillable Memory Channel
events存储在内存队列和磁盘中。该channel目前正在试验中,不要求在生产环境中使用。
-
Pseudo Transaction Channel
注意:Pseudo Transaction Channel只用于单元测试,不用于生产环境使用。
-
Custom Channel
自定义channel是你实现Channel接口。当Flume agent启动时,一个自定义channel类和它依赖项必须包含在agent的classpath。
2.4.2.Flume Channel Selectors
如果类型没有指定,那么默认“replicating”。
Replicating Channel Selector(default) (复制channel选择器)
Multiplexing Channel Selector (多路复用Channel选择器)
-
Custom Channel Selector (自定义Channel选择器)
一个自定义channel选择器(selector)是实现ChannelSelector的接口。当Flume agent启动时,一个自定义channel selector类和它依赖项必须包含在agent的classpath。
2.5.Flume Interceptors(拦截器)
参考:
http://www.cnblogs.com/swordfall/p/8207880.html
http://flume.apache.org/FlumeUserGuide.html
Flume有能力修改/删除流程中的events。这是在拦截器(interceptor)的帮助下完成的。拦截器(Interceptors)是实现org.apache.flume.interceptor.Interceptor接口的类。一个interceptor可以根据interceptor的开发者选择的任何标准来修改,甚至放弃events。这个可以通过在配置中指定一系列interceptor生成类名来实现。Interceptors在source配置中被指定作为空白分隔符列表。如果interceptor需要放弃events,它不会在它需要返回的列表中返回该events。如果interceptor放弃全部events,然后它返回一个空列表。简单示例:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.HostInterceptor$Builder
a1.sources.r1.interceptors.i1.preserveExisting = false
a1.sources.r1.interceptors.i1.hostHeader = hostname
a1.sources.r1.interceptors.i2.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
a1.sinks.k1.filePrefix = FlumeData.%{CollectorHost}.%Y-%m-%d
a1.sinks.k1.channel = c1
注意:该interceptor构建是被传递给type配置属性。interceptors本身是可配置的,并且可以像传递给其他可配置组件一样传递配置值。在上述示例中,events先传递到HostInterceptor,并且events被HostInterceptor返回,然后独自传递到TimestampInterceptor。你可以指定完全限定的类名称或者别名 timestamp。如果你有多个收集器写到同一个HDFS路径,然后你也可以使用HostInterceptor。
2.5.1.Flume Interceptors 种类
-
Timestamp Interceptor
该interceptor向event headers插入秒级时间,当event被处理时。该interceptor插入一个带有关键timestamp(或者由header属性指定)的header,其值是相关的timestamp。该interceptors可以保留一个已存在timestamp,如果它已经在配置中预先配置。
-
Host Interceptor
该interceptor插入运行agent的host的hostname或者IP地址。它根据配置插入带有**host或配置**(其值为host的hostname或IP地址)的header。
-
Static Interceptor
静态interceptor运行用户给所有events添加一个带有静态值的静态header。
-
Remove Header Interceptor
该interceptor通过移除一个或多个headers来操作Flume event headers。它可以移除一个静态定义的header,基于规则表达式的headers或者在一个列表中的headers。如果这些没有定义,或者如果没有header匹配到标准,Flume events将不会修改。
注意:如果只有一个header需要移除,通过名字指定它可以提供比其他两种方法更好的性能。
-
UUID Interceptor
该interceptor在被拦截的所有事件上设置一个通用唯一的标识符。
-
Morphline Interceptor
该interceptor通过morphline配置文件过滤events,该配置文件定义了一条从一个命令到另一个命令管道记录的转换命令链。例如,morphline可以忽略某些events,或者通过基于正则表达式的模式匹配来改变或者插入某些event headers,或者它可以通过Apache Tika自动检测和设置一个MIME类型在被拦截的events上。
-
Search and Replace Interceptor
该interceptor提供了基于Java正则表达式的简单的基于字符串的search-and-replace功能。回溯/组捕获也是可用的。这个interceptor使用与Java Matcher.replaceAll()方法相同的规则。
-
Regex Flitering Interceptor
该拦截器通过将event正文解释为文本并将文本与配置的正则表达式进行匹配来选择性地过滤events。
-
Regex Extractor Interceptor
此interceptor使用指定的正则表达式提取正则表达式匹配组,并将匹配组附加为event的headers。
该serializers用于将匹配映射到header名称和格式化的header值;默认的,你只需要指定header名称和默认org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer将会被使用。这个serializer只是将匹配映射到指定的header名称,并传递通过由正则表达式提取的值。
3.Flume安装
3.1.JDK下载安装
- 下载对应版本JavaSDK,下载地址:http://www.oracle.com/technetwork/java/javase/downloads/index.html
- 解压jdk到/usr/local目录下
- 配置环境变量
vim /etc/profile
# 在最后面添加如下几行
# for jdk
export JAVA_HOME=/usr/local/jdk1.8.0_171
export PATH=$PATH:$JAVA_HOME/bin
# 使环境变量生效
source /etc/profile
3.2.Flume下载安装
- 下载二进制包,下载地址:http://flume.apache.org/download.html
- 下载校验文件,文件名*.asc, 下载地址:http://flume.apache.org/download.html
- 下载KEYS,地址:http://www.apache.org/dist/flume/KEYS
- 校验下载的二进制包
gpg --import KEYS
gpg --verify apache-flume-1.8.0-bin.tar.gz.asc
gpg: Signature made Fri 15 Sep 2017 09:04:39 PM CST using RSA key ID 4199ACFF
gpg: Good signature from "Denes Arvay <aaa@qq.com>"
gpg: WARNING: This key is not certified with a trusted signature!
gpg: There is no indication that the signature belongs to the owner.
Primary key fingerprint: 4CF1 5CF1 525F CE29 F66C 08FA 302C B2A8 4199 ACFF
- 解压二进制包到/usr/local目录下,目录结构如下
├── bin
├── conf
├── lib
├── logs
└── tools
- 配置Flume
cp conf/flume-env.sh.template conf/flume-env.sh
vim conf/flume-env.sh
# 打开这两行注释,改成实际需要的参数
export JAVA_HOME=/usr/local/jdk1.8.0_171
export JAVA_OPTS="-Xms1024m -Xmx1024m -Dcom.sun.management.jmxremote"
4.高可用Flume NG集群搭建 :
参考:
https://www.cnblogs.com/smartloli/p/4468708.html
4.1节点分配
名称 | Host | 角色 |
---|---|---|
Agent1 | epp-nginx-001v | Web Server |
Agent2 | epp-nginx-002v | Web Server |
Collector1 | bi-slave1 | AgentMstr1 |
Collector2 | bi-slave2 | AgentMstr2 |
Agent1,Agent2数据分别流入到Collector1和Collector2,Flume NG本身提供了Failover机制,可以自动切换和恢复。要把所有的日志都收集到一个集群中存储。
4.2.配置
- Agent1和Agent2的配置相同:
flume-client.conf
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1
a1.sources.r1.channels = c1
a1.sources.r1.type = taildir
a1.sources.r1.positionFile = /data/logs/flume/taildir_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /usr/local/nginx/logs/ecloud.belle.cn.access.log
a1.sources.r1.fileHeader = true
a1.sinks.k1.channel = c1
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = bi-slave1
a1.sinks.k1.port = 10000
a1.sinks.k1.batch-size = 1000
a1.sinks.k2.channel = c1
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = bi-slave2
a1.sinks.k2.port = 10000
a1.sinks.k2.batch-size = 1000
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 10
a1.sinkgroups.g1.processor.priority.k2 = 1
a1.sinkgroups.g1.processor.maxpenalty = 10000
a1.channels.c1.type = memory
a1.channels.c1.capacity = 5000
a1.channels.c1.transactionCapacity = 5000
- Collector1和Collector2下沉到使用kerboros安全认证的CDH集群HDFS中配置
bi-slave1: nginxlog.sources.r1.bind = bi-slave1
bi-slave1: nginxlog.sources.r1.bind = bi-slave2
nginxlog.sinks = k1
nginxlog.channels = c1
nginxlog.sources.r1.channels = c1
nginxlog.sources.r1.type = avro
nginxlog.sources.r1.channels = c1
nginxlog.sources.r1.bind = bi-slave1
nginxlog.sources.r1.port = 10000
nginxlog.sinks.k1.channel = c1
nginxlog.sinks.k1.type = hdfs
nginxlog.sinks.k1.hdfs.kerberosPrincipal = hdfs/bi-slave1
nginxlog.sinks.k1.hdfs.kerberosKeytab = /usr/flume-keytab/hdfs.keytab
nginxlog.sinks.k1.hdfs.path = /flume/events/%y-%m-%d
nginxlog.sinks.k1.hdfs.filePrefix = events
nginxlog.sinks.k1.hdfs.round = true
nginxlog.sinks.k1.hdfs.roundValue = 24
nginxlog.sinks.k1.hdfs.roundUnit = hour
nginxlog.sinks.k1.hdfs.rollInterval = 600
nginxlog.sinks.k1.hdfs.rollSize = 268435456
nginxlog.sinks.k1.hdfs.rollCount = 2000
nginxlog.sinks.k1.hdfs.batchSize = 1000
nginxlog.sinks.k1.hdfs.useLocalTimeStamp = true
nginxlog.sinks.k1.hdfs.fileType = DataStream
nginxlog.channels.c1.type = memory
nginxlog.channels.c1.capacity = 5000
nginxlog.channels.c1.transactionCapacity = 5000
- Collector1和Collector2下沉到使用kerboros安全认证的CDH集群kafka中配置
nginxlog.sources = r1
nginxlog.sinks = k1
nginxlog.channels = c1
nginxlog.sources.r1.channels = c1
nginxlog.sources.r1.type = avro
nginxlog.sources.r1.channels = c1
nginxlog.sources.r1.bind = bi-slave1
nginxlog.sources.r1.port = 10000
nginxlog.sinks.k1.channel = c1
nginxlog.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
nginxlog.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT
nginxlog.sinks.k1.kafka.producer.sasl.kerberos.service.name = kafka
nginxlog.sinks.k1.kafka.producer.group.id = flume-producer
nginxlog.sinks.k1.kafka.topic = nginxlog
nginxlog.sinks.k1.kafka.bootstrap.servers = 172.17.194.17:9092,172.17.194.18:9092,172.17.194.20:9092
nginxlog.sinks.k1.kafka.flumeBatchSize = 1000
nginxlog.channels.c1.type = memory
nginxlog.channels.c1.capacity = 5000
nginxlog.channels.c1.transactionCapacity = 5000
4.3.启动
- 在Agent节点上启动命令如下所示:
bin/flume-ng agent -c conf -f conf/flume-client.conf -n a1 -Dflume.root.logger=INFO,console >> /data/logs/flume/flume_client.log 2>&1 &
- CDH集群上修改bi-slave1和bi-slave2的配置后重启两个节点
4.4.Failover测试
在CDH集群中停止bi-slave1的Flume NG服务,查看bi-slave1、bi-slave2的日志,并检查数据的完整性,结果:bi-slave1服务结束后,flume采集的下沉任务到bi-slave2上了,且数据衔接上了。
bi-slave1恢复服务后,flume采集的下沉任务又回到了bi-slave1上了。