
How to operate Elasticsearch from Java


System environment: CentOS 7.2 running in VMware 12

Installed version: elasticsearch-2.4.0.tar.gz

Steps for working with an ES cluster from Java: 1. configure the cluster settings; 2. create a client; 3. view the cluster information.

1. Cluster name

The default cluster name is elasticsearch. If the actual cluster name and the one you configure do not match, you will get an error when the client tries to use the cluster's nodes.

2. Sniffing

Sniffing is enabled with the client.transport.sniff setting. With it on, you only need to point the client at one node in the cluster (not necessarily the master node); the client then discovers and adds the other nodes, so as long as the program keeps running it can still reach the rest of the cluster even if that first node goes down.
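
For reference, here is a minimal sketch of what turning sniffing on looks like when building the 2.x TransportClient. The cluster name and address are just the example values used in this article, and whether sniffing actually works depends on the network setup (the full code below ends up leaving it commented out):

 @Before
 public void beforeWithSniff() throws Exception {
     Settings settings = Settings.settingsBuilder()
             .put("cluster.name", "elasticsearch_wenbronk")
             .put("client.transport.sniff", true) // let the client discover the remaining cluster nodes on its own
             .build();
     client = TransportClient.builder().settings(settings).build()
             .addTransportAddress(new InetSocketTransportAddress(new InetSocketAddress("192.168.50.37", 9300)));
 }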

3. Search type SearchType.QUERY_THEN_FETCH

ES has four search types:

QUERY_AND_FETCH:

The coordinating node sends the query to all shards. Each shard scores and sorts its own results using its local statistics (term frequency / document frequency) and returns the full documents to the coordinating node, which merges and re-sorts everything and returns it to the client. This needs only one round trip to ES.

This approach has two problems: data volume and ranking. The coordinating node has to aggregate the full results from every shard, so the amount of data transferred can be large, and the scoring statistics differ from shard to shard, so the global ranking may be inconsistent.

QUERY_THEN_FETCH:

The coordinating node sends the query to all shards. Each shard scores and sorts locally but returns only the document ids and scores. The coordinating node merges and sorts these, then fetches the actual documents from the relevant shards by id and returns them to the client. This needs two round trips to ES.

This solves the data-volume problem, but the ranking problem remains. It is ES's default search type.

DFS_QUERY_AND_FETCH and DFS_QUERY_THEN_FETCH:

These first gather the scoring statistics from all shards and score with the unified statistics, which solves the ranking problem. DFS_QUERY_AND_FETCH still has the data-volume problem; DFS_QUERY_THEN_FETCH solves both problems but is the least efficient of the four.
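
As a quick illustration, here is a minimal sketch of explicitly choosing a search type on a request (the index and field names are placeholders; if you set nothing, QUERY_THEN_FETCH is used):

 SearchResponse response = client.prepareSearch("twitter")
         .setSearchType(SearchType.DFS_QUERY_THEN_FETCH) // unify term/document statistics across shards before scoring
         .setQuery(QueryBuilders.termQuery("user", "kimchy"))
         .get();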

1. Obtaining a client (two ways)

@Before
public void before() throws Exception {
    Map<String, String> map = new HashMap<String, String>();
    map.put("cluster.name", "elasticsearch_wenbronk");
    Settings.Builder settings = Settings.settingsBuilder().put(map);
    client = TransportClient.builder().settings(settings).build()
            .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("www.wenbronk.com"), Integer.parseInt("9300")));
}

@Before
public void before11() throws Exception {
    // create a client with the default cluster name, "elasticsearch"
//  client = TransportClient.builder().build()
//          .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("www.wenbronk.com"), 9300));

    // specify the cluster configuration through a Settings object
    Settings settings = Settings.settingsBuilder().put("cluster.name", "elasticsearch_wenbronk") // set the cluster name
//          .put("client.transport.sniff", true) // enable sniffing; with it on the client never managed to connect, reason unknown
//          .put("network.host", "192.168.50.37")
            .put("client.transport.ignore_cluster_name", true) // skip cluster-name validation; connects even if the name is wrong
//          .put("client.transport.nodes_sampler_interval", 5) // threw an error
//          .put("client.transport.ping_timeout", 5) // threw an error; how long to wait for a ping
            .build();
    client = TransportClient.builder().settings(settings).build()
            .addTransportAddress(new InetSocketTransportAddress(new InetSocketAddress("192.168.50.37", 9300)));
    // both the sampler interval and the ping timeout default to 5s
    System.out.println("success connect");
}

PS: Neither of the two approaches given in the official docs works on its own; they have to be combined before the client connects. That cost me a whole afternoon...

Meaning of the other parameters: (shown as a screenshot in the original post)

The code:

package com.wenbronk.javaes;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkProcessor.Listener;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.get.MultiGetItemResponse;
import org.elasticsearch.action.get.MultiGetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.script.Script;
import org.junit.Before;
import org.junit.Test;

import com.alibaba.fastjson.JSONObject;

/**
 * Operating Elasticsearch with the Java API
 * 
 * @author 231
 *
 */
public class JavaESTest {

    private TransportClient client;
    private IndexRequest source;

    /**
     * Obtain a connection, first approach
     * @throws Exception
     */
//  @Before
    public void before() throws Exception {
        Map<String, String> map = new HashMap<String, String>();
        map.put("cluster.name", "elasticsearch_wenbronk");
        Settings.Builder settings = Settings.settingsBuilder().put(map);
        client = TransportClient.builder().settings(settings).build()
                .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("www.wenbronk.com"), Integer.parseInt("9300")));
    }

    /**
     * View cluster information
     */
    @Test
    public void testInfo() {
        List<DiscoveryNode> nodes = client.connectedNodes();
        for (DiscoveryNode node : nodes) {
            System.out.println(node.getHostAddress());
        }
    }

    /**
     * Build the JSON document, way 1: plain string concatenation
     */
    public String createJson1() {
        String json = "{" +
                "\"user\":\"kimchy\"," +
                "\"postDate\":\"2013-01-30\"," +
                "\"message\":\"trying out elasticsearch\"" +
            "}";
        return json;
    }

    /**
     * Build the JSON document with a Map
     */
    public Map<String, Object> createJson2() {
        Map<String, Object> json = new HashMap<String, Object>();
        json.put("user", "kimchy");
        json.put("postDate", new Date());
        json.put("message", "trying out elasticsearch");
        return json;
    }

    /**
     * Build the JSON document with fastjson
     */
    public JSONObject createJson3() {
        JSONObject json = new JSONObject();
        json.put("user", "kimchy");
        json.put("postDate", new Date());
        json.put("message", "trying out elasticsearch");
        return json;
    }

    /**
     * Build the JSON document with the ES helper class
     */
    public XContentBuilder createJson4() throws Exception {
        // one of the ways to build a JSON document
        XContentBuilder source = XContentFactory.jsonBuilder()
                .startObject()
                    .field("user", "kimchy")
                    .field("postDate", new Date())
                    .field("message", "trying out elasticsearch")
                .endObject();
        return source;
    }
 
    /**
     * Index a document
     * @throws Exception
     */
    @Test
    public void test1() throws Exception {
        XContentBuilder source = createJson4();
        // store the JSON document in the index
        IndexResponse response = client.prepareIndex("twitter", "tweet", "1").setSource(source).get();
        // read back the result
        String index = response.getIndex();
        String type = response.getType();
        String id = response.getId();
        long version = response.getVersion();
        boolean created = response.isCreated();
        System.out.println(index + " : " + type + ": " + id + ": " + version + ": " + created);
    }

    /**
     * Get API: fetch a specific document
     */
    @Test
    public void testGet() {
//      GetResponse response = client.prepareGet("twitter", "tweet", "1")
//              .get();
        GetResponse response = client.prepareGet("twitter", "tweet", "1")
                .setOperationThreaded(false) // run the operation on the calling thread instead of a separate thread
                .get();
        System.out.println(response.getSourceAsString());
    }

    /**
     * Delete API
     */
    @Test
    public void testDelete() {
        DeleteResponse response = client.prepareDelete("twitter", "tweet", "1")
                .get();
        String index = response.getIndex();
        String type = response.getType();
        String id = response.getId();
        long version = response.getVersion();
        System.out.println(index + " : " + type + ": " + id + ": " + version);
    }
 
    /**
     * Update API using an UpdateRequest object
     * @throws Exception
     */
    @Test
    public void testUpdate() throws Exception {
        UpdateRequest updateRequest = new UpdateRequest();
        updateRequest.index("twitter");
        updateRequest.type("tweet");
        updateRequest.id("1");
        updateRequest.doc(XContentFactory.jsonBuilder()
                .startObject()
                // fields that do not exist yet are added, existing fields are replaced
                    .field("gender", "male")
                    .field("message", "hello")
                .endObject());
        UpdateResponse response = client.update(updateRequest).get();

        // print the result
        String index = response.getIndex();
        String type = response.getType();
        String id = response.getId();
        long version = response.getVersion();
        System.out.println(index + " : " + type + ": " + id + ": " + version);
    }

    /**
     * Update API, using the client directly
     * @throws Exception
     */
    @Test
    public void testUpdate2() throws Exception {
        // update with a Script object
//      UpdateResponse response = client.prepareUpdate("twitter", "tweet", "1")
//              .setScript(new Script("ctx._source.gender = \"male\""))
//              .get();

        // update with XContentFactory.jsonBuilder()
//      UpdateResponse response = client.prepareUpdate("twitter", "tweet", "1")
//              .setDoc(XContentFactory.jsonBuilder()
//                      .startObject()
//                          .field("gender", "malelelele")
//                      .endObject()).get();

        // update with an UpdateRequest and a Script
//      UpdateRequest updateRequest = new UpdateRequest("twitter", "tweet", "1")
//              .script(new Script("ctx._source.gender=\"male\""));
//      UpdateResponse response = client.update(updateRequest).get();

        // update with an UpdateRequest and a partial document
        UpdateResponse response = client.update(new UpdateRequest("twitter", "tweet", "1")
                .doc(XContentFactory.jsonBuilder()
                        .startObject()
                            .field("gender", "male")
                        .endObject()
                )).get();
        System.out.println(response.getIndex());
    }

    /**
     * Update using an UpdateRequest with a script
     * @throws Exception
     * @throws InterruptedException
     */
    @Test
    public void testUpdate3() throws InterruptedException, Exception {
        UpdateRequest updateRequest = new UpdateRequest("twitter", "tweet", "1")
                .script(new Script("ctx._source.gender=\"male\""));
        UpdateResponse response = client.update(updateRequest).get();
    }

    /**
     * Upsert: index the document if it does not exist, otherwise apply the update
     * @throws Exception
     *
     */
    @Test
    public void testUpsert() throws Exception {
        // takes effect when the document is not found: it gets indexed as-is
        IndexRequest indexRequest = new IndexRequest("twitter", "tweet", "2")
                .source(XContentFactory.jsonBuilder()
                        .startObject()
                            .field("name", "214")
                            .field("gender", "gfrerq")
                        .endObject());
        // takes effect when the document is found: the update below is applied
        UpdateRequest upsert = new UpdateRequest("twitter", "tweet", "2")
                .doc(XContentFactory.jsonBuilder()
                        .startObject()
                            .field("user", "wenbronk")
                        .endObject())
                .upsert(indexRequest);

        client.update(upsert).get();
    }
 
    /**
     * Multi Get API
     * Fetch documents from different indices, types, and ids in one request
     */
    @Test
    public void testMultiGet() {
        MultiGetResponse multiGetResponse = client.prepareMultiGet()
                .add("twitter", "tweet", "1")
                .add("twitter", "tweet", "2", "3", "4")
                .add("another", "type", "foo")
                .get();

        for (MultiGetItemResponse itemResponse : multiGetResponse) {
            GetResponse response = itemResponse.getResponse();
            if (response.isExists()) {
                String sourceAsString = response.getSourceAsString();
                System.out.println(sourceAsString);
            }
        }
    }

    /**
     * Bulk API
     * Index, update, or delete multiple documents in one request
     */
    @Test
    public void testBulk() throws Exception {
        BulkRequestBuilder bulkRequest = client.prepareBulk();
        bulkRequest.add(client.prepareIndex("twitter", "tweet", "1")
                .setSource(XContentFactory.jsonBuilder()
                        .startObject()
                            .field("user", "kimchy")
                            .field("postDate", new Date())
                            .field("message", "trying out elasticsearch")
                        .endObject()));
        bulkRequest.add(client.prepareIndex("twitter", "tweet", "2")
                .setSource(XContentFactory.jsonBuilder()
                        .startObject()
                            .field("user", "kimchy")
                            .field("postDate", new Date())
                            .field("message", "another post")
                        .endObject()));
        BulkResponse response = bulkRequest.get();
        System.out.println(response.getHeaders());
    }
 
    /**
     * Using a BulkProcessor
     * @throws Exception
     */
    @Test
    public void testBulkProcessor() throws Exception {
        // create the BulkProcessor
        BulkProcessor bulkProcessor = BulkProcessor.builder(client, new Listener() {
            public void beforeBulk(long executionId, BulkRequest request) {
                // called before each bulk request is executed
            }

            // called when a bulk request fails with an exception
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
            }

            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                // called after each bulk request completes
            }
        })
        // execute a bulk every 10000 requests
        .setBulkActions(10000)
        // flush the bulk once it reaches 1GB
        .setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))
        // flush at least every 5 seconds regardless of size
        .setFlushInterval(TimeValue.timeValueSeconds(5))
        // number of concurrent requests: 0 means fully synchronous, 1 allows one bulk to execute while the next accumulates
        .setConcurrentRequests(1)
        // backoff policy: wait 100ms before retrying, at most 3 retries
        .setBackoffPolicy(
                BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
        .build();

        // add individual requests (the index request needs a source to be valid)
        bulkProcessor.add(new IndexRequest("twitter", "tweet", "1").source(createJson4()));
        bulkProcessor.add(new DeleteRequest("twitter", "tweet", "2"));

        // close it
        bulkProcessor.awaitClose(10, TimeUnit.MINUTES);
        // or
        bulkProcessor.close();
    }
}

Code for the second test class:

package com.wenbronk.javaes;

import java.net.InetSocketAddress;
import org.apache.lucene.queryparser.xml.FilterBuilderFactory;
import org.elasticsearch.action.search.MultiSearchResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.Settings.Builder;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.search.sort.SortParseElement;
import org.junit.Before;
import org.junit.Test;
/**
 * Operating Elasticsearch with the Java API
 * Search API
 * @author 231
 *
 */
public class JavaESTest2 {

    private TransportClient client;

    /**
     * Obtain the client
     */
    @Before
    public void testBefore() {
        Builder builder = Settings.settingsBuilder();
        builder.put("cluster.name", "wenbronk_escluster");
//              .put("client.transport.ignore_cluster_name", true);
        Settings settings = builder.build();

        TransportClient.Builder transportBuild = TransportClient.builder();
        TransportClient client1 = transportBuild.settings(settings).build();
        client = client1.addTransportAddress(new InetSocketTransportAddress(new InetSocketAddress("192.168.50.37", 9300)));
        System.out.println("success connect to escluster");
    }
 
    /**
     * Basic search
     */
    @Test
    public void testSearch() {
//      SearchRequestBuilder searchRequestBuilder = client.prepareSearch("twitter", "tweet", "1");
//      SearchResponse response = searchRequestBuilder.setTypes("type1", "type2")
//              .setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
//              .setQuery(QueryBuilders.termQuery("user", "test"))
//              .setPostFilter(QueryBuilders.rangeQuery("age").from(0).to(1))
//              .setFrom(0).setSize(2).setExplain(true)
//              .execute().actionGet();
        SearchResponse response = client.prepareSearch()
                .execute().actionGet();
//      SearchHits hits = response.getHits();
//      for (SearchHit searchHit : hits) {
//          for (Iterator<SearchHitField> iterator = searchHit.iterator(); iterator.hasNext(); ) {
//              SearchHitField next = iterator.next();
//              System.out.println(next.getValues());
//          }
//      }
        System.out.println(response);
    }

    /**
     * Scroll API
     * More efficient for working through large result sets
     */
    @Test
    public void testScrolls() {
        QueryBuilder queryBuilder = QueryBuilders.termQuery("twitter", "tweet");

        SearchResponse response = client.prepareSearch("twitter")
                .addSort(SortParseElement.DOC_FIELD_NAME, SortOrder.ASC)
                .setScroll(new TimeValue(60000))
                .setQuery(queryBuilder)
                .setSize(100).execute().actionGet();

        while (true) {
            for (SearchHit hit : response.getHits().getHits()) {
                System.out.println("i am coming");
            }
            // fetch the next batch of the scroll, reusing the scroll id from the previous response
            response = client.prepareSearchScroll(response.getScrollId())
                    .setScroll(new TimeValue(60000)).execute().actionGet();
            if (response.getHits().getHits().length == 0) {
                System.out.println("oh no=====");
                break;
            }
        }
    }
 
    /**
     * MultiSearch API
     */
    @Test
    public void testMultiSearch() {
        QueryBuilder qb1 = QueryBuilders.queryStringQuery("elasticsearch");
        SearchRequestBuilder requestBuilder1 = client.prepareSearch().setQuery(qb1).setSize(1);

        QueryBuilder qb2 = QueryBuilders.matchQuery("user", "kimchy");
        SearchRequestBuilder requestBuilder2 = client.prepareSearch().setQuery(qb2).setSize(1);

        MultiSearchResponse multiResponse = client.prepareMultiSearch().add(requestBuilder1).add(requestBuilder2)
                .execute().actionGet();
        long nbHits = 0;
        for (MultiSearchResponse.Item item : multiResponse.getResponses()) {
            SearchResponse response = item.getResponse();
            nbHits = response.getHits().getTotalHits();
            SearchHit[] hits = response.getHits().getHits();
            System.out.println(nbHits);
        }
    }
 
    /**
     * Aggregation queries
     */
    @Test
    public void testAggregation() {
        SearchResponse response = client.prepareSearch()
                .setQuery(QueryBuilders.matchAllQuery()) // narrow the documents down with a query first
                .addAggregation(AggregationBuilders.terms("term").field("user"))
                .addAggregation(AggregationBuilders.dateHistogram("agg2").field("birth")
                        .interval(DateHistogramInterval.YEAR))
                .execute().actionGet();
        Aggregation aggregation2 = response.getAggregations().get("term");
        Aggregation aggregation = response.getAggregations().get("agg2");
//      SearchResponse response2 = client.search(new SearchRequest().searchType(SearchType.QUERY_AND_FETCH)).actionGet();
    }

    /**
     * terminateAfter: stop collecting hits after a given count
     */
    @Test
    public void testTerminateAfter() {
        SearchResponse response = client.prepareSearch("twitter").setTerminateAfter(1000).get();
        if (response.isTerminatedEarly()) {
            System.out.println("ternimate");
        }
    }
 
    /**
     * Range filtering: greater than gt, less than lt, less than or equal lte, greater than or equal gte
     */
    @Test
    public void testFilter() {
        SearchResponse response = client.prepareSearch("twitter")
                .setTypes("")
                .setQuery(QueryBuilders.matchAllQuery()) // match all documents
                .setSearchType(SearchType.QUERY_THEN_FETCH)
//              .setPostFilter(FilterBuilders.rangeFilter("age").from(0).to(19)
//                      .includeLower(true).includeUpper(true))
//              .setPostFilter(FilterBuilderFactory.rangeFilter("age").gte(18).lte(22))
                .setExplain(true) // setExplain(true) asks ES to return an explanation of how each hit's score was computed
                .get();
    }

    /**
     * Group-by style query (terms aggregation)
     */
    @Test
    public void testGroupBy() {
        client.prepareSearch("twitter").setTypes("tweet")
                .setQuery(QueryBuilders.matchAllQuery())
                .setSearchType(SearchType.QUERY_THEN_FETCH)
                .addAggregation(AggregationBuilders.terms("user")
                        .field("user").size(0) // group by the user field; size(0) returns all buckets instead of the default 10
                ).get();
    }
}

That's all for this article. I hope it helps with your study, and thanks for your continued support.
