
Writing to Elasticsearch with Flume

程序员文章站 2022-05-18 19:06:57

Customizing the Flume Elasticsearch sink source code

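I recently tried to write messages into Elasticsearch through Flume. Flume does not support every ES version, however: it only keeps support for version 0.9. This is probably because ES releases frequently and versions differ a lot, so customizing the sink for every ES version in every Flume release is not practical.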

Version compatibility

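Below is how I got Flume 1.7 to write to ES 6.8. I fell into countless pits along the way. One small episode: I carelessly downloaded the latest Flume source (1.9) from the official site. The ES sink code had barely changed, so I assumed I could develop against the latest source and build only the ES sink module. Only after finishing did I discover that the resulting sink jar would not run on 1.7. I had to download the 1.7 source again and redo the changes... ε=(´ο`*))) sigh.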

Downloading the Flume source

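Flume is a top-level Apache open-source project, so download the source directly from the Apache website and open it in an IDE (I used IDEA).

Flume has two release lines, 0.9.x and 1.x. The source version you download must match the Flume version you actually run.

Flume depends on a very large number of packages, and like most open-source projects it pulls the official artifacts from Maven Central. The first import of the project therefore takes a long time, so keep your network connection stable. Downloading all the packages took me about 3 hours.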

Code changes

In the Flume source, all the ES sink code lives in the flume/flume-ng-sinks/flume-ng-elasticsearch-sink submodule. The implementation is quite simple.

apache-flume-1.7.0-src
|-- flume-ng-elasticsearch-sink
|   |-- client
|   |   |-- ElasticSearchClient.java
|   |   |-- ElasticSearchClientFactory.java
|   |   |-- ElasticSearchRestClient.java
|   |   |-- ElasticSearchTransportClient.java
|   |   |-- NoSuchClientTypeException.java
|   |   |-- RoundRobinList.java
|   |-- AbstractElasticSearchIndexRequestBuilderFactory.java
|   |-- ContentBuilderUtil.java
|   |-- ElasticSearchDynamicSerializer.java
|   |-- ElasticSearchIndexRequestBuilderFactory.java
|   |-- ElasticSearchLogStashEventSerializer.java
|   |-- ElasticSearchSink.java
|   |-- ElasticSearchSinkConstants.java
|   |-- EventSerializerIndexRequestBuilderFactory.java
|   |-- IndexNameBuilder.java
|   |-- SimpleIndexNameBuilder.java
|   |-- TimeBasedIndexNameBuilder.java
|   |-- TimestampedEvent.java
|   |-- pom.xml
|-- pom.xml

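 1. In the top-level pom.xml, change the version of the ES-related dependencies to 6.8.5.

 2. Update every place in the ES sink code that calls the ES API so that it uses the 6.8.5 API.

 3. In the pom.xml of the flume-ng-elasticsearch-sink submodule, add the transport dependency, which provides the 6.8.5 client: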

<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>transport</artifactId>
    <version>6.8.5</version>
</dependency>

 4. In the pom.xml of the flume-ng-elasticsearch-sink submodule, add the httpclient dependency, which the 6.8.5 client also needs:

<dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpclient</artifactId>
</dependency>
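Step 2 is where most of the work is. As an illustration only: the 1.7 sink builds its transport client with pre-2.x classes such as `ImmutableSettings` and `InetSocketTransportAddress`, and neither exists in 6.8.5. The 6.x equivalents are `Settings.builder()`, `PreBuiltTransportClient` (the stack traces below show the modified `openClient` using it) and `TransportAddress`. The sink's `hostNames` setting is a comma-separated `host:port` list, and a missing port falls back to 9300. The JDK-only sketch below covers just that parsing step. `HostListParser` is a hypothetical name. The ES calls appear only as comments because they need the 6.8.5 jars on the classpath.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical helper: turns a Flume-style hostNames value such as
 * "es1:9300,es2" into socket addresses, defaulting to the transport port.
 * IPv6 literals are not handled (the last ':' is taken as the port separator).
 */
public class HostListParser {
    static final int DEFAULT_TRANSPORT_PORT = 9300;

    static List<InetSocketAddress> parse(String hostNames) {
        List<InetSocketAddress> result = new ArrayList<>();
        for (String raw : hostNames.split(",")) {
            String entry = raw.trim();
            if (entry.isEmpty()) {
                continue; // skip empty entries such as a trailing comma
            }
            int colon = entry.lastIndexOf(':');
            String host = colon >= 0 ? entry.substring(0, colon) : entry;
            int port = colon >= 0
                    ? Integer.parseInt(entry.substring(colon + 1).trim())
                    : DEFAULT_TRANSPORT_PORT;
            // Unresolved: no DNS lookup happens while parsing the config.
            result.add(InetSocketAddress.createUnresolved(host, port));
        }
        return result;
    }

    public static void main(String[] args) {
        // With the 6.8.5 jars available, the sink would then do roughly:
        //   Settings settings = Settings.builder().put("cluster.name", clusterName).build();
        //   TransportClient client = new PreBuiltTransportClient(settings);
        //   client.addTransportAddress(new TransportAddress(
        //       InetAddress.getByName(addr.getHostString()), addr.getPort()));
        for (InetSocketAddress addr : parse("es1:9300, es2")) {
            System.out.println(addr.getHostString() + ":" + addr.getPort());
        }
    }
}
```

Running `main` prints `es1:9300` and `es2:9300`. The second host picks up the default transport port.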


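Packaging and deployment

After making the changes, build the module and deploy the resulting flume-ng-elasticsearch-sink-1.7.0.jar to ${FLUME_HOME}/lib/.

Copy all Elastic-related jars from the ES environment into ${FLUME_HOME}/lib/.

Also copy the jars that the Elasticsearch sink depends on from the local machine into ${FLUME_HOME}/lib/. There are quite a few, and I tracked them down one by one from the error messages: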

elasticsearch-6.8.5.jar
elasticsearch-cli-6.8.5.jar
elasticsearch-core-6.8.5.jar
elasticsearch-rest-client-6.8.5.jar
elasticsearch-secure-sm-6.8.5.jar
elasticsearch-ssl-config-6.8.5.jar
elasticsearch-x-content-6.8.5.jar
httpasyncclient-4.1.2.jar
jackson-core-asl-1.9.3.jar.bak
lang-mustache-client-6.8.5.jar
netty-3.9.4.Final.jar
netty-buffer-4.1.32.Final.jar
netty-codec-4.1.32.Final.jar
netty-codec-http-4.1.32.Final.jar
netty-common-4.1.32.Final.jar
netty-handler-4.1.32.Final.jar
netty-resolver-4.1.32.Final.jar
netty-transport-4.1.32.Final.jar
parent-join-client-6.8.5.jar
percolator-client-6.8.5.jar
rank-eval-client-6.8.5.jar
reindex-client-6.8.5.jar
transport-6.8.5.jar
transport-netty4-client-6.8.5.jar
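This list was assembled by trial and error, so it can help to check a deployment against it before starting the agent. Below is a minimal JDK-only sketch. It assumes the jars sit directly in `$FLUME_HOME/lib` under exactly the file names listed above. `MissingJarCheck` is a hypothetical name and is not part of Flume.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class MissingJarCheck {
    // The jar list above; the disabled .bak file is left out.
    static final List<String> REQUIRED = Arrays.asList(
            "elasticsearch-6.8.5.jar", "elasticsearch-cli-6.8.5.jar",
            "elasticsearch-core-6.8.5.jar", "elasticsearch-rest-client-6.8.5.jar",
            "elasticsearch-secure-sm-6.8.5.jar", "elasticsearch-ssl-config-6.8.5.jar",
            "elasticsearch-x-content-6.8.5.jar", "httpasyncclient-4.1.2.jar",
            "lang-mustache-client-6.8.5.jar", "netty-3.9.4.Final.jar",
            "netty-buffer-4.1.32.Final.jar", "netty-codec-4.1.32.Final.jar",
            "netty-codec-http-4.1.32.Final.jar", "netty-common-4.1.32.Final.jar",
            "netty-handler-4.1.32.Final.jar", "netty-resolver-4.1.32.Final.jar",
            "netty-transport-4.1.32.Final.jar", "parent-join-client-6.8.5.jar",
            "percolator-client-6.8.5.jar", "rank-eval-client-6.8.5.jar",
            "reindex-client-6.8.5.jar", "transport-6.8.5.jar",
            "transport-netty4-client-6.8.5.jar");

    /** Returns the required jar names that are not present, in list order. */
    static List<String> missing(Collection<String> required, Collection<String> present) {
        Set<String> have = new HashSet<>(present);
        List<String> out = new ArrayList<>();
        for (String jar : required) {
            if (!have.contains(jar)) {
                out.add(jar);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        String home = args.length > 0 ? args[0] : System.getenv("FLUME_HOME");
        if (home == null) {
            System.err.println("usage: java MissingJarCheck <flume_home> (or set FLUME_HOME)");
            return;
        }
        String[] names = new File(home, "lib").list(); // null if lib/ is missing
        List<String> present = names == null
                ? Collections.<String>emptyList()
                : Arrays.asList(names);
        List<String> gaps = missing(REQUIRED, present);
        if (gaps.isEmpty()) {
            System.out.println("all listed jars present");
        } else {
            gaps.forEach(jar -> System.out.println("missing: " + jar));
        }
    }
}
```

The check compares exact file names only. A jar that is present in a different version still shows up as missing, which is usually what you want here.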

Customizing a Flume interceptor

Pitfalls

 Below are some of the missing-dependency errors I ran into:

FAIL_ON_SYMBOL_HASH_OVERFLOW

11 Mar 2020 12:16:31,586 ERROR [lifecycleSupervisor-1-2] (org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run:251) - Unable to start SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@29ce66c0 counterGroup:{ name:null counters:{} } } - Exception follows.
java.lang.NoSuchFieldError: FAIL_ON_SYMBOL_HASH_OVERFLOW
at org.elasticsearch.common.xcontent.json.JsonXContent.<clinit>(JsonXContent.java:57)
at org.elasticsearch.common.xcontent.XContentType$1.xContent(XContentType.java:56)
at org.elasticsearch.common.settings.Setting.arrayToParsableString(Setting.java:1318)
at org.elasticsearch.common.settings.Setting.access$800(Setting.java:87)
at org.elasticsearch.common.settings.Setting$ListSetting.lambda$new$0(Setting.java:1343)
at org.elasticsearch.common.settings.Setting$ListSetting.innerGetRaw(Setting.java:1353)
at org.elasticsearch.common.settings.Setting.getRaw(Setting.java:461)
at org.elasticsearch.common.settings.Setting.lambda$listSetting$35(Setting.java:1269)
at org.elasticsearch.common.settings.Setting.listSetting(Setting.java:1286)
at org.elasticsearch.common.settings.Setting.listSetting(Setting.java:1269)
at org.elasticsearch.transport.TransportSettings.<clinit>(TransportSettings.java:47)
at org.elasticsearch.client.transport.TransportClient.newPluginService(TransportClient.java:105)
at org.elasticsearch.client.transport.TransportClient.buildTemplate(TransportClient.java:135)
at org.elasticsearch.client.transport.TransportClient.<init>(TransportClient.java:288)
at org.elasticsearch.transport.client.PreBuiltTransportClient.<init>(PreBuiltTransportClient.java:128)
at org.elasticsearch.transport.client.PreBuiltTransportClient.<init>(PreBuiltTransportClient.java:114)
at org.elasticsearch.transport.client.PreBuiltTransportClient.<init>(PreBuiltTransportClient.java:104)
at org.apache.flume.sink.elasticsearch.client.ElasticSearchTransportClient.openClient(ElasticSearchTransportClient.java:206)
at org.apache.flume.sink.elasticsearch.client.ElasticSearchTransportClient.<init>(ElasticSearchTransportClient.java:79)
at org.apache.flume.sink.elasticsearch.client.ElasticSearchClientFactory.getClient(ElasticSearchClientFactory.java:48)
at org.apache.flume.sink.elasticsearch.ElasticSearchSink.start(ElasticSearchSink.java:354)
at org.apache.flume.sink.DefaultSinkProcessor.start(DefaultSinkProcessor.java:45)
at org.apache.flume.SinkRunner.start(SinkRunner.java:79)
at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:249)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
11 Mar 2020 12:16:31,590 INFO [lifecycleSupervisor-1-2] (org.apache.flume.sink.elasticsearch.ElasticSearchSink.stop:381) - ElasticSearch sink {} stopping

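Problem: the Jackson jars are at inconsistent versions. FAIL_ON_SYMBOL_HASH_OVERFLOW is a constant of Jackson's JsonFactory.Feature that was added in Jackson 2.4. The NoSuchFieldError therefore means an older jackson-core already in Flume's lib directory is being loaded instead of the version ES 6.8.5 expects.

Fix: replace the Jackson jars in the Flume environment with all of the Jackson jars used in the local build.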
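Version clashes like this are easier to spot if you list every artifact that appears in `lib/` in more than one version. The sketch below is a heuristic under a stated assumption: it splits each file name at the first `-` followed by a digit, so it only understands conventional `artifact-version.jar` names. The class name and the sample file names in the usage note are hypothetical.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class DuplicateJarFinder {
    // "artifact-version.jar": the version starts at the first "-<digit>".
    private static final Pattern JAR = Pattern.compile("^(.+?)-(\\d.*)\\.jar$");

    /** Maps each artifact that appears in more than one version to its versions. */
    static Map<String, List<String>> duplicates(Collection<String> fileNames) {
        Map<String, List<String>> byArtifact = new TreeMap<>();
        for (String name : fileNames) {
            Matcher m = JAR.matcher(name);
            if (m.matches()) { // names such as *.jar.bak are ignored
                byArtifact.computeIfAbsent(m.group(1), k -> new ArrayList<>()).add(m.group(2));
            }
        }
        byArtifact.values().removeIf(versions -> versions.size() < 2);
        return byArtifact;
    }

    public static void main(String[] args) {
        String home = System.getenv("FLUME_HOME");
        File lib = args.length > 0 ? new File(args[0]) : new File(home == null ? "." : home, "lib");
        String[] names = lib.list();
        if (names == null) {
            System.err.println("cannot read " + lib);
            return;
        }
        duplicates(Arrays.asList(names)).forEach((artifact, versions) ->
                System.out.println(artifact + " -> " + versions));
    }
}
```

Suppose a lib directory held both `jackson-core-2.3.1.jar` and `jackson-core-2.8.11.jar`. The finder would then report `jackson-core -> [2.3.1, 2.8.11]`. Differently named artifacts such as `jackson-core-asl` or `netty` versus `netty-common` are treated as unrelated.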


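ClassNotFound: io.netty.util.NettyRuntime

Problem: the netty-common jar is missing

Fix: copy all of the jars under the netty directory of the local Maven repository directly into the Flume environment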

ClassNotFound: SslConfigurationLoader

Problem: the elasticsearch-ssl-config jar is missing

Solution: all of the Elasticsearch jars need to be added to Flume

<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-ssl-config</artifactId>
    <version>6.8.5</version>
</dependency>

ClassNotFound: SchemeIOSessionStrategy

unner: { policy:org.apache.flume.sink.DefaultSinkProcessor@6d310488 counterGroup:{ name:null counters:{} } } - Exception follows.
java.lang.NoClassDefFoundError: org/apache/http/nio/conn/SchemeIOSessionStrategy
at org.elasticsearch.index.reindex.ReindexPlugin.getSettings(ReindexPlugin.java:94)
at org.elasticsearch.plugins.PluginsService.lambda$getPluginSettings$0(PluginsService.java:89)
at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:267)
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
at org.elasticsearch.plugins.PluginsService.getPluginSettings(PluginsService.java:89)
at org.elasticsearch.client.transport.TransportClient.buildTemplate(TransportClient.java:147)
at org.elasticsearch.client.transport.TransportClient.<init>(TransportClient.java:288)
at org.elasticsearch.transport.client.PreBuiltTransportClient.<init>(PreBuiltTransportClient.java:128)
at org.elasticsearch.transport.client.PreBuiltTransportClient.<init>(PreBuiltTransportClient.java:114)
at org.elasticsearch.transport.client.PreBuiltTransportClient.<init>(PreBuiltTransportClient.java:104)
at org.apache.flume.sink.elasticsearch.client.ElasticSearchTransportClient.openClient(ElasticSearchTransportClient.java:206)
at org.apache.flume.sink.elasticsearch.client.ElasticSearchTransportClient.<init>(ElasticSearchTransportClient.java:79)
at org.apache.flume.sink.elasticsearch.client.ElasticSearchClientFactory.getClient(ElasticSearchClientFactory.java:48)
at org.apache.flume.sink.elasticsearch.ElasticSearchSink.start(ElasticSearchSink.java:354)
at org.apache.flume.sink.DefaultSinkProcessor.start(DefaultSinkProcessor.java:45)
at org.apache.flume.SinkRunner.start(SinkRunner.java:79)
at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:249)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: org.apache.http.nio.conn.SchemeIOSessionStrategy
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 29 more

Solution: copy the httpasyncclient jar into Flume
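All of these errors only surface when the sink starts. You can check a classpath in advance by loading the affected classes by name. Below is a minimal sketch. Run it with the same classpath as the agent, for example `java -cp "$FLUME_HOME/lib/*:." ClasspathProbe`.

The class names come from the errors above, with one assumption: the package `org.elasticsearch.common.ssl` for `SslConfigurationLoader` is my guess and is not given in the error. `JsonFactory$Feature` is Jackson's nested enum that declares `FAIL_ON_SYMBOL_HASH_OVERFLOW`. `ClasspathProbe` is a hypothetical name.

```java
public class ClasspathProbe {
    /** True if the class can be found; it is loaded but not initialized. */
    static boolean hasClass(String className) {
        try {
            Class.forName(className, false, ClasspathProbe.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    /** True if the class exists and has a public field (e.g. an enum constant) with this name. */
    static boolean hasField(String className, String fieldName) {
        try {
            Class.forName(className, false, ClasspathProbe.class.getClassLoader()).getField(fieldName);
            return true;
        } catch (ClassNotFoundException | NoSuchFieldException | LinkageError e) {
            return false;
        }
    }

    private static void report(boolean ok, String what) {
        System.out.println((ok ? "OK       " : "MISSING  ") + what);
    }

    public static void main(String[] args) {
        String[] classes = {
            "io.netty.util.NettyRuntime",                          // netty-common 4.1.x
            "org.elasticsearch.common.ssl.SslConfigurationLoader", // elasticsearch-ssl-config (package assumed)
            "org.apache.http.nio.conn.SchemeIOSessionStrategy",    // httpasyncclient
        };
        for (String c : classes) {
            report(hasClass(c), c);
        }
        // If this is MISSING while jackson-core is present, an older jackson-core is loaded first.
        report(hasField("com.fasterxml.jackson.core.JsonFactory$Feature", "FAIL_ON_SYMBOL_HASH_OVERFLOW"),
                "JsonFactory.Feature.FAIL_ON_SYMBOL_HASH_OVERFLOW");
    }
}
```

Passing `false` to `Class.forName` skips static initialization. The probe therefore only tests whether a class can be found. It does not rerun the `<clinit>` code that failed in the first stack trace.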


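Two clients

The Flume Elasticsearch sink can talk to ES through two kinds of client:

PreBuiltTransportClient: a TransportClient, which connects on port 9300

HttpClient: used by ElasticSearchRestClient, which connects on port 9200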
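On the REST path, events go to the `_bulk` endpoint on port 9200 as newline-delimited JSON, with one action line followed by one source line per event. The sketch below builds such a body using only the JDK, under these assumptions:

- The index name imitates `TimeBasedIndexNameBuilder`: the prefix, a `-`, then the day formatted as `yyyy-MM-dd` (UTC assumed here).
- `flume` and `logs` are used as index prefix and type because, as far as I know, they are the sink's default `indexName` and `indexType`.
- The class and method names are hypothetical.

ES 6.x still accepts `_type` in bulk actions. It does reject requests without a JSON `Content-Type`, so a real POST would also need a header such as `Content-Type: application/x-ndjson`.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class BulkBodySketch {
    // Day pattern in the style of TimeBasedIndexNameBuilder; UTC is an assumption.
    private static final DateTimeFormatter DAY =
            DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC);

    /** Index name = prefix + "-" + UTC day of the event timestamp. */
    static String indexName(String prefix, long epochMillis) {
        return prefix + "-" + DAY.format(Instant.ofEpochMilli(epochMillis));
    }

    /** Minimal JSON string quoting: escapes quotes, backslashes and control characters. */
    static String quote(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            if (c == '"' || c == '\\') {
                sb.append('\\').append(c);
            } else if (c < 0x20) {
                sb.append(String.format("\\u%04x", (int) c));
            } else {
                sb.append(c);
            }
        }
        return sb.append('"').toString();
    }

    /** One bulk "index" item: an action line and a source line, each ending in a newline. */
    static String bulkItem(String index, String type, String sourceJson) {
        return "{\"index\":{\"_index\":" + quote(index) + ",\"_type\":" + quote(type) + "}}\n"
                + sourceJson + "\n";
    }

    public static void main(String[] args) {
        String index = indexName("flume", 1583929000000L); // 2020-03-11T12:16:40Z
        StringBuilder body = new StringBuilder();
        body.append(bulkItem(index, "logs", "{\"body\":\"first event\"}"));
        body.append(bulkItem(index, "logs", "{\"body\":\"second event\"}"));
        // This body would be POSTed to http://<host>:9200/_bulk.
        System.out.print(body);
    }
}
```

The source line is passed through unchanged. In the sink, that JSON comes from the configured event serializer rather than from a hand-written string.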