欢迎您访问程序员文章站!本站旨在为大家提供和分享程序员计算机编程知识。
您现在的位置是: 首页

Elasticsearch实战——JAVA客户端API

程序员文章站 2022-03-05 10:00:26
...

Elasticsearch实战——JAVA客户端API

1. 创建spring-boot项目

1.1 pom.xml配置

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.0.0.RELEASE</version>
		<relativePath /> <!-- lookup parent from repository -->
	</parent>
	<groupId>com.elasticsearch.java.api</groupId>
	<artifactId>elasticsearch-java-api</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>es</name>
	<description>elasticsearch的java api使用</description>

	<properties>
		<java.version>1.8</java.version>
	</properties>

	<dependencies>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
			<exclusions>
				<exclusion>
					<groupId>org.elasticsearch</groupId>
					<artifactId>elasticsearch</artifactId>
				</exclusion>
				<exclusion>
					<groupId>org.apache.lucene</groupId>
					<artifactId>lucene-core</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
		<!-- High-level REST client; its version is kept in step with the server artifact declared below -->
		<dependency>
			<groupId>org.elasticsearch.client</groupId>
			<artifactId>elasticsearch-rest-high-level-client</artifactId>
			<version>7.5.0</version>
		</dependency>
		<!-- Declared explicitly with the same version as the REST client above -->
		<dependency>
			<groupId>org.elasticsearch</groupId>
			<artifactId>elasticsearch</artifactId>
			<version>7.5.0</version>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>

</project>

1.2 启动类配置

/**
 * Application entry point.
 *
 * <p>Spring Boot's Elasticsearch auto-configuration is switched off through the
 * {@code exclude} attribute of {@code @SpringBootApplication}.
 */
@SpringBootApplication(exclude = ElasticsearchAutoConfiguration.class)
public class RunApplication {

	/**
	 * Boots the Spring application context.
	 *
	 * @param args command-line arguments forwarded to Spring Boot
	 * @throws IOException declared by the signature; nothing in this method throws it directly
	 */
	public static void main(final String[] args) throws IOException {
		SpringApplication.run(RunApplication.class, args);
	}
}

1.3 properties配置

# Elasticsearch connection settings.
# Each elasticsearch.node[i] entry supplies the host and port of one node;
# add node[1], node[2], ... to list more nodes.
elasticsearch.node[0].host=127.0.0.1
elasticsearch.node[0].port=9200

1.4 ElasticSearchProperties.java配置

/**
 * Settings bound from configuration keys that start with {@code elasticsearch}.
 *
 * <p>Only {@code node} has no default; every other value falls back to the
 * initializer shown on its field when the key is absent.
 */
@Configuration
@ConfigurationProperties(prefix = "elasticsearch")
public class ElasticSearchProperties {

    /** Nodes to connect to ({@code elasticsearch.node[i].host/port}); {@code null} until bound. */
    private List<ESNode> node;

    /** Connect timeout; defaults to 1000. */
    private int connectTimeOut = 1000;

    /** Socket timeout; defaults to 30000. */
    private int socketTimeOut = 30000;

    /** Timeout for obtaining a connection from the pool; defaults to 500. */
    private int connectionRequestTimeOut = 500;

    /** Maximum total number of connections; defaults to 100. */
    private int maxConnectNum = 100;

    /** Maximum number of connections per route; defaults to 100 (property name kept as originally spelled). */
    private int maxConnectPreRoute = 100;

    // ---- node list ----

    public List<ESNode> getNode() {
        return this.node;
    }

    public void setNode(final List<ESNode> node) {
        this.node = node;
    }

    // ---- timeouts ----

    public int getConnectTimeOut() {
        return this.connectTimeOut;
    }

    public void setConnectTimeOut(final int connectTimeOut) {
        this.connectTimeOut = connectTimeOut;
    }

    public int getSocketTimeOut() {
        return this.socketTimeOut;
    }

    public void setSocketTimeOut(final int socketTimeOut) {
        this.socketTimeOut = socketTimeOut;
    }

    public int getConnectionRequestTimeOut() {
        return this.connectionRequestTimeOut;
    }

    public void setConnectionRequestTimeOut(final int connectionRequestTimeOut) {
        this.connectionRequestTimeOut = connectionRequestTimeOut;
    }

    // ---- connection pool limits ----

    public int getMaxConnectNum() {
        return this.maxConnectNum;
    }

