欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Flume的进阶介绍和使用(Interceptor,Selectors,Processors)

程序员文章站 2022-03-14 14:13:26
...

Flume的引入

  • 关于Flume的介绍和使用,官网已经给了比较详细的介绍。本文在这里做一个总结。
  • Flume是Apache下的一个开源的*项目,它是一个分布式,可扩展,高可用,高可靠的,轻量级数据收集框架,主要用来做数据的收集,聚合,和传输,相对与传统的数据导入导出框架Sqoop,它具备多方面优势,如,简单易用,Flume只需要一个简单的配置文件即可启动;功能全面,Flume在Source,Channel,Sink三个组件部分提供了很多功能,比如Source端多种类型以支持不同的数据收集需求,并可以自定义Source来满足特定的需求,Source的拦截器(Interceptor),Channel的Selector,Sink端的Processor(下文会一一介绍),并可配置压缩格式,定义文件格式等。

Flume基本概念

  • 关于Flume的基本概念,如,Agent,Source,Channel,Sink可以参照之前博客,该篇文章也有Flume 的基本使用介绍,本篇文章就不做过度阐述,这里主要拎出Flume中重要概念之一,Event。
  • Event: Event是Flume事件处理的最小单元,Flume在读取数据源时,会将一行数据(也就是遇到\r\n)包装成一个Event,它主要有俩个部分,Header和Body, Header主要是以Key,Value的形式用来记录该数据的一些冗余信息,可用来标记数据唯一信息,利用Header的信息,我们可以对数据做出一些额外的操作,如对数据进行一个简单过滤,Body则是存入真正数据的地方。

Flume架构

  • Flume本身的架构特点保证了端到端的数据一致性,数据经过Source被存入到Channel中,如果选择合适的Channel可以保证数据的零丢失,并且Channel 中的数据只有在被Sink 端消费(也就是数据传输到下一个Agent或写入到文件后)才会在Channel中删除相应数据。
  • Flume使用两个独立的事务分别负责从soucrce到channel,以及从channel到sink的事件传递。比如spooling directory source 为文件的每一行创建一个事件,一旦事务中所有的事件全部传递到channel且提交成功,那么source就将该文件标记为完成。同理,事务以类似的方式处理从channel到sink的传递过程,如果因为某种 原因使得事件无法记录,那么事务将会回滚。且所有的事件都会保持到channel中,等待重新传递。
  • Flume 官网介绍了Flume的两种架构选型,扇入和扇出模型,这两种架构需相互结合,并配合上Flume本身提供Load_balance,Failover 机制来使整体设计的架构稳定,可靠

Flume的进阶介绍和使用(Interceptor,Selectors,Processors)
Flume的进阶介绍和使用(Interceptor,Selectors,Processors)

Flume Interceptor(拦截器)

  • Flume 的拦截器,直接从字面上理解的一种作用是拦截过滤指定的数据内容,做一个简单的清洗;还有一种作用是在Source写入到Channel中时,在Event的Header中添加一些有用的信息,比如添加Timestamp,下面介绍一些Flume中常见的Interceptor。
  • Timestamp Interceptor:
    时间戳拦截器,将当前时间戳(毫秒)加入到events header中,key名字为:timestamp,值为当前时间戳。用的不是很多。比如在使用HDFS Sink时候,根据events的时间戳生成结果文件,hdfs.path = hdfs://var/tmp/event/%Y%m%d

    hdfs.filePrefix = log_%Y%m%d_%H

    会根据时间戳将数据写入相应的文件中。

    但可以用其他方式代替(设置useLocalTimeStamp = true)。

    • 参数解释

      Property Name               Default          Description
      type                          –               指定为:timestamp
      header                     timestamp          header中的key的值
      preserveExisting             false            如果timestamp存在,是否保存
      
  • Host Interceptor:
    主机名拦截器。将运行Flume agent的主机名或者IP地址加入到events header中,key名字为:host(也可自定义)。

    • 参数解释

      Property Name                Default          Description
      type                          –               指定为:host
      preserveExisting              false           如果host存在,是否保存
      useIP                         true            使用IP,否则使用hostname
      hostHeader                    host            header中的key的值
      
  • Static Interceptor:
    静态拦截器,用于在events header中加入一组静态的key和value。

    • 参数解释

      Property Name                 Default         Description
      type                           –              指定为:static
      preserveExisting               true           如果定义的key存在,是否保存
      key                            key            header中的key的值
      value                          value          header中的value的值    
      
  • UUID Interceptor:
    UUID拦截器,用于在每个events header中生成一个UUID字符串,例如:b5755073-77a9-43c1-8fad-b7a586fc1b97。生成的UUID可以在sink中读取并使用。

    • 参数解释

      Property Name                 Default          Description
      type                             –             指定为:org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
      headerName                       id            header中的key的值
      preserveExisting                true           如果UUID存在,是否保存
      prefix                           “”            UUID前缀
      
  • Regex Filtering Interceptor:
    通过正则来清洗数据。

    • 参数解释

      Property Name                  Default          Description
      type                              –             指定为:regex_filter
      regex                            ”.*”           与事件匹配的正则表达式
      excludeEvents                   false           按正则排除或匹配
      
  • Regex Extractor Interceptor:
    通过正则表达式来在header中添加指定的key,value则为正则匹配的部分

    • 参数解释

      Property Name                   Default          Description
      type                                –            指定为:regex_extractor
      regex                               –            与事件匹配的正则表达式
      serializers                         –            序列化value的方式,flume提供俩种内置方式:org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer ,org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer
      serializers.<s1>.type            default         默认为default(org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer), org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer, 或者自定义class实现 org.apache.flume.interceptor.RegexExtractorInterceptorSerializer
      serializers.<s1>.name               –            key的名称
      serializers.*                       –            Serializer-specific properties
      

