欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

kafka2x-Elasticsearch 数据同步工具demo

程序员文章站 2022-06-07 11:50:26
Bboss is a good elasticsearch Java rest client. It operates and accesses elasticsearch in a way similar to mybatis. BBoss Environmental requirements J ......

bboss is a good elasticsearch java rest client. it operates and accesses elasticsearch in a way similar to mybatis.

bboss environmental requirements

jdk requirement: jdk 1.7+

elasticsearch version requirements: 1.x,2.x,5.x,6.x,+

spring boot: 1.x,2.x,+

kafka2x-elasticsearch 数据同步工具demo

兼容 kafka_2.12-2.3.0 系列版本 ,使用本demo所带的应用程序运行容器环境,可以快速编写,打包发布可运行的数据导入工具

支持的 kafka_2.12-2.3.0 系列版本 到elasticsearch数据同步

kafka低版本(kafka_2.12-0.10.2.0系列版本)同步工具案例地址:

支持的elasticsearch版本: 1.x,2.x,5.x,6.x,7.x,+

支持海量pb级数据同步导入功能

导入maven坐标

<dependency>
  <groupid>com.bbossgroups.plugins</groupid>
  <artifactid>bboss-elasticsearch-rest-kafka2x</artifactid>
  <version>5.9.9</version>
  <scope>compile</scope>
</dependency>

构建部署

准备工作

需要通过gradle构建发布版本,gradle安装配置参考文档:

下载源码工程-基于gradle

从上面的地址下载源码工程,然后导入idea或者eclipse,根据自己的需求,修改导入程序逻辑

org.frameworkset.elasticsearch.imp.kafka2esdemo

如果需要测试和调试导入功能,运行kafka2esdemo的main方法即可即可:

public class dbdemo {
	public static void main(string args[]){
		kafka2esdemo dbdemo = new kafka2esdemo();
		boolean dropindice = true;//commonlauncher.getbooleanattribute("dropindice",false);//同时指定了默认值

		dbdemo.scheduletimestampimportdata(dropindice);
	}
    .....
}

修改es配置-kafka2x-elasticsearch\src\main\resources\application.properties

修改完毕配置后,就可以进行功能调试了。

测试调试通过后,就可以构建发布可运行的版本了:进入命令行模式,在源码工程根目录kafka2x-elasticsearch 下运行以下gradle指令打包发布版本

release.bat

运行作业

gradle构建成功后,在build/distributions目录下会生成可以运行的zip包,解压运行导入程序

linux:

chmod +x restart.sh

./restart.sh

windows: restart.bat

作业jvm配置

修改jvm.options,设置内存大小和其他jvm参数

-xms1g

-xmx1g

作业参数配置

在使用时,为了避免调试过程中不断打包发布数据同步工具,可以将部分控制参数配置到启动配置文件resources/application.properties中,然后在代码中通过以下方法获取配置的参数:

#工具主程序
mainclass=org.frameworkset.elasticsearch.imp.kafka2esdemo

# 参数配置
# 在代码中获取方法:commonlauncher.getbooleanattribute("dropindice",false);//同时指定了默认值false
dropindice=false

在代码中获取参数dropindice方法:

boolean dropindice = commonlauncher.getbooleanattribute("dropindice",false);//同时指定了默认值false

另外可以在resources/application.properties配置控制作业执行的一些参数,例如工作线程数,等待队列数,批处理size等等:

queuesize=50
workthreads=10
batchsize=20

在作业执行方法中获取并使用上述参数:

int batchsize = commonlauncher.getintproperty("batchsize",10);//同时指定了默认值
int queuesize = commonlauncher.getintproperty("queuesize",50);//同时指定了默认值
int workthreads = commonlauncher.getintproperty("workthreads",10);//同时指定了默认值
importbuilder.setbatchsize(batchsize);
importbuilder.setqueue(queuesize);//设置批量导入线程池等待队列长度
importbuilder.setthreadcount(workthreads);//设置批量导入线程池工作线程数量

elasticsearch技术交流群:166471282

elasticsearch微信公众号:bbossgroup

码云项目:kafka2x-elasticsearch

 

磁力搜索网站导航2020更新