    public void setMaxConnectNum(final int maxConnectNum) {
        this.maxConnectNum = maxConnectNum;
    }

    public int getMaxConnectPreRoute() {
        return this.maxConnectPreRoute;
    }

    public void setMaxConnectPreRoute(final int maxConnectPreRoute) {
        this.maxConnectPreRoute = maxConnectPreRoute;
    }
}
/**
 * Address of a single Elasticsearch node: a host name (or IP) plus a port.
 *
 * <p>Plain mutable bean; {@code host} starts as {@code null} and {@code port} as {@code 0}.
 */
public class ESNode {

    /** Host name or IP address of the node. */
    private String host;

    /** Port the node listens on. */
    private int port;

    /** @return the configured host, or {@code null} if none was set */
    public String getHost() {
        return this.host;
    }

    /** @param host host name or IP address of the node */
    public void setHost(final String host) {
        this.host = host;
    }

    /** @return the configured port, or {@code 0} if none was set */
    public int getPort() {
        return this.port;
    }

    /** @param port port the node listens on */
    public void setPort(final int port) {
        this.port = port;
    }

}

1.5 ElasticSearchConfiguration.java配置

import org.apache.http.HttpHost;
import org.apache.http.client.config.RequestConfig.Builder;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestClientBuilder.HttpClientConfigCallback;
import org.elasticsearch.client.RestClientBuilder.RequestConfigCallback;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


/**
 * Builds the {@link RestHighLevelClient} bean from {@link ElasticSearchProperties}.
 */
@Configuration
public class ElasticSearchConfiguration {
	ElasticSearchProperties properties;

	@Autowired
	public ElasticSearchConfiguration(ElasticSearchProperties properties) {
		this.properties = properties;
	}

	/**
	 * Creates the client bean named {@code client}.
	 *
	 * <p>Every configured node becomes one {@link HttpHost}; the timeout and
	 * connection-pool settings are then applied to the builder before the
	 * client is constructed.
	 *
	 * @return a client connected to all configured nodes
	 */
	@Bean("client")
	public RestHighLevelClient client() {
		HttpHost[] hosts = properties.getNode().stream()
				.map(n -> new HttpHost(n.getHost(), n.getPort()))
				.toArray(HttpHost[]::new);
		RestClientBuilder restClientBuilder = RestClient.builder(hosts);
		setConnectTimeOutConfig(restClientBuilder);
		setMutiConnectConfig(restClientBuilder);
		return new RestHighLevelClient(restClientBuilder);
	}

	/**
	 * Applies the connect, socket and connection-request timeouts of the
	 * asynchronous HTTP client.
	 *
	 * @param builder the REST client builder to configure
	 */
	private void setConnectTimeOutConfig(RestClientBuilder builder) {
		// The property getters are read when the callback runs, not when it is registered.
		builder.setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder
				.setConnectTimeout(properties.getConnectTimeOut())
				.setSocketTimeout(properties.getSocketTimeOut())
				.setConnectionRequestTimeout(properties.getConnectionRequestTimeOut()));
	}

	/**
	 * Applies the connection-count limits of the asynchronous HTTP client.
	 *
	 * @param builder the REST client builder to configure
	 */
	private void setMutiConnectConfig(RestClientBuilder builder) {
		builder.setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder
				// upper bound on connections in total
				.setMaxConnTotal(properties.getMaxConnectNum())
				// upper bound on connections per route
				.setMaxConnPerRoute(properties.getMaxConnectPreRoute()));
	}
}

2. JAVA API操作

/**
 * Carrier for one document: its id and its body.
 *
 * <p>Both fields are plain strings and start as {@code null}.
 */
public class Source {

    /** Document id. */
    private String id;

    /** Document body. */
    private String source;

    /** @return the document id, or {@code null} if none was set */
    public String getId() {
        return this.id;
    }

    /** @param id the document id */
    public void setId(final String id) {
        this.id = id;
    }

    /** @return the document body, or {@code null} if none was set */
    public String getSource() {
        return this.source;
    }

    /** @param source the document body */
    public void setSource(final String source) {
        this.source = source;
    }
}

2.1 引用的java类

import org.apache.commons.lang.StringUtils;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.elasticsearch.action.admin.indices.shrink.ResizeRequest;
import org.elasticsearch.action.admin.indices.shrink.ResizeResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.MultiGetItemResponse;
import org.elasticsearch.action.get.MultiGetRequest;
import org.elasticsearch.action.get.MultiGetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.PutMappingRequest;
import org.elasticsearch.cluster.routing.Preference;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

