Elasticsearch实战——JAVA客户端API
程序员文章站
2022-03-05 10:00:26
...
Elasticsearch实战——JAVA客户端API
1. 创建spring-boot项目
1.1 pom.xml配置
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.0.RELEASE</version>
        <relativePath /> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.elasticsearch.java.api</groupId>
    <artifactId>elasticsearch-java-api</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>es</name>
    <description>elasticsearch的java api使用</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <!-- Web starter; the Elasticsearch/Lucene artifacts are excluded here so that
             the explicitly versioned client dependencies below are the ones used. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.elasticsearch</groupId>
                    <artifactId>elasticsearch</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.lucene</groupId>
                    <artifactId>lucene-core</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- High-level REST client and the matching Elasticsearch core artifact. -->
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>7.5.0</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>7.5.0</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
1.2 启动类配置
/**
 * Spring Boot entry point for the sample application.
 *
 * <p>Spring Boot's own Elasticsearch auto-configuration is excluded so that it
 * is switched off for this application.
 */
@SpringBootApplication(exclude = {ElasticsearchAutoConfiguration.class})
public class RunApplication {
    public static void main(String[] args) throws IOException {
        SpringApplication.run(RunApplication.class, args);
    }
}
1.3 properties配置
#elasticsearch配置
elasticsearch.node[0].host=127.0.0.1
elasticsearch.node[0].port=9200
1.4 ElasticSearchProperties.java配置
/**
 * Connection settings bound from properties under the {@code elasticsearch} prefix.
 *
 * <p>Each field has a plain getter/setter pair; the numeric fields carry defaults
 * that apply when the corresponding property is not set.
 */
@Configuration
@ConfigurationProperties(prefix = "elasticsearch")
public class ElasticSearchProperties {

    /** Cluster nodes (host/port pairs), bound from {@code elasticsearch.node[i].*}. */
    private List<ESNode> node;

    /** Connect timeout; presumably milliseconds, as it is passed to the HTTP request config. */
    private int connectTimeOut = 1000;

    /** Socket timeout; presumably milliseconds. */
    private int socketTimeOut = 30000;

    /** Timeout for obtaining a connection from the pool; presumably milliseconds. */
    private int connectionRequestTimeOut = 500;

    /** Maximum total number of connections. */
    private int maxConnectNum = 100;

    /** Maximum number of connections per route. */
    private int maxConnectPreRoute = 100;

    public List<ESNode> getNode() {
        return this.node;
    }

    public void setNode(List<ESNode> node) {
        this.node = node;
    }

    public int getConnectTimeOut() {
        return this.connectTimeOut;
    }

    public void setConnectTimeOut(int connectTimeOut) {
        this.connectTimeOut = connectTimeOut;
    }

    public int getSocketTimeOut() {
        return this.socketTimeOut;
    }

    public void setSocketTimeOut(int socketTimeOut) {
        this.socketTimeOut = socketTimeOut;
    }

    public int getConnectionRequestTimeOut() {
        return this.connectionRequestTimeOut;
    }

    public void setConnectionRequestTimeOut(int connectionRequestTimeOut) {
        this.connectionRequestTimeOut = connectionRequestTimeOut;
    }

    public int getMaxConnectNum() {
        return this.maxConnectNum;
    }

    public void setMaxConnectNum(int maxConnectNum) {
        this.maxConnectNum = maxConnectNum;
    }

    public int getMaxConnectPreRoute() {
        return this.maxConnectPreRoute;
    }

    public void setMaxConnectPreRoute(int maxConnectPreRoute) {
        this.maxConnectPreRoute = maxConnectPreRoute;
    }
}
/**
 * One Elasticsearch node address: a host name (or IP) and a port.
 */
public class ESNode {

    /** Port of the node; {@code 0} until set. */
    private int port;

    /** Host name or IP of the node; {@code null} until set. */
    private String host;

    /** @return the configured port */
    public int getPort() {
        return this.port;
    }

    /** @param port port to use for this node */
    public void setPort(int port) {
        this.port = port;
    }

    /** @return the configured host, or {@code null} if none was set */
    public String getHost() {
        return this.host;
    }

