ElasticSearch相关配置及使用(详细说明)
程序员文章站
2022-07-09 19:15:10
...
ElasticSearch相关配置及使用(详细说明)
1、相关配置(仅供参考)
@Configuration
public class ElasticSearchConfig {
@Autowired
private Environment env;
@Bean
public RestHighLevelClient restHighLevelClient() {
// 拆分ip地址
List<HttpHost> hostLists = new ArrayList<>();
String[] hostList = address.split(",");
for (String addr : hostList) {
String host = addr.split(":")[0];
String port = addr.split(":")[1];
//参数1:IP,参数2:端口号,参数三:协议
hostLists.add(new HttpHost(host, Integer.parseInt(port), schema));
}
// 转换成 HttpHost 数组
HttpHost[] httpHost = hostLists.toArray(new HttpHost[]{});
// 构建连接对象
RestClientBuilder builder = RestClient.builder(httpHost);
// 异步连接延时配置
//配置请求超时超时,分为 连接超时(默认1s) 和 套接字超时(默认30s)
builder.setRequestConfigCallback(requestConfigBuilder -> {
requestConfigBuilder.setConnectTimeout(connectTimeout);//配置连接超时时间
requestConfigBuilder.setSocketTimeout(socketTimeout);//配置套接字超时时间
requestConfigBuilder.setConnectionRequestTimeout(connectionRequestTimeout);//获取连接的超时时间
return requestConfigBuilder;
});
// 异步连接数配置
builder.setHttpClientConfigCallback(httpClientBuilder -> {
httpClientBuilder.setMaxConnTotal(maxConnectNum);//配置最大连接数量
httpClientBuilder.setMaxConnPerRoute(maxConnectPerRoute);//配置最大的路由连接数
return httpClientBuilder;
});
RestHighLevelClient client = new RestHighLevelClient(builder);
//得到索引名数组
String[] indexNames = getIndexNames();
List<String> no_created_indexs = indexExists(client, indexNames);//得到索引没有创建的集合
if(no_created_indexs.size()!=0) {
//创建index
for(String indexName:no_created_indexs) {
createIndex(new RestHighLevelClient(builder),indexName);//创建该索引
}
}
return new RestHighLevelClient(builder);
}
/**
* 创建index,因为CreateIndexRequest属于ddl操作,需要在传入时传一个new的client,在执行完会close client
* @param client
* @param indexName
* @return
*/
private boolean createIndex(RestHighLevelClient client,String indexName) {
CreateIndexRequest request = new CreateIndexRequest(indexName);
request.settings(Settings.builder()
.put("index.number_of_shards",3) //分片数 //TODO 最好能配置化
.put("index.number_of_replicas", 1));//副本数 //TODO 最好能配置化
request.alias(new Alias(indexName+"alias"));//设置别名
// request.setTimeout(TimeValue.timeValueMinutes(2));//设置创建索引超时2分钟
// 同步请求
try {
CreateIndexResponse createIndexResponse = client.indices().create(request,RequestOptions.DEFAULT);
// 处理响应
boolean acknowledged = createIndexResponse.isAcknowledged();
boolean shardsAcknowledged = createIndexResponse.isShardsAcknowledged();
System.out.println(acknowledged+","+shardsAcknowledged);
client.close();
return true;
} catch (IOException e) {
return false;
}
}
private String[] getIndexNames() {
String[] ins = env.getProperty("elasticsearchinfo.index_list").split(",");
return ins;
}
private Logger log = LoggerFactory.getLogger(ElasticSearchConfig.class);
/**
* 判断索引是否存在
* @param client
* @param indexNames
* @return 未创建的索引list
*/
public List<String> indexExists(RestHighLevelClient client,String[] indexNames) {
GetIndexRequest request = new GetIndexRequest();
List<String> no_created_indexs = new ArrayList<String>();
for(String indexName:indexNames) {
request.indices(indexName);//设置索引
try {
if(client.indices().exists(request, RequestOptions.DEFAULT)) {
}else {//如果不存在,添加到list中
no_created_indexs.add(indexName);
}
} catch(ConnectException e) {
log.error("es服务连接失败或未启动");
} catch (IOException e) {
e.printStackTrace();
no_created_indexs.add(indexName);
}
}
return no_created_indexs;
}
}
2、CURD
说明:在进行CURD之前上面的配置必不可少
(1)增加
public ResponseData doInsertPomenzhi(PoMenzhi po) throws IOException {
//构建新增请求体
if (po.getId() == null || "".equals(po.getId())) {
return ResponseDataUtil.buildError("es索引id缺失");
}
//1、添加新文档需要调用IndexRequest请求
IndexRequest indexRequest = new IndexRequest(index_shanghai, type_shanghai, po.getId().toString());
//source方法;将文档源设置为索引
indexRequest.source(JSON.toJSONString(po), XContentType.JSON);
//发送请求
//同步执行 当以下列方式执行IndexRequest时,客户端在继续执行代码之前,会等待返回索引响应:
IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
/**indexResponse 示例:
* {
* "_shards" : {
* "total" : 2,
* "failed" : 0,
* "successful" : 1
* },
* "_index" : "twitter",
* "_type" : "_doc",
* "_id" : "1",
* "_version" : 1,
* "_seq_no" : 0,
* "_primary_term" : 1,
* "result" : "created"
* }
*/
//获取结果,进行比较
if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
return ResponseDataUtil.buildSuccess("创建成功");
} else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
return ResponseDataUtil.buildSuccess("更新成功");
}
//获取分片信息
ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
return ResponseDataUtil.buildSuccess("集群部分创建成功");
}
//创建失败
if (shardInfo.getFailed() > 0) {
ArrayList<Object> reason = new ArrayList<>();
for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
reason.add(failure.reason());
}
return ResponseDataUtil.buildError("500", "创建失败", reason);
}
return ResponseDataUtil.buildError("500", "创建失败", "原因未知");
}
(2)更新
public ResponseData doUpdate(PoMenzhi po) throws IOException {
if (po.getId() == null || "".equals(po.getId())) {
return ResponseDataUtil.buildError("es索引id缺失");
}
//构建更新请求体
UpdateRequest request = new UpdateRequest(index_shanghai, type_shanghai, po.getId().toString());
request.doc(JSON.toJSONString(po), XContentType.JSON);
//发送请求
UpdateResponse updateResponse = client.update(request, RequestOptions.DEFAULT);
//处理请求结果
if (updateResponse.getResult() == DocWriteResponse.Result.CREATED) {
return ResponseDataUtil.buildSuccess("创建成功");
} else if (updateResponse.getResult() == DocWriteResponse.Result.UPDATED) {
return ResponseDataUtil.buildSuccess("更新成功");
} else {
return ResponseDataUtil.buildError("更新失败");
}
}
(3)删除
public ResponseData doDelete(Long id) throws IOException {
DeleteRequest request = new DeleteRequest(index_shanghai, type_shanghai, id.toString());
DeleteResponse deleteResponse = client.delete(request, RequestOptions.DEFAULT);
//删除响应体
ReplicationResponse.ShardInfo shardInfo = deleteResponse.getShardInfo();
if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
return ResponseDataUtil.buildSuccess("删除成功");
}
if (shardInfo.getFailed() > 0) {
ArrayList<Object> reason = new ArrayList<>();
for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
reason.add(failure.reason());
}
return ResponseDataUtil.buildError("500", "删除失败", reason);
} else {
return ResponseDataUtil.buildError("500", "删除失败", "原因未知");
}
}
(4)查询
- 创建搜索请求对象SearchRequest,设置查询的指定索引和类型
- 创建搜索内容对象SearchSourceBuilder
- 创建查询对象MatchQueryBuilder,以及MatchQueryBuilder对象的配置
- 将查询对象MatchQueryBuilder添加到搜索内容对象SearchSourceBuilder中,以及SearchSourceBuilder对象的配置
- 将搜索内容对象SearchSourceBuilder添加到搜索请求对象SearchRequest中
public ResponseData getByHitwords(String hitword) throws IOException {
//1、创建搜索请求对象SearchRequest,设置查询的指定索引和类型
SearchRequest searchRequest = new SearchRequest(index_shanghai);
searchRequest.types(type_shanghai);
//2、创建搜索内容对象SearchSourceBuilder
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
//3、创建查询对象MatchQueryBuilder,以及MatchQueryBuilder对象的配置
MatchQueryBuilder matchQueryBuilder = new MatchQueryBuilder("fulltext", hitword);
//启动模糊查询
matchQueryBuilder.fuzziness(Fuzziness.AUTO);
//设置最大扩展选项以控制查询的模糊过程
matchQueryBuilder.maxExpansions(10);
//4、将查询对象MatchQueryBuilder添加到搜索内容对象SearchSourceBuilder中,以及SearchSourceBuilder对象的配置
searchSourceBuilder.query(matchQueryBuilder);
//设置查询的起始索引位置
searchSourceBuilder.from(0);
//设置查询的数量
searchSourceBuilder.size(5);
//设置超时时间
searchSourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS));
//5、将搜索内容对象SearchSourceBuilder添加到搜索请求对象SearchRequest中
searchRequest.source(searchSourceBuilder);
//发送搜索请求
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
SearchHits hits = searchResponse.getHits();
SearchHit[] searchHits = hits.getHits();
//解析响应结果
ArrayList list = new ArrayList<>();
for (SearchHit hit : searchHits) {
Map<String, Object> map = hit.getSourceAsMap();
list.add(JSONObject.parseObject(JSON.toJSONString(map)).toJavaObject(PoMenzhiShanghai.class));
}
return ResponseDataUtil.buildSuccess(list);
}