2.2 索引管理

2.2.1 创建索引

	/**
	 * Creates an index, optionally with settings, a mapping and aliases.
	 *
	 * <p>The three {@code assert} statements only fire when the JVM runs with
	 * assertions enabled; without them, empty {@code settings} or
	 * {@code mapping} values are simply not applied.
	 *
	 * @param index     name of the index to create
	 * @param settings  index settings as a JSON string
	 * @param mapping   index mapping as a JSON string
	 * @param aliasName aliases to attach to the new index; empty names are skipped
	 * @return {@code true} if the creation was acknowledged, {@code false} if it
	 *         was not acknowledged or an {@link IOException} occurred
	 */
	public boolean createIndex(String index, String settings, String mapping, String... aliasName){
		assert StringUtils.isNotBlank(index);
		assert StringUtils.isNotBlank(settings);
		assert StringUtils.isNotBlank(mapping);

		CreateIndexRequest createRequest = new CreateIndexRequest(index);
		alias(createRequest, aliasName);
		if (StringUtils.isNotEmpty(settings)) {
			createRequest.settings(settings, XContentType.JSON);
		}
		if (StringUtils.isNotEmpty(mapping)) {
			createRequest.mapping(mapping, XContentType.JSON);
		}

		boolean acknowledged = false;
		try {
			acknowledged = client.indices().create(createRequest, RequestOptions.DEFAULT).isAcknowledged();
		} catch (IOException e) {
			e.printStackTrace();
		}
		return acknowledged;
	}

	/**
	 * Attaches every non-empty alias name to the create-index request.
	 *
	 * @param request   the request to add aliases to
	 * @param aliasName alias names; may be {@code null}, in which case nothing is added
	 */
	private void alias(CreateIndexRequest request, String[] aliasName) {
		if (aliasName == null) {
			return;
		}
		for (String candidate : aliasName) {
			if (StringUtils.isNotEmpty(candidate)) {
				request.alias(new Alias(candidate));
			}
		}
	}

2.2.2 修改setting

	/**
	 * Updates the settings of an existing index.
	 *
	 * @param index    name of the index to update
	 * @param settings new settings as a JSON string
	 * @return {@code true} if the update was acknowledged, {@code false} if it
	 *         was not acknowledged or an {@link IOException} occurred
	 */
	public boolean settings(String index,String settings){
		assert StringUtils.isNotBlank(index);
		assert StringUtils.isNotBlank(settings);

		UpdateSettingsRequest updateRequest = new UpdateSettingsRequest(index);
		updateRequest.settings(settings, XContentType.JSON);
		try {
			return client.indices().putSettings(updateRequest, RequestOptions.DEFAULT).isAcknowledged();
		} catch (IOException e) {
			e.printStackTrace();
			return false;
		}
	}

2.2.3 修改mapping

	/**
	 * Updates the mapping of an existing index.
	 *
	 * @param index   name of the index whose mapping is updated
	 * @param mapping mapping definition as a JSON string
	 * @return {@code true} if the update was acknowledged, {@code false} if it
	 *         was not acknowledged or an {@link IOException} occurred
	 */
	public boolean mapping(String index, String mapping){
		assert StringUtils.isNotBlank(index);
		assert StringUtils.isNotBlank(mapping);
		PutMappingRequest request = new PutMappingRequest(index);
		// Attach the mapping body; without it the request carries no mapping at all.
		request.source(mapping, XContentType.JSON);
		try {
			AcknowledgedResponse response = client.indices().putMapping(request, RequestOptions.DEFAULT);
			return response.isAcknowledged();
		} catch (IOException e) {
			e.printStackTrace();
		}
		return false;
	}

2.2.4 别名操作

	/**
	 * Performs a single alias action on an index.
	 *
	 * @param index name of the index the action applies to
	 * @param alias alias name
	 * @param type  kind of alias action to perform
	 * @return {@code true} if the action was acknowledged, {@code false} if it
	 *         was not acknowledged or an {@link IOException} occurred
	 */
	public boolean alias(String index, String alias,IndicesAliasesRequest.AliasActions.Type type){
		IndicesAliasesRequest request = new IndicesAliasesRequest();
		IndicesAliasesRequest.AliasActions actions = new IndicesAliasesRequest.AliasActions(type);
		// The target index belongs on the action itself; request.origin(...) does not select an index.
		actions.index(index);
		actions.alias(alias);
		request.addAliasAction(actions);
		try {
			AcknowledgedResponse response = client.indices().updateAliases(request, RequestOptions.DEFAULT);
			return response.isAcknowledged();
		} catch (IOException e) {
			e.printStackTrace();
		}
		return false;
	}