    /** @param host host name or IP to use for this node */
    public void setHost(String host) {
        this.host = host;
    }
}
1.5 ElasticSearchConfiguration.java配置
import org.apache.http.HttpHost;
import org.apache.http.client.config.RequestConfig.Builder;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestClientBuilder.HttpClientConfigCallback;
import org.elasticsearch.client.RestClientBuilder.RequestConfigCallback;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Spring configuration that builds the {@link RestHighLevelClient} bean from
 * {@link ElasticSearchProperties}.
 */
@Configuration
public class ElasticSearchConfiguration {

    final ElasticSearchProperties properties;

    @Autowired
    public ElasticSearchConfiguration(ElasticSearchProperties properties) {
        this.properties = properties;
    }

    /**
     * Creates the client bean (registered under the name {@code client}).
     *
     * @return a client pointing at every configured node, with the timeout and
     *         connection-pool settings from the properties applied
     * @throws IllegalStateException if no node is configured
     */
    @Bean("client")
    public RestHighLevelClient client() {
        // Fail with a readable message instead of a bare NullPointerException
        // when the elasticsearch.node[...] properties are missing.
        if (properties.getNode() == null || properties.getNode().isEmpty()) {
            throw new IllegalStateException(
                    "No Elasticsearch node configured: set elasticsearch.node[0].host / elasticsearch.node[0].port");
        }
        // Built with a stream so the block does not depend on List/ArrayList imports.
        HttpHost[] hosts = properties.getNode().stream()
                .map(n -> new HttpHost(n.getHost(), n.getPort()))
                .toArray(HttpHost[]::new);

        RestClientBuilder builder = RestClient.builder(hosts);
        setConnectTimeOutConfig(builder);
        setMutiConnectConfig(builder);
        return new RestHighLevelClient(builder);
    }

    /**
     * Applies the connect, socket and connection-request timeouts to the
     * underlying async HTTP client's request configuration.
     *
     * @param builder builder to customize
     */
    private void setConnectTimeOutConfig(RestClientBuilder builder) {
        builder.setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder
                .setConnectTimeout(properties.getConnectTimeOut())
                .setSocketTimeout(properties.getSocketTimeOut())
                .setConnectionRequestTimeout(properties.getConnectionRequestTimeOut()));
    }

    /**
     * Applies the connection-pool limits to the underlying async HTTP client.
     *
     * @param builder builder to customize
     */
    private void setMutiConnectConfig(RestClientBuilder builder) {
        builder.setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder
                // maximum total number of connections
                .setMaxConnTotal(properties.getMaxConnectNum())
                // maximum number of connections per route
                .setMaxConnPerRoute(properties.getMaxConnectPreRoute()));
    }
}
2. JAVA API操作
/**
 * Simple carrier for a document: its id plus the document body as a string.
 */
public class Source {

    /** Document body; {@code null} until set. */
    private String source;

    /** Document id; {@code null} until set. */
    private String id;

    /** @return the document body, or {@code null} if none was set */
    public String getSource() {
        return this.source;
    }

    /** @param source document body to store */
    public void setSource(String source) {
        this.source = source;
    }

    /** @return the document id, or {@code null} if none was set */
    public String getId() {
        return this.id;
    }

    /** @param id document id to store */
    public void setId(String id) {
        this.id = id;
    }
}
2.1 引用的java类
import org.apache.commons.lang.StringUtils;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.elasticsearch.action.admin.indices.shrink.ResizeRequest;
import org.elasticsearch.action.admin.indices.shrink.ResizeResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.MultiGetItemResponse;
import org.elasticsearch.action.get.MultiGetRequest;
import org.elasticsearch.action.get.MultiGetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.PutMappingRequest;
import org.elasticsearch.cluster.routing.Preference;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
2.2 索引管理
2.2.1 创建索引
/**
 * Creates an index, optionally with settings, a mapping and aliases.
 *
 * @param index     index name; must not be blank
 * @param settings  index settings as a JSON string; skipped when blank
 * @param mapping   mapping definition as a JSON string; skipped when blank
 * @param aliasName optional alias names; {@code null} or empty entries are ignored
 * @return {@code true} if the request was acknowledged; {@code false} if it was
 *         not, or if an {@link IOException} occurred (its stack trace is printed)
 * @throws IllegalArgumentException if {@code index} is blank
 */
