使用java操作elasticsearch的具体方法
系统环境: vm12 下的centos 7.2
当前安装版本: elasticsearch-2.4.0.tar.gz
java操作es集群步骤1:配置集群对象信息;2:创建客户端;3:查看集群信息
1:集群名称
默认集群名为elasticsearch,如果集群名称和指定的不一致则在使用节点资源时会报错。
2:嗅探功能
通过client.transport.sniff启动嗅探功能,这样只需要指定集群中的某一个节点(不一定是主节点),然后会加载集群中的其他节点,这样只要程序不停即使此节点宕机仍然可以连接到其他节点。
3:查询类型searchtype.query_then_fetch
es 查询共有4种查询类型
query_and_fetch:
主节点将查询请求分发到所有的分片中,各个分片按照自己的查询规则即词频文档频率进行打分排序,然后将结果返回给主节点,主节点对所有数据进行汇总排序然后再返回给客户端,此种方式只需要和es交互一次。
这种查询方式存在数据量和排序问题,主节点会汇总所有分片返回的数据这样数据量会比较大,二是各个分片上的规则可能不一致。
query_then_fetch:
主节点将请求分发给所有分片,各个分片打分排序后将数据的id和分值返回给主节点,主节点收到后进行汇总排序再根据排序后的id到对应的节点读取对应的数据再返回给客户端,此种方式需要和es交互两次。
这种方式解决了数据量问题但是排序问题依然存在而且是es的默认查询方式
def_query_and_fetch: 和 dfs_query_then_fetch:
将各个分片的规则统一起来进行打分。解决了排序问题但是dfs_query_and_fetch仍然存在数据量问题,dfs_query_then_fetch两种噢乖你问题都解决但是效率是最差的。
1, 获取client, 两种方式获取
@before public void before() throws exception { map<string, string> map = new hashmap<string, string>(); map.put("cluster.name", "elasticsearch_wenbronk"); settings.builder settings = settings.builder().put(map); client = transportclient.builder().settings(settings).build() .addtransportaddress(new inetsockettransportaddress(inetaddress.getbyname("www.wenbronk.com"), integer.parseint("9300"))); }
@before public void before11() throws exception { // 创建客户端, 使用的默认集群名, "elasticsearch" // client = transportclient.builder().build() // .addtransportaddress(new inetsockettransportaddress(inetaddress.getbyname("www.wenbronk.com"), 9300)); // 通过setting对象指定集群配置信息, 配置的集群名 settings settings = settings.settingsbuilder().put("cluster.name", "elasticsearch_wenbronk") // 设置集群名 // .put("client.transport.sniff", true) // 开启嗅探 , 开启后会一直连接不上, 原因未知 // .put("network.host", "192.168.50.37") .put("client.transport.ignore_cluster_name", true) // 忽略集群名字验证, 打开后集群名字不对也能连接上 // .put("client.transport.nodes_sampler_interval", 5) //报错, // .put("client.transport.ping_timeout", 5) // 报错, ping等待时间, .build(); client = transportclient.builder().settings(settings).build() .addtransportaddress(new inetsockettransportaddress(new inetsocketaddress("192.168.50.37", 9300))); // 默认5s // 多久打开连接, 默认5s system.out.println("success connect"); }
ps: 官网给的2种方式都不能用, 需要合起来才能用, 浪费老子一下午...
其他参数的意义:
代码:
package com.wenbronk.javaes; import java.net.inetaddress; import java.net.inetsocketaddress; import java.util.date; import java.util.hashmap; import java.util.list; import java.util.map; import java.util.concurrent.timeunit; import org.elasticsearch.action.bulk.backoffpolicy; import org.elasticsearch.action.bulk.bulkprocessor; import org.elasticsearch.action.bulk.bulkprocessor.listener; import org.elasticsearch.action.bulk.bulkrequest; import org.elasticsearch.action.bulk.bulkrequestbuilder; import org.elasticsearch.action.bulk.bulkresponse; import org.elasticsearch.action.delete.deleterequest; import org.elasticsearch.action.delete.deleteresponse; import org.elasticsearch.action.get.getresponse; import org.elasticsearch.action.get.multigetitemresponse; import org.elasticsearch.action.get.multigetresponse; import org.elasticsearch.action.index.indexrequest; import org.elasticsearch.action.index.indexresponse; import org.elasticsearch.action.update.updaterequest; import org.elasticsearch.action.update.updateresponse; import org.elasticsearch.client.transport.transportclient; import org.elasticsearch.cluster.node.discoverynode; import org.elasticsearch.common.settings.settings; import org.elasticsearch.common.transport.inetsockettransportaddress; import org.elasticsearch.common.unit.bytesizeunit; import org.elasticsearch.common.unit.bytesizevalue; import org.elasticsearch.common.unit.timevalue; import org.elasticsearch.common.xcontent.xcontentbuilder; import org.elasticsearch.common.xcontent.xcontentfactory; import org.elasticsearch.script.script; import org.junit.before; import org.junit.test; import com.alibaba.fastjson.jsonobject; /** * 使用java api操作elasticsearch * * @author 231 * */ public class javaestest { private transportclient client; private indexrequest source; /** * 获取连接, 第一种方式 * @throws exception */ // @before public void before() throws exception { map<string, string> map = new hashmap<string, string>(); map.put("cluster.name", "elasticsearch_wenbronk"); settings.builder settings = settings.builder().put(map); client = transportclient.builder().settings(settings).build() .addtransportaddress(new inetsockettransportaddress(inetaddress.getbyname("www.wenbronk.com"), integer.parseint("9300"))); } /** * 查看集群信息 */ @test public void testinfo() { list<discoverynode> nodes = client.connectednodes(); for (discoverynode node : nodes) { system.out.println(node.gethostaddress()); } } /** * 组织json串, 方式1,直接拼接 */ public string createjson1() { string json = "{" + "\"user\":\"kimchy\"," + "\"postdate\":\"2013-01-30\"," + "\"message\":\"trying out elasticsearch\"" + "}"; return json; } /** * 使用map创建json */ public map<string, object> createjson2() { map<string,object> json = new hashmap<string, object>(); json.put("user", "kimchy"); json.put("postdate", new date()); json.put("message", "trying out elasticsearch"); return json; } /** * 使用fastjson创建 */ public jsonobject createjson3() { jsonobject json = new jsonobject(); json.put("user", "kimchy"); json.put("postdate", new date()); json.put("message", "trying out elasticsearch"); return json; } /** * 使用es的帮助类 */ public xcontentbuilder createjson4() throws exception { // 创建json对象, 其中一个创建json的方式 xcontentbuilder source = xcontentfactory.jsonbuilder() .startobject() .field("user", "kimchy") .field("postdate", new date()) .field("message", "trying to out elasticsearch") .endobject(); return source; } /** * 存入索引中 * @throws exception */ @test public void test1() throws exception { xcontentbuilder source = createjson4(); // 存json入索引中 indexresponse response = client.prepareindex("twitter", "tweet", "1").setsource(source).get(); // // 结果获取 string index = response.getindex(); string type = response.gettype(); string id = response.getid(); long version = response.getversion(); boolean created = response.iscreated(); system.out.println(index + " : " + type + ": " + id + ": " + version + ": " + created); } /** * get api 获取指定文档信息 */ @test public void testget() { // getresponse response = client.prepareget("twitter", "tweet", "1") // .get(); getresponse response = client.prepareget("twitter", "tweet", "1") .setoperationthreaded(false) // 线程安全 .get(); system.out.println(response.getsourceasstring()); } /** * 测试 delete api */ @test public void testdelete() { deleteresponse response = client.preparedelete("twitter", "tweet", "1") .get(); string index = response.getindex(); string type = response.gettype(); string id = response.getid(); long version = response.getversion(); system.out.println(index + " : " + type + ": " + id + ": " + version); } /** * 测试更新 update api * 使用 updaterequest 对象 * @throws exception */ @test public void testupdate() throws exception { updaterequest updaterequest = new updaterequest(); updaterequest.index("twitter"); updaterequest.type("tweet"); updaterequest.id("1"); updaterequest.doc(xcontentfactory.jsonbuilder() .startobject() // 对没有的字段添加, 对已有的字段替换 .field("gender", "male") .field("message", "hello") .endobject()); updateresponse response = client.update(updaterequest).get(); // 打印 string index = response.getindex(); string type = response.gettype(); string id = response.getid(); long version = response.getversion(); system.out.println(index + " : " + type + ": " + id + ": " + version); } /** * 测试update api, 使用client * @throws exception */ @test public void testupdate2() throws exception { // 使用script对象进行更新 // updateresponse response = client.prepareupdate("twitter", "tweet", "1") // .setscript(new script("hits._source.gender = \"male\"")) // .get(); // 使用xcontfactory.jsonbuilder() 进行更新 // updateresponse response = client.prepareupdate("twitter", "tweet", "1") // .setdoc(xcontentfactory.jsonbuilder() // .startobject() // .field("gender", "malelelele") // .endobject()).get(); // 使用updaterequest对象及script // updaterequest updaterequest = new updaterequest("twitter", "tweet", "1") // .script(new script("ctx._source.gender=\"male\"")); // updateresponse response = client.update(updaterequest).get(); // 使用updaterequest对象及documents进行更新 updateresponse response = client.update(new updaterequest("twitter", "tweet", "1") .doc(xcontentfactory.jsonbuilder() .startobject() .field("gender", "male") .endobject() )).get(); system.out.println(response.getindex()); } /** * 测试update * 使用updaterequest * @throws exception * @throws interruptedexception */ @test public void testupdate3() throws interruptedexception, exception { updaterequest updaterequest = new updaterequest("twitter", "tweet", "1") .script(new script("ctx._source.gender=\"male\"")); updateresponse response = client.update(updaterequest).get(); } /** * 测试upsert方法 * @throws exception * */ @test public void testupsert() throws exception { // 设置查询条件, 查找不到则添加生效 indexrequest indexrequest = new indexrequest("twitter", "tweet", "2") .source(xcontentfactory.jsonbuilder() .startobject() .field("name", "214") .field("gender", "gfrerq") .endobject()); // 设置更新, 查找到更新下面的设置 updaterequest upsert = new updaterequest("twitter", "tweet", "2") .doc(xcontentfactory.jsonbuilder() .startobject() .field("user", "wenbronk") .endobject()) .upsert(indexrequest); client.update(upsert).get(); } /** * 测试multi get api * 从不同的index, type, 和id中获取 */ @test public void testmultiget() { multigetresponse multigetresponse = client.preparemultiget() .add("twitter", "tweet", "1") .add("twitter", "tweet", "2", "3", "4") .add("anothoer", "type", "foo") .get(); for (multigetitemresponse itemresponse : multigetresponse) { getresponse response = itemresponse.getresponse(); if (response.isexists()) { string sourceasstring = response.getsourceasstring(); system.out.println(sourceasstring); } } } /** * bulk 批量执行 * 一次查询可以update 或 delete多个document */ @test public void testbulk() throws exception { bulkrequestbuilder bulkrequest = client.preparebulk(); bulkrequest.add(client.prepareindex("twitter", "tweet", "1") .setsource(xcontentfactory.jsonbuilder() .startobject() .field("user", "kimchy") .field("postdate", new date()) .field("message", "trying out elasticsearch") .endobject())); bulkrequest.add(client.prepareindex("twitter", "tweet", "2") .setsource(xcontentfactory.jsonbuilder() .startobject() .field("user", "kimchy") .field("postdate", new date()) .field("message", "another post") .endobject())); bulkresponse response = bulkrequest.get(); system.out.println(response.getheaders()); } /** * 使用bulk processor * @throws exception */ @test public void testbulkprocessor() throws exception { // 创建bulkporcessor对象 bulkprocessor bulkprocessor = bulkprocessor.builder(client, new listener() { public void beforebulk(long paramlong, bulkrequest parambulkrequest) { // todo auto-generated method stub } // 执行出错时执行 public void afterbulk(long paramlong, bulkrequest parambulkrequest, throwable paramthrowable) { // todo auto-generated method stub } public void afterbulk(long paramlong, bulkrequest parambulkrequest, bulkresponse parambulkresponse) { // todo auto-generated method stub } }) // 1w次请求执行一次bulk .setbulkactions(10000) // 1gb的数据刷新一次bulk .setbulksize(new bytesizevalue(1, bytesizeunit.gb)) // 固定5s必须刷新一次 .setflushinterval(timevalue.timevalueseconds(5)) // 并发请求数量, 0不并发, 1并发允许执行 .setconcurrentrequests(1) // 设置退避, 100ms后执行, 最大请求3次 .setbackoffpolicy( backoffpolicy.exponentialbackoff(timevalue.timevaluemillis(100), 3)) .build(); // 添加单次请求 bulkprocessor.add(new indexrequest("twitter", "tweet", "1")); bulkprocessor.add(new deleterequest("twitter", "tweet", "2")); // 关闭 bulkprocessor.awaitclose(10, timeunit.minutes); // 或者 bulkprocessor.close(); } }
tes2代码:
package com.wenbronk.javaes; import java.net.inetsocketaddress; import org.apache.lucene.queryparser.xml.filterbuilderfactory; import org.elasticsearch.action.search.multisearchresponse; import org.elasticsearch.action.search.searchrequestbuilder; import org.elasticsearch.action.search.searchresponse; import org.elasticsearch.action.search.searchtype; import org.elasticsearch.client.transport.transportclient; import org.elasticsearch.common.settings.settings; import org.elasticsearch.common.settings.settings.builder; import org.elasticsearch.common.transport.inetsockettransportaddress; import org.elasticsearch.common.unit.timevalue; import org.elasticsearch.index.query.querybuilder; import org.elasticsearch.index.query.querybuilders; import org.elasticsearch.search.searchhit; import org.elasticsearch.search.aggregations.aggregation; import org.elasticsearch.search.aggregations.aggregationbuilders; import org.elasticsearch.search.aggregations.bucket.histogram.datehistograminterval; import org.elasticsearch.search.sort.sortorder; import org.elasticsearch.search.sort.sortparseelement; import org.junit.before; import org.junit.test; /** * 使用java api操作elasticsearch * search api * @author 231 * */ public class javaestest2 { private transportclient client; /** * 获取client对象 */ @before public void testbefore() { builder builder = settings.settingsbuilder(); builder.put("cluster.name", "wenbronk_escluster"); // .put("client.transport.ignore_cluster_name", true); settings settings = builder.build(); org.elasticsearch.client.transport.transportclient.builder transportbuild = transportclient.builder(); transportclient client1 = transportbuild.settings(settings).build(); client = client1.addtransportaddress((new inetsockettransportaddress(new inetsocketaddress("192.168.50.37", 9300)))); system.out.println("success connect to escluster"); } /** * 测试查询 */ @test public void testsearch() { // searchrequestbuilder searchrequestbuilder = client.preparesearch("twitter", "tweet", "1"); // searchresponse response = searchrequestbuilder.settypes("type1", "type2") // .setsearchtype(searchtype.dfs_query_then_fetch) // .setquery(querybuilders.termquery("user", "test")) // .setpostfilter(querybuilders.rangequery("age").from(0).to(1)) // .setfrom(0).setsize(2).setexplain(true) // .execute().actionget(); searchresponse response = client.preparesearch() .execute().actionget(); // searchhits hits = response.gethits(); // for (searchhit searchhit : hits) { // for(iterator<searchhitfield> iterator = searchhit.iterator(); iterator.hasnext(); ) { // searchhitfield next = iterator.next(); // system.out.println(next.getvalues()); // } // } system.out.println(response); } /** * 测试scroll api * 对大量数据的处理更有效 */ @test public void testscrolls() { querybuilder querybuilder = querybuilders.termquery("twitter", "tweet"); searchresponse response = client.preparesearch("twitter") .addsort(sortparseelement.doc_field_name, sortorder.asc) .setscroll(new timevalue(60000)) .setquery(querybuilder) .setsize(100).execute().actionget(); while(true) { for (searchhit hit : response.gethits().gethits()) { system.out.println("i am coming"); } searchresponse response2 = client.preparesearchscroll(response.getscrollid()) .setscroll(new timevalue(60000)).execute().actionget(); if (response2.gethits().gethits().length == 0) { system.out.println("oh no====="); break; } } } /** * 测试multisearch */ @test public void testmultisearch() { querybuilder qb1 = querybuilders.querystringquery("elasticsearch"); searchrequestbuilder requestbuilder1 = client.preparesearch().setquery(qb1).setsize(1); querybuilder qb2 = querybuilders.matchquery("user", "kimchy"); searchrequestbuilder requestbuilder2 = client.preparesearch().setquery(qb2).setsize(1); multisearchresponse multiresponse = client.preparemultisearch().add(requestbuilder1).add(requestbuilder2) .execute().actionget(); long nbhits = 0; for (multisearchresponse.item item : multiresponse.getresponses()) { searchresponse response = item.getresponse(); nbhits = response.gethits().gettotalhits(); searchhit[] hits = response.gethits().gethits(); system.out.println(nbhits); } } /** * 测试聚合查询 */ @test public void testaggregation() { searchresponse response = client.preparesearch() .setquery(querybuilders.matchallquery()) // 先使用query过滤掉一部分 .addaggregation(aggregationbuilders.terms("term").field("user")) .addaggregation(aggregationbuilders.datehistogram("agg2").field("birth") .interval(datehistograminterval.year)) .execute().actionget(); aggregation aggregation2 = response.getaggregations().get("term"); aggregation aggregation = response.getaggregations().get("agg2"); // searchresponse response2 = client.search(new searchrequest().searchtype(searchtype.query_and_fetch)).actionget(); } /** * 测试terminate */ @test public void testterminateafter() { searchresponse response = client.preparesearch("twitter").setterminateafter(1000).get(); if (response.isterminatedearly()) { system.out.println("ternimate"); } } /** * 过滤查询: 大于gt, 小于lt, 小于等于lte, 大于等于gte */ @test public void testfilter() { searchresponse response = client.preparesearch("twitter") .settypes("") .setquery(querybuilders.matchallquery()) //查询所有 .setsearchtype(searchtype.query_then_fetch) // .setpostfilter(filterbuilders.rangefilter("age").from(0).to(19) // .includelower(true).includeupper(true)) // .setpostfilter(filterbuilderfactory .rangefilter("age").gte(18).lte(22)) .setexplain(true) //explain为true表示根据数据相关度排序,和关键字匹配最高的排在前面 .get(); } /** * 分组查询 */ @test public void testgroupby() { client.preparesearch("twitter").settypes("tweet") .setquery(querybuilders.matchallquery()) .setsearchtype(searchtype.query_then_fetch) .addaggregation(aggregationbuilders.terms("user") .field("user").size(0) // 根据user进行分组 // size(0) 也是10 ).get(); } }
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。
推荐阅读
-
使用java操作elasticsearch的具体方法
-
浅谈java Properties类的使用基础
-
Java实现基于JDBC操作mysql数据库的方法
-
在ASP.NET 2.0中操作数据之三十三:基于DataList和Repeater使用DropDownList过滤的主/从报表
-
Java中Date,Calendar,Timestamp的区别以及相互转换与使用
-
java在linux系统下开机启动无法使用sudo命令的原因及解决办法
-
使用itextpdf操作pdf的实例讲解
-
Java获取当前操作系统的信息实例代码
-
Java Socket使用加密协议进行传输对象的方法
-
Java使用Socket通信传输文件的方法示例