2.2.5 合并索引

	/**
	 * Starts an asynchronous force-merge of the given indices.
	 *
	 * <p>The outcome is reported by {@code ForceMergeListener}; this method
	 * does not wait for it.
	 *
	 * @param maxNumSegment target maximum number of segments after the merge
	 * @param indices       indices to merge; when {@code null} no index is set on the request
	 * @return always {@code true}, meaning the request was submitted
	 */
	public boolean forceMerge(int maxNumSegment, String... indices){
		ForceMergeRequest mergeRequest = new ForceMergeRequest();
		mergeRequest.maxNumSegments(maxNumSegment);
		if (indices != null) {
			mergeRequest.indices(indices);
		}
		client.indices().forcemergeAsync(mergeRequest, RequestOptions.DEFAULT, new ForceMergeListener());
		return true;
	}

	/**
	 * Callback for the asynchronous force-merge; prints the outcome to standard output.
	 */
	class ForceMergeListener implements ActionListener<ForceMergeResponse> {

		/** Reports success only when the response status is {@code RestStatus.OK}. */
		@Override
		public void onResponse(ForceMergeResponse response) {
			boolean merged = response.getStatus() == RestStatus.OK;
			System.out.println(merged ? "合并索引成功" : "合并索引失败");
		}

		/** Reports the failure and prints the stack trace. */
		@Override
		public void onFailure(Exception e) {
			System.out.println("合并索引失败");
			e.printStackTrace();
		}
	}

2.2.6 缩小索引

/**
	 * Starts an asynchronous shrink of {@code sourceIndex} into {@code targetIndex}.
	 *
	 * <p>The outcome is reported by {@code ShrinkListener}; this method does
	 * not wait for it.
	 *
	 * @param sourceIndex   index to shrink
	 * @param targetIndex   name of the index to create
	 * @param targetSetting settings of the target index as a JSON string; a
	 *                      {@code null} or blank value leaves the target
	 *                      request without explicit settings
	 * @return always {@code true}, meaning the request was submitted
	 */
	public boolean shrink(String sourceIndex, String targetIndex, String targetSetting){
		ResizeRequest request = new ResizeRequest(targetIndex, sourceIndex);
		if (StringUtils.isNotBlank(targetSetting)) {
			org.elasticsearch.action.admin.indices.create.CreateIndexRequest createIndexRequest = new org.elasticsearch.action.admin.indices.create.CreateIndexRequest(targetIndex);
			createIndexRequest.settings(targetSetting, XContentType.JSON);
			request.setTargetIndex(createIndexRequest);
		}
		// A null listener would fail as soon as the response arrives; use the listener defined for this call.
		client.indices().shrinkAsync(request, RequestOptions.DEFAULT, new ShrinkListener());
		return true;
	}

	/**
	 * Callback for the asynchronous shrink; prints the outcome to standard output.
	 */
	class ShrinkListener implements ActionListener<ResizeResponse> {

		/** Reports success only when the response is acknowledged. */
		@Override
		public void onResponse(ResizeResponse response) {
			String message = response.isAcknowledged() ? "缩小索引成功" : "缩小索引失败";
			System.out.println(message);
		}

		/** Reports the failure and prints the stack trace. */
		@Override
		public void onFailure(Exception e) {
			System.out.println("缩小索引失败");
			e.printStackTrace();
		}
	}

2.3 文档管理