public boolean createIndex(String index, String settings, String mapping, String... aliasName) {
    // Explicit check instead of `assert`: assertions are disabled unless the JVM runs with -ea.
    if (StringUtils.isBlank(index)) {
        throw new IllegalArgumentException("index must not be blank");
    }
    CreateIndexRequest request = new CreateIndexRequest(index);
    alias(request, aliasName);
    // isNotBlank (not !isEmpty) so whitespace-only input is not sent as a JSON body.
    if (StringUtils.isNotBlank(settings)) {
        request.settings(settings, XContentType.JSON);
    }
    if (StringUtils.isNotBlank(mapping)) {
        request.mapping(mapping, XContentType.JSON);
    }
    try {
        CreateIndexResponse response = client.indices().create(request, RequestOptions.DEFAULT);
        return response.isAcknowledged();
    } catch (IOException e) {
        e.printStackTrace();
    }
    return false;
}
/**
 * Registers every non-empty name in {@code aliasName} as an alias on the
 * create-index request. A {@code null} array is treated as "no aliases".
 */
private void alias(CreateIndexRequest request, String[] aliasName) {
    if (aliasName == null) {
        return;
    }
    for (String candidate : aliasName) {
        if (!StringUtils.isEmpty(candidate)) {
            request.alias(new Alias(candidate));
        }
    }
}
2.2.2 修改setting
/**
 * Updates the settings of an existing index.
 *
 * @param index    index name; must not be blank
 * @param settings settings to apply, as a JSON string; must not be blank
 * @return {@code true} if the request was acknowledged; {@code false} if it was
 *         not, or if an {@link IOException} occurred (its stack trace is printed)
 * @throws IllegalArgumentException if {@code index} or {@code settings} is blank
 */
public boolean settings(String index, String settings) {
    // Explicit checks instead of `assert`: assertions are disabled unless the JVM runs with -ea.
    if (StringUtils.isBlank(index)) {
        throw new IllegalArgumentException("index must not be blank");
    }
    if (StringUtils.isBlank(settings)) {
        throw new IllegalArgumentException("settings must not be blank");
    }
    UpdateSettingsRequest request = new UpdateSettingsRequest(index);
    try {
        request.settings(settings, XContentType.JSON);
        AcknowledgedResponse response = client.indices().putSettings(request, RequestOptions.DEFAULT);
        return response.isAcknowledged();
    } catch (IOException e) {
        e.printStackTrace();
    }
    return false;
}
2.2.3 修改mapping
/**
 * Updates the mapping of an existing index.
 *
 * @param index   index name; must not be blank
 * @param mapping mapping definition as a JSON string; must not be blank
 * @return {@code true} if the request was acknowledged; {@code false} if it was
 *         not, or if an {@link IOException} occurred (its stack trace is printed)
 * @throws IllegalArgumentException if {@code index} or {@code mapping} is blank
 */
public boolean mapping(String index, String mapping) {
    // Explicit checks instead of `assert`: assertions are disabled unless the JVM runs with -ea.
    if (StringUtils.isBlank(index)) {
        throw new IllegalArgumentException("index must not be blank");
    }
    if (StringUtils.isBlank(mapping)) {
        throw new IllegalArgumentException("mapping must not be blank");
    }
    PutMappingRequest request = new PutMappingRequest(index);
    // The mapping body has to be attached to the request; previously it was never sent.
    request.source(mapping, XContentType.JSON);
    try {
        AcknowledgedResponse response = client.indices().putMapping(request, RequestOptions.DEFAULT);
        return response.isAcknowledged();
    } catch (IOException e) {
        e.printStackTrace();
    }
    return false;
}
2.2.4 别名操作
/**
 * Performs a single alias action (selected by {@code type}) for an index.
 *
 * @param index index name the action applies to
 * @param alias alias name
 * @param type  kind of alias action to perform
 * @return {@code true} if the request was acknowledged; {@code false} if it was
 *         not, or if an {@link IOException} occurred (its stack trace is printed)
 */
public boolean alias(String index, String alias, IndicesAliasesRequest.AliasActions.Type type) {
    IndicesAliasesRequest request = new IndicesAliasesRequest();
    IndicesAliasesRequest.AliasActions actions = new IndicesAliasesRequest.AliasActions(type);
    // The target index belongs on the action itself; request.origin(...) does not select an index.
    actions.index(index);
    actions.alias(alias);
    request.addAliasAction(actions);
    try {
        AcknowledgedResponse response = client.indices().updateAliases(request, RequestOptions.DEFAULT);
        return response.isAcknowledged();
    } catch (IOException e) {
        e.printStackTrace();
    }
    return false;
}
2.2.5 合并索引
/**
 * Submits an asynchronous force-merge request.
 *
 * @param maxNumSegment target maximum number of segments after the merge
 * @param indices       indices to merge; when {@code null} no index is set on the request
 * @return always {@code true}: the call only submits the request, the outcome is
 *         reported by {@link ForceMergeListener}
 */
