欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Flume自定义AsynHBaseSink

程序员文章站 2022-06-14 13:04:08
...

在进行项目学习的时候,有个需求是将日志数据经过flume收集,然后sink到HBase中。经过查看官方文档,发现flume内置支持两种HBasesink。如下图所示:

Flume自定义AsynHBaseSink

其中HBaseSink和HBase2Sink几乎完全一样,只是针对的版本不同而已。AsyncHBaseSink使用了 Asynchbase API来写HBase,这个API具有完全异步的,非阻塞的,线程安全的,高性能的特性。同时,万一有些events写入HBase失败,那么此sink会replay那个事务中的所有events。

一、flume内置的AsyncHBaseSink分析

在flume官网上下载一下src包,并导入IDEA中。我的版本是1.7.0。目录结构如下:

Flume自定义AsynHBaseSink

我们要关注的就是箭头指向的类。点进这个文件中。

1.1 AsyncHBaseSink类字段

首先看一下这个类的一些字段,我的截图不全,因为没必要完全把每个字段都弄明白。That is to say ,可以但没必要=!=

Flume自定义AsynHBaseSink

我们可以看到这个类继承了 AbstractSink这个抽象类,并实现了 Configurable接口。还可以看到一些顾名思义的字段,比如:要写入的HBase表名,列族等等。

现在我们要关注的是这三个字段:

  private AsyncHbaseEventSerializer serializer;   //一会讲
  private String eventSerializerType;   //一会讲
  private Context serializerContext;   //一会讲

首先先给出这三个字段是做什么的。

①serializer:它的类型是 AsyncHbaseEventSerializer接口,这个接口定义了如下方法:

Flume自定义AsynHBaseSink

这个接口的作用就是让子类自己*实现封装向HBase发送数据的请求。也就是 getActions()方法。然后在AsyncHBaseSink的process方法中调用serializer.getActions()就可以得到这些请求,然后写到HBase中。

②eventSerializerType:这个就是用来从flume的配置文件中读取serializer参数。以确定使用哪个AsyncHbaseEventSerializer具体的实现类

Flume自定义AsynHBaseSink

可以看到如果不配置的话,默认就是使用这个 org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer类。

Flume自定义AsynHBaseSink

然后通过反射,把serializer实例化成eventSerializerType所表示的类。如上图所示。在上图中,我还把我们要介绍的第三个字段 serializerContext也圈起来了。

③serializerContext:这个字段是Context类型。在flume中,Context内部存了一个HashMap,保存的都是键对。所以我们可以把Context简单的理解为存储键值信息的一个类。为什么叫serializerContext呢,因为通过反射实例化的serializer对象也需要从配置文件中获取一些配置信息。所以这个对象相当于就是跟serializer捆绑在一起的。通过上图的putAll函数把配置信息存进来。其中用到了context的getSubProperties()方法。这个方法我举个例子说明。

比如flume的配置文件中有如下配置:

agent1.sinks.hbaseSink.serializer =ClassA
agent1.sinks.hbaseSink.serializer.payloadColumn = datatime,userid

那么 getSubProperties("serializer")的结果就是<"payloadColumn", "datatime,userid">

到了这,我们最关注的的三个字段讲完了。

1.2. AsyncHBaseSink的 process()过程

这个方法就是真正向HBase中插入数据的方法:

Flume自定义AsynHBaseSink

首先看上图第一个红框,flume收到的event传给serializer去序列化,然后再通过getActions获得最终要传输的封装好的数据。和我们上一节介绍字段时描述的一样。

第二个红框就是把一个一个的封装好的数据PutRequest对象写入到HBase了。

1.3 AsyncHbaseEventSerializer的默认实现SimpleAsyncHbaseEventSerializer

上一节中我们说过,如果不在配置文件中指定serializer,那么默认就由SimpleAsyncHbaseEventSerializer这个实现类来处理event的序列化。我们最终要自定义的Sink,也是仿照这个类去写的。所以我们这节看一下这个类。

Flume自定义AsynHBaseSink

这是这个类的一些字段。看到有payload和payloadColumn,这是干什么的呢?不知道没关系,我先告诉大家,下面还会有分析。payload就是列的值,payloadColumn就是列名。

为什么这么说呢?因为在构造向HBase中写数据的请求时,new了一个PutRequest的实例,并把payload和payloadColumn当做参数传入。我们直接看PutRequest类的构造方法参数是什么意思就知道了。如下:

Flume自定义AsynHBaseSink

豁然开朗。

二、实现符合自己需求的自定义AsynHBaseSink

2.1 为什么要进行二次开发?

首先明确一个问题,为什么我们要自定义AsynHBaseSink,而不是直接使用flume自带的。肯定是需求不满足我们的要求啊。怎么个不满足法儿呢?且听我娓娓道来。

Flume自定义AsynHBaseSink

在SimpleAsyncHbaseEventSerializer类的configure方法中,会为rowPrefix成员变量赋一个默认值(当我们没有在配置文件中配置rowPrefix的时候)。默认是字符串"default"。

然后在构造PutRequest的时候,会使用这个rowPrefix去生成rowkey。

Flume自定义AsynHBaseSink

getTimestampKey为例。返回的是 rowPrefix + 当前时间戳。

Flume自定义AsynHBaseSink

在实际生产中,rowkey的设计好坏可以极大的影响HBase的查询性能。所以,我们一般都是自己设计rowkey,而不会使用flume自带的这个simple,simple,simple的 SimpleRowKeyGenerator。所以这才是我们需要二次开发的理由。

2.2 如何二次开发?

①:写一个符合我们自己的rowkey生成函数。

Flume自定义AsynHBaseSink

②:getActions方法中,根据我们自己的需求去构造PutRequest。Flume自定义AsynHBaseSink

2.3 打成jar包放到flume的lib文件夹下并测试程序。

Flume自定义AsynHBaseSink

Flume自定义AsynHBaseSink

启动flume成功Flume自定义AsynHBaseSink

相关标签: flume