Elasticsearch在Java中的基本使用
添加微信获取更多资源
文章目录
Elasticsearch
Elasticsearch是一个分布式,Restful风格的搜索和数据分析引擎,能够执行及合并多种类型的搜索(结构化数据,非结构化数据,地理位置,指标)。
Elacticsearch安装
Mac下安装Elasticsearch
brew update
#安装时间较长,耐心等待
brew install elasticsearch
安装完成后在输入brew info elasticsearch查看安装信息:
elasticsearch: stable 6.6.1, HEAD
Distributed search & analytics engine
https://www.elastic.co/products/elasticsearch
/usr/local/Cellar/elasticsearch/6.6.1 (120 files, 36.8MB) *
Built from source on 2019-08-22 at 23:59:23
From: https://github.com/Homebrew/homebrew-core/blob/master/Formula/elasticsearch.rb
==> Requirements
Required: java = 1.8 ✔
==> Options
--HEAD
Install HEAD version
==> Caveats
Data: /usr/local/var/lib/elasticsearch/elasticsearch_Jeremy/
Logs: /usr/local/var/log/elasticsearch/elasticsearch_Jeremy.log
Plugins: /usr/local/var/elasticsearch/plugins/
Config: /usr/local/etc/elasticsearch/
To have launchd start elasticsearch now and restart at login:
brew services start elasticsearch
Or, if you don't want/need a background service you can just run:
elasticsearch
==> Analytics
install: 9,177 (30 days), 27,254 (90 days), 126,222 (365 days)
install_on_request: 8,712 (30 days), 25,931 (90 days), 116,858 (365 days)
build_error: 0 (30 days)
在终端输入elasticsearch等待启动完成之后再浏览器输入http://localhost:9200会在浏览器显示一段json数据
{
"name" : "6UfAlAT",
"cluster_name" : "elasticsearch_Jeremy",
"cluster_uuid" : "pipIOPSJTkGkaoCstAfO-w",
"version" : {
"number" : "6.6.1",
"build_flavor" : "oss",
"build_type" : "tar",
"build_hash" : "1fd8f69",
"build_date" : "2019-02-13T17:10:04.160291Z",
"build_snapshot" : false,
"lucene_version" : "7.6.0",
"minimum_wire_compatibility_version" : "5.6.0",
"minimum_index_compatibility_version" : "5.0.0"
},
"tagline" : "You Know, for Search"
}
Linux下安装Elasticsearch
- 在合适的位置创建文件夹elasticsearch
- 下载Elasticsearch
#下载
curl -L -O https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.4.2.tar.gz
#解压
tar -zxvf elasticsearch-5.4.2.tar.gz
#进入解压后的文件夹
cd elasticsearch-5.4.2/bin/
#启动
./elasticsearch
执行启动命令后报错如下:
penJDK 64-Bit Server VM warning: INFO: os::commit_memory(0x000000008a660000, 1973026816, 0) failed; error='Cannot allocate memory' (errno=12)
#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (mmap) failed to map 1973026816 bytes for committing reserved memory.
# An error report file with more information is saved as:
# /data/software/elasticsearch/elasticsearch-5.4.2/bin/hs_err_pid17095.log
[aaa@qq.com bin]# curl -get http://localhost:9200
curl: (7) Failed connect to localhost:9200; Connection refused
这个错误是因为内存分配不够造成的,安装的机子只有2G的内存,不够,需要修改配置文件
vim ../config/jvm.options
#将文件中
-Xms2g
-Xmx2g
#修改为
-Xms256m
-Xmx256m
再次启动后报出如下错误
org.elasticsearch.bootstrap.StartupException: java.lang.RuntimeException: can not run elasticsearch as root
造成这个错误的原因是因为不允许使用root用户启动,新建一个用户并赋予权限。
useradd es
passwd es
#elasticsearch-5.4.2文件夹赋予es权限
chown -R es:es /data/software/elasticsearch/elasticsearch-5.4.2
#切换用户
su es
#再次启动
./elasticsearch
显示如下表示启动成功:
[2019-08-24T01:01:48,013][INFO ][o.e.n.Node ] [] initializing ...
[2019-08-24T01:01:48,336][INFO ][o.e.e.NodeEnvironment ] [Nxmtc8q] using [1] data paths, mounts [[/ (rootfs)]], net usable_space [18.9gb], net total_space [39.2gb], spins? [unknown], types [rootfs]
[2019-08-24T01:01:48,336][INFO ][o.e.e.NodeEnvironment ] [Nxmtc8q] heap size [247.5mb], compressed ordinary object pointers [true]
[2019-08-24T01:01:48,338][INFO ][o.e.n.Node ] node name [Nxmtc8q] derived from node ID [Nxmtc8qFSPmaCUAiIK6YMQ]; set [node.name] to override
[2019-08-24T01:01:48,339][INFO ][o.e.n.Node ] version[5.4.2], pid[18738], build[929b078/2017-06-15T02:29:28.122Z], OS[Linux/3.10.0-957.1.3.el7.x86_64/amd64], JVM[Oracle Corporation/OpenJDK 64-Bit Server VM/1.8.0_212/25.212-b04]
[2019-08-24T01:01:48,339][INFO ][o.e.n.Node ] JVM arguments [-Xms256m, -Xmx256m, -XX:+UseConcMarkSweepGC, -XX:CMSInitiatingOccupancyFraction=75, -XX:+UseCMSInitiatingOccupancyOnly, -XX:+DisableExplicitGC, -XX:+AlwaysPreTouch, -Xss1m, -Djava.awt.headless=true, -Dfile.encoding=UTF-8, -Djna.nosys=true, -Djdk.io.permissionsUseCanonicalPath=true, -Dio.netty.noUnsafe=true, -Dio.netty.noKeySetOptimization=true, -Dio.netty.recycler.maxCapacityPerThread=0, -Dlog4j.shutdownHookEnabled=false, -Dlog4j2.disable.jmx=true, -Dlog4j.skipJansi=true, -XX:+HeapDumpOnOutOfMemoryError, -Des.path.home=/data/software/elasticsearch/elasticsearch-5.4.2]
[2019-08-24T01:01:51,241][INFO ][o.e.p.PluginsService ] [Nxmtc8q] loaded module [aggs-matrix-stats]
[2019-08-24T01:01:51,241][INFO ][o.e.p.PluginsService ] [Nxmtc8q] loaded module [ingest-common]
[2019-08-24T01:01:51,242][INFO ][o.e.p.PluginsService ] [Nxmtc8q] loaded module [lang-expression]
[2019-08-24T01:01:51,242][INFO ][o.e.p.PluginsService ] [Nxmtc8q] loaded module [lang-groovy]
[2019-08-24T01:01:51,242][INFO ][o.e.p.PluginsService ] [Nxmtc8q] loaded module [lang-mustache]
[2019-08-24T01:01:51,242][INFO ][o.e.p.PluginsService ] [Nxmtc8q] loaded module [lang-painless]
[2019-08-24T01:01:51,242][INFO ][o.e.p.PluginsService ] [Nxmtc8q] loaded module [percolator]
[2019-08-24T01:01:51,242][INFO ][o.e.p.PluginsService ] [Nxmtc8q] loaded module [reindex]
[2019-08-24T01:01:51,242][INFO ][o.e.p.PluginsService ] [Nxmtc8q] loaded module [transport-netty3]
[2019-08-24T01:01:51,242][INFO ][o.e.p.PluginsService ] [Nxmtc8q] loaded module [transport-netty4]
[2019-08-24T01:01:51,243][INFO ][o.e.p.PluginsService ] [Nxmtc8q] no plugins loaded
[2019-08-24T01:01:55,849][INFO ][o.e.d.DiscoveryModule ] [Nxmtc8q] using discovery type [zen]
[2019-08-24T01:01:57,672][INFO ][o.e.n.Node ] initialized
[2019-08-24T01:01:57,673][INFO ][o.e.n.Node ] [Nxmtc8q] starting ...
[2019-08-24T01:01:58,253][INFO ][o.e.t.TransportService ] [Nxmtc8q] publish_address {127.0.0.1:9300}, bound_addresses {[::1]:9300}, {127.0.0.1:9300}
[2019-08-24T01:01:58,267][WARN ][o.e.b.BootstrapChecks ] [Nxmtc8q] max file descriptors [65535] for elasticsearch process is too low, increase to at least [65536]
[2019-08-24T01:01:58,267][WARN ][o.e.b.BootstrapChecks ] [Nxmtc8q] max virtual memory areas vm.max_map_count [65530] is too low, increase to at least [262144]
[2019-08-24T01:02:01,560][INFO ][o.e.c.s.ClusterService ] [Nxmtc8q] new_master {Nxmtc8q}{Nxmtc8qFSPmaCUAiIK6YMQ}{cbvBBXKtShGFSh9CMVwgFA}{127.0.0.1}{127.0.0.1:9300}, reason: zen-disco-elected-as-master ([0] nodes joined)
[2019-08-24T01:02:01,746][INFO ][o.e.g.GatewayService ] [Nxmtc8q] recovered [0] indices into cluster_state
[2019-08-24T01:02:01,762][INFO ][o.e.h.n.Netty4HttpServerTransport] [Nxmtc8q] publish_address {127.0.0.1:9200}, bound_addresses {[::1]:9200}, {127.0.0.1:9200}
[2019-08-24T01:02:01,777][INFO ][o.e.n.Node ] [Nxmtc8q] started
打开另外一个命令行窗口执行如下命令返回一个json字符串说明Elasticsearch可以访问了。
curl -X GET http://localhost:9200
#结果如下
{
"name" : "Nxmtc8q",
"cluster_name" : "elasticsearch",
"cluster_uuid" : "zsbTlZlET7auwC2HvteTuw",
"version" : {
"number" : "5.4.2",
"build_hash" : "929b078",
"build_date" : "2017-06-15T02:29:28.122Z",
"build_snapshot" : false,
"lucene_version" : "6.5.1"
},
"tagline" : "You Know, for Search"
}
elasticsearch后台启动
直接启动elasticsearch后退出当前命令行窗口或者执行ctrl+c后程序就会关闭,后台启动执行如下命令即可:
./elasticsearch -d
#执行ps -ef|grep elasticsearch就能查看到elasticsearch的PID
执行的时候会出现没有权限./elasticsearch: Permission denied
需要授权执行命令:chmod +x bin/elasticsearch
再次执行./elasticsearch -d即可启动
SpringBoot+Elasticsearch-快速入门
简介
来源百度百科
Elasticsearch 是一个分布式、高扩展、高实时的搜索与数据分析引擎。它能很方便的使大量数据具有搜索、分析和探索的能力。充分利用ElasticSearch的水平伸缩性,能使数据在生产环境变得更有价值。ElasticSearch 的实现原理主要分为以下几个步骤,首先用户将数据提交到Elastic Search 数据库中,再通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据,当用户搜索数据时候,再根据权重将结果排名,打分,再将返回结果呈现给用户。
Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。Elasticsearch是分布式的,这意味着索引可以被分成分片,每个分片可以有0个或多个副本。每个节点托管一个或多个分片,并充当协调器将操作委托给正确的分片。再平衡和路由是自动完成的。相关数据通常存储在同一个索引中,该索引由一个或多个主分片和零个或多个复制分片组成。一旦创建了索引,就不能更改主分片的数量。
Elasticsearch使用Lucene,并试图通过JSON和Java API提供其所有特性。它支持facetting和percolating,如果新文档与注册查询匹配,这对于通知非常有用。另一个特性称为“网关”,处理索引的长期持久性;例如,在服务器崩溃的情况下,可以从网关恢复索引。Elasticsearch支持实时GET请求,适合作为NoSQL数据存储,但缺少分布式事务。
节点Node,集群Cluster和分片Shards
Elasticsearch是分布式数据库,允许多台服务器协同工作,每台服务器可以运行多个实例。单个实例称为一个节点(Node),一组节点构成一个集群(Cluster)。分片是底层的工作单元,文档保存在分片内,分片又被分配到集群内的各个节点里,每个分片仅保存全部数据的一部分。
索引Index,类型Type和文档Document
可以类比我们比较熟悉的Mysql数据库:
索引:Index —> db
类型:Type —> table
文档:Document —> row
文档字段:Field —> column
如果我们要访问一个文档元数据应该包括index/type/id这三种类型,比较好理解。
使用Restful API与Elasticsearch进行交互
所有其他语言可以使用Restful API通过9200与Elasticsearch进行通讯,一个Elasticsearch请求和任何HTTP请求一样有若干个相同的部件组成:
curl -X ‘????/:/
?<QUERY_STRING>’ -d ‘’
被"<>"标记的部件:
部件名作用:
- VERB:适当的HTTP方法(GET/POST/PUT/HEAD/DELETE)
- PROTOCOL:http或者https
- HOST:运行Elasticsearch的端口号,默认9200
- PATH:API的终端路径(例如——count将返回集群中文档的数量),可能包含多个组件,例如:——cluster/stats和——nodes/stats/jvm。
- QUERY_STRING:任意可选的查询字符串参数
- BODY:一个JSON格式的请求(可选)
比如计算集群中文档的数量,可以使用如下:
curl -X GET http://localhost:9200/_count?pretty
#返回json
{
"count" : 0,
"_shards" : {
"total" : 0,
"successful" : 0,
"skipped" : 0,
"failed" : 0
}
}
Springboot Elasticsearch整合实践(RestHighLevelClient)
RestHighLevelClient基于RestClient的一个封装,主要目的是暴露具体的API方法,以参数的形式接受请求对象并返回一个相应对象。每个API可以异步或同步调用,同步方法返回一个相应对象,异步方法一asynx结尾,需要一个监听器参数通知响应或者接受异常。
接下来搭建一个简单的SpringBoot-Elasticsearch项目:
- 创建一个SpringBoot项目,并添加相关依赖
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.55</version>
</dependency>
- 配置application.properties
spring.application.name=es
server.port=8888
- 创建es配置类,用来构件RestHighLevelClient对象
在main文件夹下创建configuration文件夹 ,再创建EsConfiguration.java文件
/**
* create by wupengchoy
* 2019-08-25
*/
@Configuration
public class EsConfiguration {
//集群地址,多个使用','隔开
private static String hosts = "127.0.0.1";
//端口号
private static int port = 9200;
//使用的协议
private static String schema = "http";
private static ArrayList<HttpHost> hostList = null;
//连接超时时间
private static int connectTimeOut = 1000;
//socket连接超时时间
private static int socketTimeOut = 30000;
//获取连接超时时间
private static int connectionRequestTimeOut = 500;
//最大连接数
private static int maxConnectNum = 100;
//最大路由连接数
private static int maxConnectPerRoute = 100;
static {
hostList = new ArrayList<>();
String[] hostStrs = hosts.split(",");
for (String host : hostStrs) {
hostList.add(new HttpHost(host, port, schema));
}
}
@Bean
public RestHighLevelClient clien() {
RestClientBuilder builder = RestClient.builder(hostList.toArray(new HttpHost[0]));
//异步httpclient连接延时配置
builder.setRequestConfigCallback((requestConfigBuilder) -> {
requestConfigBuilder.setConnectTimeout(connectTimeOut);
requestConfigBuilder.setSocketTimeout(socketTimeOut);
requestConfigBuilder.setConnectionRequestTimeout(connectionRequestTimeOut);
return requestConfigBuilder;
});
//异步httpClient连接数配置
builder.setHttpClientConfigCallback((httpClientBuidler) -> {
httpClientBuidler.setMaxConnTotal(maxConnectNum);
httpClientBuidler.setMaxConnPerRoute(maxConnectPerRoute);
return httpClientBuidler;
});
RestHighLevelClient client = new RestHighLevelClient(builder);
return client;
}
}
- 添加测试类EsApplicationTest
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {EsApplication.class})
public class EsApplicationTests {
@BeforeClass
public static void before() {
}
@Test
public void testEs() throws IOException {
}
}
- 添加测试Bean类User
/**
* create by wupengchoy
* 2019-08-25
*/
@Data
public class User {
private int id;
private String userName;
private String password;
@Override
public String toString() {
return String.format("user{id:%s,userName:%s,password:%s}", this.id, this.userName, this.password);
}
}
- 对EsApplicationTest.java进行修改,准备操作前的数据
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {EsApplication.class})
public class EsApplicationTests {
@Autowired
RestHighLevelClient client;
//索引-->db
private static String INDEX_DB;
//类型-->table
private static String TYPE_TABLE;
private static User user;
private static List<User> users;
@BeforeClass
public static void before() {
INDEX_DB = "es_db";
TYPE_TABLE = "user_table";
users = Lists.newArrayList();
for (int i = 0; i < 10; i++) {
user = new User();
user.setId(i);
user.setUserName("uTing".concat(String.valueOf(i)));
user.setPassword("123456");
users.add(user);
}
}
@Test
public void testEs() throws IOException {
}
}
- 创建索引Index—>db
在对elasticsearch进行操作时可以先判断是否存在索引,如果不存在则需要创建
创建索引
public void createIndex(String index) throws IOException {
CreateIndexRequest indexRequest = new CreateIndexRequest(index);
CreateIndexResponse indexResponse = client.indices().create(indexRequest);
System.out.println("create ESIndex:".concat(JSON.toJSONString(indexResponse)));
}
判断索引是否存在
public boolean indexExists(String index) throws IOException {
GetIndexRequest getIndexRequest = new GetIndexRequest();
getIndexRequest.indices(index);
boolean isExist = client.indices().exists(getIndexRequest, RequestOptions.DEFAULT);
System.out.println(String.format("is esIndex %s exists:%s", index, isExist));
return isExist;
}
测试方法
@Test
public void testEs() throws IOException {
//判断是否存在索引,不存在则新建
if (!indexExists(INDEX_DB)) {
createIndex(INDEX_DB);
}
}
第一次执行,控制台输出索引不存在并创建一个
//省略部分启动信息
...Started EsApplicationTests in 2.612 seconds (JVM running for 4.588)
//当前索引不存在
is esIndex es_db exists:false
//创建一个新的索引
create ESIndex:{"acknowledged":true,"fragment":false,"shardsAcked":true,"shardsAcknowledged":true}
再执行一次,控制台输出显示索引已存在,且没有创建索引的输出
...Started EsApplicationTests in 2.158 seconds (JVM running for 5.619)
//判断出当前索引已存在,不创建
is esIndex es_db exists:true
- 添加类型Type—>table和记录Document—>row
类似于创建表格,储存数据的时候需要先创建一个类型用来储存文档。
/**
* 添加记录
* @param index
* @param type
* @param user
* @throws IOException
*/
public void add(String index, String type, User user) throws IOException {
IndexRequest indexRequest = new IndexRequest(index, type, String.valueOf(user.getId()));
indexRequest.source(JSON.toJSONString(user), XContentType.JSON);
IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
System.out.println(String.format("add type success:%s", JSON.toJSONString(indexResponse)));
}
与创建索引类似,创建记录之前也需要判断是否存在。
/**
* 判断记录是否已经存在
* @param index
* @param type
* @param user
* @return
* @throws IOException
*/
public boolean typeExists(String index, String type, User user) throws IOException {
GetRequest getRequest = new GetRequest(index, type, String.valueOf(user.getId()));
getRequest.fetchSourceContext(new FetchSourceContext(false));
getRequest.storedFields("_none_");
boolean exists = client.exists(getRequest, RequestOptions.DEFAULT);
System.out.println(String.format("is user %s exists:%s", user.getUserName(), exists));
return exists;
}
修改testEs方法,添加创建索引的逻辑。
@Test
public void testEs() throws IOException {
//判断是否存在索引,不存在则新建
if (!indexExists(INDEX_DB)) {
createIndex(INDEX_DB);
}
//判断是否存在类型
if (!typeExists(INDEX_DB, TYPE_TABLE, user)) {
add(INDEX_DB, TYPE_TABLE, user);
}
}
执行两次测试方法,两次控制台打印信息分别如下:
#第一次
is esIndex es_db exists:true//之前创建过索引,不再创建
is user uTing9 exists:false//当前记录不存在,需要新建
add type success:{"fragment":false,"id":"9","index":"es_db","primaryTerm":1,"result":"CREATED","seqNo":2,"shardId":{"fragment":true,"id":-1,"index":{"fragment":false,"name":"es_db","uUID":"_na_"},"indexName":"es_db"},"shardInfo":{"failed":0,"failures":[],"fragment":false,"successful":1,"total":2},"type":"user_table","version":3}
#第二次
is esIndex es_db exists:true
is user uTing9 exists:true//判断记录存在,不再新建
接下来还有查询,更新,删除,批量操作等API,具体代码如下:
/**
* 获取记录
* @param index
* @param type
* @param id
* @throws IOException
*/
public void get(String index, String type, int id) throws IOException {
GetRequest getRequest = new GetRequest(index, type, String.valueOf(id));
GetResponse getResponse = client.get(getRequest, RequestOptions.DEFAULT);
System.out.println(String.format("get result :%s", JSON.toJSONString(getResponse)));
}
/**
* 更新记录信息
* @param index
* @param type
* @param user
* @throws IOException
*/
public void update(String index, String type, User user) throws IOException {
user.setUserName(user.getUserName().concat("update"));
UpdateRequest updateRequest = new UpdateRequest(index, type, String.valueOf(user.getId()));
updateRequest.doc(JSON.toJSONString(user), XContentType.JSON);
UpdateResponse updateResponse = client.update(updateRequest, RequestOptions.DEFAULT);
System.out.println(String.format("ES update:%s", JSON.toJSONString(updateResponse)));
}
/**
* 删除记录
* @param index
* @param type
* @param id
* @throws IOException
*/
public void delete(String index, String type, int id) throws IOException {
DeleteRequest deleteRequest = new DeleteRequest(index, type, String.valueOf(id));
DeleteResponse deleteResponse = client.delete(deleteRequest, RequestOptions.DEFAULT);
System.out.println(String.format("ES delete:%s", JSON.toJSONString(deleteResponse)));
}
/**
* 搜索记录
* @param index
* @param type
* @param key
* @throws IOException
*/
public void search(String index, String type, String key) throws IOException {
BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
//可以根据字段进行搜索,must表示符合条件的,mustnot表示不符合条件的
queryBuilder.must(QueryBuilders.matchQuery("userName", key));
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.query(queryBuilder);
sourceBuilder.from(0);
//获取记录数,默认为10
sourceBuilder.size(100);
//第一个参数是获取字段,第二个字段是过滤的字段,默认获取全部字段
sourceBuilder.fetchSource(new String[]{"id", "userName", "passWord"}, new String[]{});
SearchRequest searchRequest = new SearchRequest(index);
searchRequest.types(type);
searchRequest.source(sourceBuilder);
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
System.out.println(String.format("ES search:%s", JSON.toJSONString(searchResponse)));
SearchHits hits = searchResponse.getHits();
SearchHit[] searchHits = hits.getHits();
for (SearchHit searchHit : searchHits) {
System.out.println(String.format("search -> %s", searchHit.getSourceAsString()));
}
}
/**
* 批量操作
* @throws IOException
*/
public void bulk() throws IOException {
//批量增加
BulkRequest bulkAddRequest = new BulkRequest();
IndexRequest indexRequest;
for (User user : users) {
indexRequest = new IndexRequest(INDEX_DB, TYPE_TABLE, String.valueOf(user.getId()));
indexRequest.source(JSON.toJSONString(user), XContentType.JSON);
bulkAddRequest.add(indexRequest);
}
BulkResponse bulkAddResponse = client.bulk(bulkAddRequest, RequestOptions.DEFAULT);
System.out.println(String.format("ES bulk add:%s", JSON.toJSONString(bulkAddResponse)));
search(INDEX_DB, TYPE_TABLE, "uTing");
//批量更新
BulkRequest bulkUpdateRequest = new BulkRequest();
UpdateRequest updateRequest;
for (User user : users) {
user.setUserName(user.getUserName().concat(" updated"));
updateRequest = new UpdateRequest(INDEX_DB, TYPE_TABLE, String.valueOf(user.getId()));
updateRequest.doc(JSON.toJSONString(user), XContentType.JSON);
bulkUpdateRequest.add(updateRequest);
}
BulkResponse bulkUpdateResponse = client.bulk(bulkUpdateRequest, RequestOptions.DEFAULT);
System.out.println(String.format("ES bulk update:%s", JSON.toJSONString(bulkUpdateResponse)));
search(INDEX_DB, TYPE_TABLE, "uTing updated");
//批量删除
BulkRequest bulkDeleteRequest = new BulkRequest();
DeleteRequest deleteRequest;
for (User user : users) {
deleteRequest = new DeleteRequest(INDEX_DB, TYPE_TABLE, String.valueOf(user.getId()));
bulkDeleteRequest.add(deleteRequest);
}
BulkResponse bulkDeleteResponse = client.bulk(bulkDeleteRequest, RequestOptions.DEFAULT);
System.out.println(String.format("ES bulk delete:%s", JSON.toJSONString(bulkDeleteResponse)));
search(INDEX_DB, TYPE_TABLE, "this");
}
更新testEs方法中的代码:
@Test
public void testEs() throws IOException {
//判断是否存在索引,不存在则新建
if (!indexExists(INDEX_DB)) {
createIndex(INDEX_DB);
}
//判断是否存在记录
if (!typeExists(INDEX_DB, TYPE_TABLE, user)) {
add(INDEX_DB, TYPE_TABLE, user);
}
//获取
get(INDEX_DB, TYPE_TABLE, user.getId());
//更新
update(INDEX_DB, TYPE_TABLE, user);
//删除
delete(INDEX_DB, TYPE_TABLE, user.getId());
//搜索
search(INDEX_DB, TYPE_TABLE, "uTing");
//批量操作
bulk();
}
执行后的结果打印信息如下:
其他API可以参考博客:Elasticsearch Java Rest Client API 整理总结
上一篇: 自研、高可用:国产通信框架的未来
下一篇: 字符指针和字符数组、多维指针和多维数组