public boolean forceMerge(int maxNumSegment, String... indices) {
    ForceMergeRequest mergeRequest = new ForceMergeRequest();
    mergeRequest.maxNumSegments(maxNumSegment);
    if (indices != null) {
        mergeRequest.indices(indices);
    }
    client.indices().forcemergeAsync(mergeRequest, RequestOptions.DEFAULT, new ForceMergeListener());
    return true;
}
/**
 * Callback for the asynchronous force-merge call; prints the outcome to stdout.
 */
class ForceMergeListener implements ActionListener<ForceMergeResponse> {

    /** Prints a success message when the response status is OK, a failure message otherwise. */
    @Override
    public void onResponse(ForceMergeResponse response) {
        boolean succeeded = response.getStatus() == RestStatus.OK;
        System.out.println(succeeded ? "合并索引成功" : "合并索引失败");
    }

    /** Prints a failure message followed by the exception's stack trace. */
    @Override
    public void onFailure(Exception e) {
        System.out.println("合并索引失败");
        e.printStackTrace();
    }
}
2.2.6 缩小索引
/**
 * Submits an asynchronous shrink of {@code sourceIndex} into {@code targetIndex}.
 *
 * @param sourceIndex   index to shrink
 * @param targetIndex   name of the index to create
 * @param targetSetting settings for the target index, as a JSON string
 * @return always {@code true}: the call only submits the request, the outcome is
 *         reported by {@link ShrinkListener}
 */
public boolean shrink(String sourceIndex, String targetIndex, String targetSetting) {
    ResizeRequest request = new ResizeRequest(targetIndex, sourceIndex);
    org.elasticsearch.action.admin.indices.create.CreateIndexRequest createIndexRequest =
            new org.elasticsearch.action.admin.indices.create.CreateIndexRequest(targetIndex);
    createIndexRequest.settings(targetSetting, XContentType.JSON);
    request.setTargetIndex(createIndexRequest);
    // An async call needs a real listener; passing null left nothing to receive the result.
    client.indices().shrinkAsync(request, RequestOptions.DEFAULT, new ShrinkListener());
    return true;
}
/**
 * Callback for the asynchronous shrink call; prints the outcome to stdout.
 */
class ShrinkListener implements ActionListener<ResizeResponse> {

    /** Prints a success message when the response is acknowledged, a failure message otherwise. */
    @Override
    public void onResponse(ResizeResponse response) {
        String message = response.isAcknowledged() ? "缩小索引成功" : "缩小索引失败";
        System.out.println(message);
    }

    /** Prints a failure message followed by the exception's stack trace. */
    @Override
    public void onFailure(Exception e) {
        System.out.println("缩小索引失败");
        e.printStackTrace();
    }
}
2.3 文档管理
2.3.1 创建文档
/**
 * Indexes a single document synchronously.
 *
 * <p>Any exception raised while building or sending the request is caught and
 * its stack trace printed; nothing is rethrown from the body.
 *
 * @param index  target index name
 * @param source document id and JSON body
 * @throws IOException declared in the signature
 */
public void createDoc(String index, Source source) throws IOException {
    try {
        IndexRequest docRequest = new IndexRequest(index)
                .id(source.getId())
                .source(source.getSource(), XContentType.JSON);
        // synchronous request
        client.index(docRequest, RequestOptions.DEFAULT);
    } catch (Exception e) {
        e.printStackTrace();
    }
}
2.3.2 批量创建文档
/**
 * Indexes several documents with one bulk request.
 *
 * <p>Entries with a blank body are skipped; a blank id is left unset on the
 * request. If nothing remains to send, no request is issued. Exceptions are
 * caught and their stack trace printed.
 *
 * @param index      target index name
 * @param sourceList documents to index; {@code null} or empty means nothing to do
 */