Flume Channel Selectors(选择器)

  • Flume Channel Selectors 官方提供了两种选择,Replicating(复制)和Multiplexing(复用),当然,我们也可自定义Selectors,Replicating(复制):同一个Event会发送给每一个Channel 中;Multiplexing(复用):同一个Event只会选择多个Channel中的一个发送,默认使用Replicating Selector
  • Replicating Channel Selector (default):

    • 参数解释

      Property Name                    Default         Description
      selector.type                    replicating     可指定类型:replicating
      selector.optional                   –            标记可选项的Channel名称
      

      注:当一个Channel被标记为可选项时,表示对该Channel写入失败时会被简单的忽略,而没被标记的Channel在写入失败时会导致Event传输停止

  • Multiplexing Channel Selector:

    • 参数解释

      Property Name                   Default           Description
      selector.type                  replicating        指定为:multiplexing
      selector.header              flume.selector.header   header中key的名称
      selector.default                     –            指定默认的Event传输的Channel
      selector.mapping.*                   –            传输到指定Channel的value属性
      

Flume Sink Processors(处理器)

  • Flume官网提供了三种Processors可供选择,用户自定义Processor暂时还没被支持,使用Flume Sink Processors我们可以做负载均衡(load balancing)和fail over切换。
  • Default Sink Processor:

    • 参数解释

      Property Name                  Default          Description
      sinks                             –             指定sink组的list,空格分隔
      processor.type                 default          processor类型default, failover or load_balance
      

      默认Sink Processor不会强制用户去创建sink group,可以采用最简单的source–channel–sink的单sink。

  • Failover Sink Processor:

  • FailoverSink Processor会通过配置维护了一个优先级列表。保证每一个有效的事件都会被处理。故障转移的工作原理是将连续失败sink分配到一个池中,在那里被分配一个冷冻期,在这个冷冻期里,这个sink不会做任何事。一旦sink成功发送一个event,sink将被还原到live 池中。在这配置中,要设置sinkgroups processor为failover,需要为所有的sink分配优先级,所有的优先级数字必须是唯一的,这个得格外注意。此外,failover time的上限可以通过maxpenalty 属性来进行设置。

    • 参数解释

      Property Name                   Default         Description
      sinks                              –            指定sink组中sink的名称,空格分隔
      processor.type                  default         指定类型为:failover
      processor.priority.<sinkName>      –            指定sink组中sink的优先权,用数字表示,数字越大级别越高,且是唯一值
      processor.maxpenalty             30000          故障sink的最大冷却周期
      
  • Load balancing Sink Processor:

  • 负载均衡片处理器提供在多个Sink之间负载平衡的能力。实现支持通过round_robin(轮询)或者random(随机)参数来实现负载分发,默认情况下使用round_robin,但可以通过配置覆盖这个默认值。还可以通过集成AbstractSinkSelector类来实现用户自己的选择机制。

    当被调用的时候,这选择器通过配置的选择规则选择下一个sink来调用。

    • 参数解释

      Property Name                   Default          Description
      processor.sinks                   –              指定sink组中sink的名称,空格分隔
      processor.type                  default          指定类型为:load_balance
      processor.backoff                false           Should failed sinks be backed off exponentially.
      processor.selector            round_robin        指定负载机制
      processor.selector.maxTimeOut    30000           Used by backoff selectors to limit exponential backoff (in milliseconds)
      
相关标签: flume