2.3.1 创建文档

	/**
	 * Indexes a single document with a synchronous request.
	 *
	 * <p>Any exception raised while building or sending the request is caught
	 * and its stack trace printed, so nothing propagates to the caller even
	 * though the signature declares {@link IOException}.
	 *
	 * @param index  name of the target index
	 * @param source document to store; its id and JSON body are read
	 * @throws IOException declared by the signature only
	 */
	public void createDoc(String index, Source source) throws IOException {
		try {
			IndexRequest indexRequest = new IndexRequest(index)
					.id(source.getId())
					.source(source.getSource(), XContentType.JSON);
			// blocking call
			client.index(indexRequest, RequestOptions.DEFAULT);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

2.3.2 批量创建文档

	/**
	 * Indexes several documents with one bulk request.
	 *
	 * <p>Entries that are {@code null} or have a blank body are skipped. If
	 * nothing remains to be sent, no request is issued: an empty bulk request
	 * is rejected by request validation. Exceptions are caught and their stack
	 * trace printed.
	 *
	 * @param index      name of the target index
	 * @param sourceList documents to store; an entry without an id is indexed
	 *                   without an explicit id
	 */
	public void bulkCreateDoc(String index, List<Source> sourceList) {
		if (sourceList == null || sourceList.isEmpty()) {
			return;
		}
		try {
			BulkRequest bulkRequest = new BulkRequest();
			for (Source source : sourceList) {
				if (source == null || StringUtils.isBlank(source.getSource())) {
					continue;
				}
				IndexRequest request = new IndexRequest(index);
				if (StringUtils.isNotBlank(source.getId())) {
					request.id(source.getId());
				}
				request.source(source.getSource(), XContentType.JSON);
				bulkRequest.add(request);
			}
			// Every entry may have been skipped above.
			if (bulkRequest.numberOfActions() == 0) {
				return;
			}
			client.bulk(bulkRequest, RequestOptions.DEFAULT);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

2.3.3 更新文档

	/**
	 * Partially updates a single document with a synchronous request.
	 *
	 * <p>Any exception raised while building or sending the request is caught
	 * and its stack trace printed, so nothing propagates to the caller even
	 * though the signature declares {@link IOException}.
	 *
	 * @param index  name of the index holding the document
	 * @param source carries the id of the document to update and the partial
	 *               JSON body to merge into it
	 * @throws IOException declared by the signature only
	 */
	public void updateDoc(String index, Source source) throws IOException {
		try {
			UpdateRequest updateRequest = new UpdateRequest(index, source.getId());
			updateRequest
				.doc(source.getSource(), XContentType.JSON)
				// retry up to three times on a version conflict
				.retryOnConflict(3);
			// The 7.x client requires RequestOptions on every call.
			client.update(updateRequest, RequestOptions.DEFAULT);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

2.3.4 批量更新文档

	/**
	 * Partially updates several documents with one bulk request.
	 *
	 * <p>Entries that are {@code null}, or that lack an id or a body, are
	 * skipped. If nothing remains to be sent, no request is issued: an empty
	 * bulk request is rejected by request validation. An {@link IOException}
	 * is caught and its stack trace printed.
	 *
	 * @param index      name of the index holding the documents; set as the
	 *                   bulk request's global index
	 * @param sourceList documents to update, each carrying its id and the
	 *                   partial JSON body to merge
	 */
	public void bulkUpdateDoc(String index, List<Source> sourceList) {
		if (sourceList == null || sourceList.isEmpty()) {
			return;
		}
		try {
			BulkRequest bulkRequest = new BulkRequest(index);
			for (Source source : sourceList) {
				if (source == null || StringUtils.isBlank(source.getId()) || StringUtils.isBlank(source.getSource())) {
					continue;
				}
				UpdateRequest request = new UpdateRequest();
				request.id(source.getId());
				request.doc(source.getSource(), XContentType.JSON);
				bulkRequest.add(request);
			}
			// Every entry may have been skipped above.
			if (bulkRequest.numberOfActions() == 0) {
				return;
			}
			client.bulk(bulkRequest, RequestOptions.DEFAULT);
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

2.4 搜索API

2.4.1 根据ID获取文档

	/**
	 * Fetches documents by id with a single multi-get request.
	 *
	 * <p>The result holds one entry per response item. An entry's body is
	 * {@code null} when the item failed or the response has no source for it.
	 *
	 * @param index name of the index to read from
	 * @param docId ids of the documents to fetch
	 * @return the fetched documents; empty when no id was given
	 * @throws IOException if the request cannot be executed
	 */
	public List<Source> getDocById(String index, String... docId) throws IOException {
		// A multi-get without any item is rejected by request validation.
		if (docId == null || docId.length == 0) {
			return new ArrayList<>();
		}
		MultiGetRequest request = new MultiGetRequest();
		for (String id : docId) {
			request.add(index,id);
		}
		MultiGetResponse response = client.mget(request, RequestOptions.DEFAULT);
		MultiGetItemResponse[] responseList = response.getResponses();
		List<Source> result = new ArrayList<>(responseList.length);
		for (MultiGetItemResponse item : responseList) {
			Source source = new Source();
			source.setId(item.getId());
			// A failed item carries a failure instead of a response, so getResponse() is null.
			if (!item.isFailed()) {
				source.setSource(item.getResponse().getSourceAsString());
			}
			result.add(source);
		}
		return result;
	}


2.4.2 根据查询条件搜索

	/**
	 * Runs a search against one index.
	 *
	 * <p>Optional request tuning that is left at its default here:
	 * <ul>
	 *   <li>preference, e.g. {@code Preference.PREFER_NODES}; the default is round-robin;</li>
	 *   <li>search type, e.g. {@code DFS_QUERY_THEN_FETCH} to improve relevance scoring;</li>
	 *   <li>request cache: results of requests with {@code size > 0} are not
	 *       cached by default, even when the index enables the request cache,
	 *       unless the request itself turns it on; aggregation results are
	 *       cached by default.</li>
	 * </ul>
	 *
	 * @param builder the query definition
	 * @param index   name of the index to search
	 * @return the search response, or {@code null} if an {@link IOException} occurred
	 */
	public SearchResponse search(SearchSourceBuilder builder, String index) {
		SearchRequest searchRequest = new SearchRequest(index).source(builder);
		try {
			return client.search(searchRequest, RequestOptions.DEFAULT);
		} catch (IOException e) {
			e.printStackTrace();
			return null;
		}
	}

2.4.3 滚动查询

// Usage sample: page through all hits of an index with the scroll API.
// NOTE(review): `index` is not declared in this snippet; it must be supplied by the surrounding code.
// Keep the scroll context alive for 10 minutes between round trips.
Scroll scroll = new Scroll(TimeValue.timeValueMinutes(10));
int size = 1000;
SearchSourceBuilder builder = new SearchSourceBuilder();
builder.from(0);
builder.size(size);
SearchResponse response = search(builder, scroll, index);
// search(...) and scroll(...) return null after an IOException, so check every page.
while (response != null) {
    SearchHit[] h = response.getHits().getHits();
    for (SearchHit doc : h) {
        // Process each hit here.
        String id = doc.getId();
        String s = doc.getSourceAsString();
    }
    // A short (or empty) page means the result set is exhausted.
    if (h.length != size) {
        break;
    }
    // Fetch the next page through the scroll(...) helper rather than through the client directly.
    response = scroll(response.getScrollId(), scroll);
}
// NOTE(review): the scroll context is never released; consider sending a ClearScrollRequest when done.
/**
 * Opens a scroll search using the {@code index} visible in the enclosing scope.
 *
 * <p>NOTE(review): {@code index} is not declared in this snippet; presumably
 * it is a member of the enclosing class — confirm.
 *
 * @param builder the query definition
 * @param scroll  how long the scroll context is kept alive
 * @return the first page of results, or {@code null} if an {@link IOException} occurred
 */
public SearchResponse search(SearchSourceBuilder builder, Scroll scroll) {
    return search(builder, scroll, index);
}

/**
 * Opens a scroll search against the given index.
 *
 * @param builder the query definition
 * @param scroll  how long the scroll context is kept alive
 * @param index   name of the index to search
 * @return the first page of results, or {@code null} if an {@link IOException} occurred
 */
public SearchResponse search(SearchSourceBuilder builder, Scroll scroll, String index) {
    SearchRequest request = new SearchRequest();
    request.indices(index);
    request.scroll(scroll);
    request.source(builder);
    try {
        SearchResponse response = client.search(request,RequestOptions.DEFAULT);
        return response;
    } catch (IOException e) {
        e.printStackTrace();
        return null;
    }
}
/**
 * Fetches the next page of an open scroll search.
 *
 * @param scrollId id returned by the previous page
 * @param scroll   how long the scroll context is kept alive after this call
 * @return the next page of results, or {@code null} if an {@link IOException} occurred
 */
public SearchResponse scroll(String scrollId, Scroll scroll){
    try {
        SearchScrollRequest request = new SearchScrollRequest(scrollId);
        request.scroll(scroll);
        // scroll(...) replaces the deprecated searchScroll(...) entry point.
        return client.scroll(request, RequestOptions.DEFAULT);
    } catch (IOException e) {
        e.printStackTrace();
        return null;
    }
}
相关标签: 搜索引擎