public void bulkCreateDoc(String index, List<Source> sourceList) {
    if (sourceList == null || sourceList.isEmpty()) {
        return;
    }
    try {
        BulkRequest bulkRequest = new BulkRequest();
        for (Source source : sourceList) {
            if (StringUtils.isBlank(source.getSource())) {
                continue;
            }
            IndexRequest request = new IndexRequest(index);
            if (StringUtils.isNotBlank(source.getId())) {
                request.id(source.getId());
            }
            request.source(source.getSource(), XContentType.JSON);
            bulkRequest.add(request);
        }
        // A bulk request without actions fails validation, so skip the round trip.
        if (bulkRequest.numberOfActions() == 0) {
            return;
        }
        client.bulk(bulkRequest, RequestOptions.DEFAULT);
    } catch (Exception e) {
        e.printStackTrace();
    }
}
2.3.3 更新文档
/**
 * Partially updates a single document synchronously, retrying up to three
 * times on version conflicts.
 *
 * <p>Any exception raised while building or sending the request is caught and
 * its stack trace printed; nothing is rethrown from the body.
 *
 * @param index  index containing the document
 * @param source id of the document and the partial JSON body to merge into it
 * @throws IOException declared in the signature
 */
public void updateDoc(String index, Source source) throws IOException {
    try {
        UpdateRequest updateRequest = new UpdateRequest();
        updateRequest.index(index);
        updateRequest.id(source.getId());
        updateRequest
                .doc(source.getSource(), XContentType.JSON)
                // number of retries on version conflict
                .retryOnConflict(3);
        // synchronous update; the 7.x client requires RequestOptions on every call
        client.update(updateRequest, RequestOptions.DEFAULT);
    } catch (Exception e) {
        e.printStackTrace();
    }
}
2.3.4 批量更新文档
/**
 * Partially updates several documents with one bulk request.
 *
 * <p>Entries with a blank id or a blank body are skipped. If nothing remains
 * to send, no request is issued. An {@link IOException} is caught and its
 * stack trace printed.
 *
 * @param index      index containing the documents (set as the bulk request's index)
 * @param sourceList documents to update; {@code null} or empty means nothing to do
 */
public void bulkUpdateDoc(String index, List<Source> sourceList) {
    if (sourceList == null || sourceList.isEmpty()) {
        return;
    }
    try {
        BulkRequest bulkRequest = new BulkRequest(index);
        for (Source source : sourceList) {
            if (StringUtils.isBlank(source.getId()) || StringUtils.isBlank(source.getSource())) {
                continue;
            }
            UpdateRequest request = new UpdateRequest();
            request.id(source.getId());
            request.doc(source.getSource(), XContentType.JSON);
            bulkRequest.add(request);
        }
        // A bulk request without actions fails validation with an unchecked exception
        // that the IOException handler below would not catch.
        if (bulkRequest.numberOfActions() == 0) {
            return;
        }
        client.bulk(bulkRequest, RequestOptions.DEFAULT);
    } catch (IOException e) {
        e.printStackTrace();
    }
}
2.4 搜索API
2.4.1 根据ID获取文档
/**
 * Fetches documents by id with a single multi-get request.
 *
 * @param index index to read from
 * @param docId ids of the documents to fetch
 * @return one {@link Source} per requested id, in response order; the body is
 *         {@code null} for an item that failed or whose document has no source.
 *         An empty list is returned when no id is given.
 * @throws IOException if the request fails
 */
public List<Source> getDocById(String index, String... docId) throws IOException {
    // A multi-get without any item is rejected, so answer directly.
    if (docId == null || docId.length == 0) {
        return new ArrayList<>();
    }
    MultiGetRequest request = new MultiGetRequest();
    for (String id : docId) {
        request.add(index, id);
    }
    MultiGetResponse response = client.mget(request, RequestOptions.DEFAULT);
    MultiGetItemResponse[] responseList = response.getResponses();
    List<Source> result = new ArrayList<>(responseList.length);
    for (MultiGetItemResponse item : responseList) {
        // A failed item carries no response object; dereferencing it would throw.
        String value = item.isFailed() ? null : item.getResponse().getSourceAsString();
        Source source = new Source();
        source.setId(item.getId());
        source.setSource(value);
        result.add(source);
    }
    return result;
}
2.4.2 根据查询条件搜索
/**
 * Runs a search built from the given source builder against one index.
 *
 * @param builder query definition
 * @param index   index to search
 * @return the search response, or {@code null} if an {@link IOException}
 *         occurred (its stack trace is printed)
 */
