欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

java连接ElasticSearch集群操作

程序员文章站 2022-07-03 16:13:51
我就废话不多说了,大家还是直接看代码吧~/* *es配置类 * */ @configurationpublic class elasticsearchdatasourceconfigurer {...

我就废话不多说了,大家还是直接看代码吧~

/*
 *es配置类
 *
 */
 
@configuration
public class elasticsearchdatasourceconfigurer {
 
  private static final logger log = logmanager.getlogger(elasticsearchdatasourceconfigurer.class);
  @bean
  public transportclient getesclient() {
    //设置集群名称
    settings settings = settings.builder().put("cluster.name", "bigdata-cluster").put("client.transport.sniff", true).build();
    //创建client
    transportclient client = null;
    try {
      client = new prebuilttransportclient(settings)
          .addtransportaddress(new inetsockettransportaddress(inetaddress.getbyname(""), 9300));//集群ip
      log.info("esclient连接建立成功");
    } catch (unknownhostexception e) {
      log.info("esclient连接建立失败");
      e.printstacktrace();
    }
    return client;
  } 
}
/**
 * simple to introduction
 *
 * @description: [添加类]
 */
@repository
public class userdaoimpl implements userdao {
 
	private static final string indexname = "user";//小写
	private static final string typename = "info";
 
	@resource
	transportclient transportclient;
 
	@override
	public int adduser(user[] user) {
		indexresponse indexresponse = null;
		int successnum = 0;
		for (int i = 0; i < user.length; i++) {
			uuid uuid = uuid.randomuuid();
			string str = uuid.tostring();
			string jsonvalue = null;
			try {
				jsonvalue = jsonutil.object2jsonstring(user[i]);
				if (jsonvalue != null) {
					indexresponse = transportclient.prepareindex(indexname, typename, str).setsource(jsonvalue)
							.execute().actionget();
					successnum++;
				}
			} catch (jsonprocessingexception e) {
				e.printstacktrace();
			}
 
		}
		return successnum;
	} 
}
 
/**
 *批量插入
 */
public static void bathadduser(transportclient client, list<user> users) {
 
		bulkrequestbuilder bulkrequest = transportclient.preparebulk();
		for (int i = 0; i < users.size(); i++) {
			uuid uuid = uuid.randomuuid();
			string str = uuid.tostring();
 
			string jsonvalue = null;
			try {
				jsonvalue = jsonutil.object2jsonstring(users.get(i));
			} catch (jsonprocessingexception e) {
				e.printstacktrace();
			}
			bulkrequest.add(client.prepareindex("user", "info", str).setsource(jsonvalue));
			// 一万条插入一次
			if (i % 10000 == 0) {
				bulkrequest.execute().actionget();
			}
			system.out.println("已经插入第" + i + "多少条");
		}
 
	}

补充知识:使用java创建es(elasticsearch)连接池

1.首先要有一个创建连接的工厂类

package com.aly.util; 
import org.apache.commons.pool2.pooledobject;
import org.apache.commons.pool2.pooledobjectfactory;
import org.apache.commons.pool2.impl.defaultpooledobject;
import org.apache.http.httphost;
import org.elasticsearch.client.restclient;
import org.elasticsearch.client.resthighlevelclient;
 
/**
 * eliasticsearch连接池工厂对象
 * @author 00000
 *
 */
public class esclientpoolfactory implements pooledobjectfactory<resthighlevelclient>{
 
	@override
	public void activateobject(pooledobject<resthighlevelclient> arg0) throws exception {
		system.out.println("activateobject");
		
	}
	
	/**
	 * 销毁对象
	 */
	@override
	public void destroyobject(pooledobject<resthighlevelclient> pooledobject) throws exception {
		resthighlevelclient highlevelclient = pooledobject.getobject();
		highlevelclient.close();
	}
	
	/**
	 * 生产对象
	 */
//	@suppresswarnings({ "resource" })
	@override
	public pooledobject<resthighlevelclient> makeobject() throws exception {
//		settings settings = settings.builder().put("cluster.name","elasticsearch").build();
		resthighlevelclient client = null;
		try {
			/*client = new prebuilttransportclient(settings)
          .addtransportaddress(new transportaddress(inetaddress.getbyname("localhost"),9300));*/
			client = new resthighlevelclient(restclient.builder(
					new httphost("192.168.1.121", 9200, "http"), new httphost("192.168.1.122", 9200, "http"),
					new httphost("192.168.1.123", 9200, "http"), new httphost("192.168.1.125", 9200, "http"),
					new httphost("192.168.1.126", 9200, "http"), new httphost("192.168.1.127", 9200, "http")));
 
		} catch (exception e) {
			e.printstacktrace();
		}
		return new defaultpooledobject<resthighlevelclient>(client);
	}
 
	@override
	public void passivateobject(pooledobject<resthighlevelclient> arg0) throws exception {
		system.out.println("passivateobject");
	}
 
	@override
	public boolean validateobject(pooledobject<resthighlevelclient> arg0) {
		return true;
	}	
}

2.然后再写我们的连接池工具类

package com.aly.util; 
import org.apache.commons.pool2.impl.genericobjectpool;
import org.apache.commons.pool2.impl.genericobjectpoolconfig;
import org.elasticsearch.client.resthighlevelclient;
 
/**
 * elasticsearch 连接池工具类
 * 
 * @author 00000
 *
 */
public class elasticsearchpoolutil {
	// 对象池配置类,不写也可以,采用默认配置
	private static genericobjectpoolconfig poolconfig = new genericobjectpoolconfig();
	// 采用默认配置maxtotal是8,池中有8个client
	static {
		poolconfig.setmaxtotal(8);
	}
	// 要池化的对象的工厂类,这个是我们要实现的类
	private static esclientpoolfactory esclientpoolfactory = new esclientpoolfactory();
	// 利用对象工厂类和配置类生成对象池
	private static genericobjectpool<resthighlevelclient> clientpool = new genericobjectpool<>(esclientpoolfactory,
			poolconfig);
 
	/**
	 * 获得对象
	 * 
	 * @return
	 * @throws exception
	 */
	public static resthighlevelclient getclient() throws exception {
		// 从池中取一个对象
		resthighlevelclient client = clientpool.borrowobject();
		return client;
	}
 
	/**
	 * 归还对象
	 * 
	 * @param client
	 */
	public static void returnclient(resthighlevelclient client) {
		// 使用完毕之后,归还对象
		clientpool.returnobject(client);
	}
}

以上这篇java连接elasticsearch集群操作就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持。