java连接ElasticSearch集群操作
程序员文章站
2022-07-03 16:13:51
我就废话不多说了,大家还是直接看代码吧~/* *es配置类 * */ @configurationpublic class elasticsearchdatasourceconfigurer {...
我就废话不多说了,大家还是直接看代码吧~
/* *es配置类 * */ @configuration public class elasticsearchdatasourceconfigurer { private static final logger log = logmanager.getlogger(elasticsearchdatasourceconfigurer.class); @bean public transportclient getesclient() { //设置集群名称 settings settings = settings.builder().put("cluster.name", "bigdata-cluster").put("client.transport.sniff", true).build(); //创建client transportclient client = null; try { client = new prebuilttransportclient(settings) .addtransportaddress(new inetsockettransportaddress(inetaddress.getbyname(""), 9300));//集群ip log.info("esclient连接建立成功"); } catch (unknownhostexception e) { log.info("esclient连接建立失败"); e.printstacktrace(); } return client; } }
/** * simple to introduction * * @description: [添加类] */ @repository public class userdaoimpl implements userdao { private static final string indexname = "user";//小写 private static final string typename = "info"; @resource transportclient transportclient; @override public int adduser(user[] user) { indexresponse indexresponse = null; int successnum = 0; for (int i = 0; i < user.length; i++) { uuid uuid = uuid.randomuuid(); string str = uuid.tostring(); string jsonvalue = null; try { jsonvalue = jsonutil.object2jsonstring(user[i]); if (jsonvalue != null) { indexresponse = transportclient.prepareindex(indexname, typename, str).setsource(jsonvalue) .execute().actionget(); successnum++; } } catch (jsonprocessingexception e) { e.printstacktrace(); } } return successnum; } }
/** *批量插入 */ public static void bathadduser(transportclient client, list<user> users) { bulkrequestbuilder bulkrequest = transportclient.preparebulk(); for (int i = 0; i < users.size(); i++) { uuid uuid = uuid.randomuuid(); string str = uuid.tostring(); string jsonvalue = null; try { jsonvalue = jsonutil.object2jsonstring(users.get(i)); } catch (jsonprocessingexception e) { e.printstacktrace(); } bulkrequest.add(client.prepareindex("user", "info", str).setsource(jsonvalue)); // 一万条插入一次 if (i % 10000 == 0) { bulkrequest.execute().actionget(); } system.out.println("已经插入第" + i + "多少条"); } }
补充知识:使用java创建es(elasticsearch)连接池
1.首先要有一个创建连接的工厂类
package com.aly.util; import org.apache.commons.pool2.pooledobject; import org.apache.commons.pool2.pooledobjectfactory; import org.apache.commons.pool2.impl.defaultpooledobject; import org.apache.http.httphost; import org.elasticsearch.client.restclient; import org.elasticsearch.client.resthighlevelclient; /** * eliasticsearch连接池工厂对象 * @author 00000 * */ public class esclientpoolfactory implements pooledobjectfactory<resthighlevelclient>{ @override public void activateobject(pooledobject<resthighlevelclient> arg0) throws exception { system.out.println("activateobject"); } /** * 销毁对象 */ @override public void destroyobject(pooledobject<resthighlevelclient> pooledobject) throws exception { resthighlevelclient highlevelclient = pooledobject.getobject(); highlevelclient.close(); } /** * 生产对象 */ // @suppresswarnings({ "resource" }) @override public pooledobject<resthighlevelclient> makeobject() throws exception { // settings settings = settings.builder().put("cluster.name","elasticsearch").build(); resthighlevelclient client = null; try { /*client = new prebuilttransportclient(settings) .addtransportaddress(new transportaddress(inetaddress.getbyname("localhost"),9300));*/ client = new resthighlevelclient(restclient.builder( new httphost("192.168.1.121", 9200, "http"), new httphost("192.168.1.122", 9200, "http"), new httphost("192.168.1.123", 9200, "http"), new httphost("192.168.1.125", 9200, "http"), new httphost("192.168.1.126", 9200, "http"), new httphost("192.168.1.127", 9200, "http"))); } catch (exception e) { e.printstacktrace(); } return new defaultpooledobject<resthighlevelclient>(client); } @override public void passivateobject(pooledobject<resthighlevelclient> arg0) throws exception { system.out.println("passivateobject"); } @override public boolean validateobject(pooledobject<resthighlevelclient> arg0) { return true; } }
2.然后再写我们的连接池工具类
package com.aly.util; import org.apache.commons.pool2.impl.genericobjectpool; import org.apache.commons.pool2.impl.genericobjectpoolconfig; import org.elasticsearch.client.resthighlevelclient; /** * elasticsearch 连接池工具类 * * @author 00000 * */ public class elasticsearchpoolutil { // 对象池配置类,不写也可以,采用默认配置 private static genericobjectpoolconfig poolconfig = new genericobjectpoolconfig(); // 采用默认配置maxtotal是8,池中有8个client static { poolconfig.setmaxtotal(8); } // 要池化的对象的工厂类,这个是我们要实现的类 private static esclientpoolfactory esclientpoolfactory = new esclientpoolfactory(); // 利用对象工厂类和配置类生成对象池 private static genericobjectpool<resthighlevelclient> clientpool = new genericobjectpool<>(esclientpoolfactory, poolconfig); /** * 获得对象 * * @return * @throws exception */ public static resthighlevelclient getclient() throws exception { // 从池中取一个对象 resthighlevelclient client = clientpool.borrowobject(); return client; } /** * 归还对象 * * @param client */ public static void returnclient(resthighlevelclient client) { // 使用完毕之后,归还对象 clientpool.returnobject(client); } }
以上这篇java连接elasticsearch集群操作就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持。
推荐阅读
-
Java对MySQL数据库进行连接、查询和修改操作方法
-
Java中使用elasticsearch搜索引擎实现简单、修改等操作
-
【⭐】Java—Spring-—数据库操作—使用内置连接池,报读取不到驱动错误。Could not load JDBC driver class。
-
JAVA连接、操作数据库的DBHelper
-
java连接操作mysql
-
java连接数据库使用queryrunner对数据库进行操作
-
【ElasticSearch】java操作ElasticSearch
-
使用Java客户端操作elasticsearch
-
java连接ElasticSearch集群操作
-
elasticsearch的java代码操作详解