public SearchResponse search(SearchSourceBuilder builder, String index) {
    SearchRequest request = new SearchRequest(index);
    // Optional knobs left disabled by the original author:
    //   request.preference(Preference.PREFER_NODES.name());       // shard preference, default is round-robin
    //   request.searchType(SearchType.QUERY_THEN_FETCH.name());   // SearchType.DEFAULT;
    //                                                             // DFS_QUERY_THEN_FETCH improves relevance
    request.source(builder);
    // Original author's note on the request cache: results of requests with size > 0
    // are not cached by default, even when the index enables the request cache; they
    // are cached only when the request itself sets request_cache=true. Plain searches
    // are therefore uncached unless enabled per request, whereas aggregation results
    // are cached without it.
    //   request.requestCache(true);
    try {
        return client.search(request, RequestOptions.DEFAULT);
    } catch (IOException e) {
        e.printStackTrace();
        return null;
    }
}
2.4.3 滚动查询
// Example: walk through an index page by page using the scroll API.
// The scroll context is kept alive for 10 minutes between pages.
Scroll scroll = new Scroll(TimeValue.timeValueMinutes(10));
int size = 1000;
SearchSourceBuilder builder = new SearchSourceBuilder();
builder.from(0);
builder.size(size);
SearchResponse response = search(builder, scroll, index);
// The helpers return null when an IOException occurred, so stop instead of dereferencing null.
while (response != null) {
    SearchHits hits = response.getHits();
    SearchHit[] h = hits.getHits();
    if (h.length == 0) {
        break;
    }
    for (SearchHit doc : h) {
        // placeholders: process each hit's id and source here
        String id = doc.getId();
        String s = doc.getSourceAsString();
    }
    // A short page means this was the last one.
    if (h.length != size) {
        break;
    }
    String scrollId = response.getScrollId();
    // Use the local scroll(...) helper; the client has no scroll(String, Scroll) method.
    response = scroll(scrollId, scroll);
}
/**
 * Starts a scrolling search.
 *
 * @param builder query definition
 * @param scroll  keep-alive for the scroll context
 * @return the first page, or {@code null} if an {@link IOException} occurred
 */
public SearchResponse search(SearchSourceBuilder builder, Scroll scroll) {
    // NOTE(review): `index` is not declared in this method; it presumably resolves to a
    // member of the enclosing class — confirm, or move callers to the three-argument overload.
    return search(builder, scroll, index);
}

/**
 * Starts a scrolling search against an explicitly named index.
 *
 * @param builder query definition
 * @param scroll  keep-alive for the scroll context
 * @param index   index to search
 * @return the first page, or {@code null} if an {@link IOException} occurred
 *         (its stack trace is printed)
 */
public SearchResponse search(SearchSourceBuilder builder, Scroll scroll, String index) {
    SearchRequest request = new SearchRequest();
    request.indices(index);
    request.scroll(scroll);
    request.source(builder);
    try {
        SearchResponse response = client.search(request, RequestOptions.DEFAULT);
        return response;
    } catch (IOException e) {
        e.printStackTrace();
        return null;
    }
}
/**
 * Fetches the next page of a scrolling search.
 *
 * @param scrollId scroll id taken from the previous response
 * @param scroll   keep-alive to apply to the scroll context
 * @return the next page, or {@code null} if an {@link IOException} occurred
 *         (its stack trace is printed)
 */
public SearchResponse scroll(String scrollId, Scroll scroll) {
    try {
        SearchScrollRequest request = new SearchScrollRequest(scrollId);
        request.scroll(scroll);
        // scroll(...) replaces the deprecated searchScroll(...)
        SearchResponse response = client.scroll(request, RequestOptions.DEFAULT);
        return response;
    } catch (IOException e) {
        e.printStackTrace();
        return null;
    }
}
推荐阅读
-
ElasticSearch实战系列三: ElasticSearch的JAVA API使用教程
-
《ElasticSearch6.x实战教程》之简单搜索、Java客户端(上)
-
Java生鲜电商平台-统一格式返回的API架构设计与实战
-
Elasticsearch Java Rest Client API 整理总结 (一)
-
Elasticsearch Java Rest Client API 整理总结 (二) —— SearchAPI
-
.NET Core IdentityServer4实战 第一章-入门与API添加客户端凭据
-
elasticsearch FunctionScore Java API sample
-
Elasticsearch Java Rest Client API 整理总结 (三)——Building Queries
-
Elasticsearch Java API 的使用(14)—优化索引创建之setting设置、写入优化
-
Java客户端API