欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

elasticsearch5 官方文档整理。。。。中

程序员文章站 2022-03-18 13:41:19
...

Document API

Index API

1,
IndexResponse response = client.prepareIndex("twitter", "tweet", "1")
        .setSource(jsonBuilder()
                    .startObject()
                        .field("user", "kimchy")
                        .field("postDate", new Date())
                        .field("message", "trying out Elasticsearch")
                    .endObject()
                  )
        .get();

2,
String json = "{" +
        "\"user\":\"kimchy\"," +
        "\"postDate\":\"2013-01-30\"," +
        "\"message\":\"trying out Elasticsearch\"" +
    "}";

IndexResponse response = client.prepareIndex("twitter", "tweet")
        .setSource(json)
        .get();
3,返回
// Index name
String _index = response.getIndex();
// Type name
String _type = response.getType();
// Document ID (generated or not)
String _id = response.getId();
// Version (if it's the first time you index this document, you will get: 1)
long _version = response.getVersion();
// status has stored current instance statement.
RestStatus status = response.status();

注意:
IndexRequestBuilder prepareIndex = client.prepareIndex(index, type, id);
        prepareIndex.setOpType(OpType.CREATE);
设置OpType为create时,如果数据已经存在就会返回失败。防止重复输入。

Get API

GetResponse response = client.prepareGet("twitter", "tweet", "1").get();

Delete API

DeleteResponse response = client.prepareDelete("twitter", "tweet", "1").get();

Delete By Query API   根据查询出来的结果删除














BulkByScrollResponse response =
    DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
        .filter(QueryBuilders.matchQuery("gender", "male")) 
        .source("persons")                                  
        .get();                                             

long deleted = response.getDeleted();                       

elasticsearch5 官方文档整理。。。。中

query

elasticsearch5 官方文档整理。。。。中

index

elasticsearch5 官方文档整理。。。。中

execute the operation

elasticsearch5 官方文档整理。。。。中

number of deleted documents

Update API

1,
UpdateRequest updateRequest = new UpdateRequest();
updateRequest.index("index");
updateRequest.type("type");
updateRequest.id("1");
updateRequest.doc(jsonBuilder()
        .startObject()
            .field("gender", "male")
        .endObject());
client.update(updateRequest).get();

2,
client.prepareUpdate("ttl", "doc", "1")
        .setScript(new Script("ctx._source.gender = \"male\""  , ScriptService.ScriptType.INLINE, null, null))
        .get();

 两者选一

 client.prepareUpdate("ttl", "doc", "1")
        .setDoc(jsonBuilder()               
            .startObject()
                .field("gender", "male")
            .endObject())
        .get();

elasticsearch5 官方文档整理。。。。中

Your script. It could also be a locally stored script name. In that case, you’ll need to use ScriptService.ScriptType.FILE

elasticsearch5 官方文档整理。。。。中

Document which will be merged to the existing one.

update by script 更新
UpdateRequest updateRequest = new UpdateRequest("ttl", "doc", "1")
        .script(new Script("ctx._source.gender = \"male\""));
client.update(updateRequest).get();

update by maging documents  合并
UpdateRequest updateRequest = new UpdateRequest("index", "type", "1")
        .doc(jsonBuilder()
            .startObject()
                .field("gender", "male")
            .endObject());
client.update(updateRequest).get();

upsert 不存在就插入,存在就更新
IndexRequest indexRequest = new IndexRequest("index", "type", "1")
        .source(jsonBuilder()
            .startObject()
                .field("name", "Joe Smith")
                .field("gender", "male")
            .endObject());
UpdateRequest updateRequest = new UpdateRequest("index", "type", "1")
        .doc(jsonBuilder()
            .startObject()
                .field("gender", "male")
            .endObject())
        .upsert(indexRequest);              
client.update(updateRequest).get();

Multi Get API

MultiGetResponse multiGetItemResponses = client.prepareMultiGet()
    .add("twitter", "tweet", "1")           
    .add("twitter", "tweet", "2", "3", "4") 
    .add("another", "type", "foo")          
    .get();

for (MultiGetItemResponse itemResponse : multiGetItemResponses) { 
    GetResponse response = itemResponse.getResponse();
    if (response.isExists()) {                      
        String json = response.getSourceAsString(); 
    }
}

elasticsearch5 官方文档整理。。。。中

get by a single id

elasticsearch5 官方文档整理。。。。中

or by a list of ids for the same index / type

elasticsearch5 官方文档整理。。。。中

you can also get from another index

elasticsearch5 官方文档整理。。。。中

iterate over the result set

elasticsearch5 官方文档整理。。。。中

you can check if the document exists

elasticsearch5 官方文档整理。。。。中

access to the _source field

Bulk API  可以在一次请求中有删除,查询等操作

import static org.elasticsearch.common.xcontent.XContentFactory.*;

BulkRequestBuilder bulkRequest = client.prepareBulk();

// either use client#prepare, or use Requests# to directly build index/delete requests
bulkRequest.add(client.prepareIndex("twitter", "tweet", "1")
        .setSource(jsonBuilder()
                    .startObject()
                        .field("user", "kimchy")
                        .field("postDate", new Date())
                        .field("message", "trying out Elasticsearch")
                    .endObject()
                  )
        );

bulkRequest.add(client.prepareIndex("twitter", "tweet", "2")
        .setSource(jsonBuilder()
                    .startObject()
                        .field("user", "kimchy")
                        .field("postDate", new Date())
                        .field("message", "another post")
                    .endObject()
                  )
        );

BulkResponse bulkResponse = bulkRequest.get();
if (bulkResponse.hasFailures()) {
    // process failures by iterating through each bulk response item
}


Using Bulk Processor

BulkProcessor类提供了一个简单的接口,可以根据请求的数量或大小自动刷新批量操作,或者在给定的时间段之后。


BulkProcessor bulkProcessor = BulkProcessor.builder(
        client,  
        new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId,
                                   BulkRequest request) { ... } 

            @Override
            public void afterBulk(long executionId,
                                  BulkRequest request,
                                  BulkResponse response) { ... } 

            @Override
            public void afterBulk(long executionId,
                                  BulkRequest request,
                                  Throwable failure) { ... } 
        })
        .setBulkActions(10000) 
        .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)) 
        .setFlushInterval(TimeValue.timeValueSeconds(5)) 
        .setConcurrentRequests(1) 
        .setBackoffPolicy(
            BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)) 
        .build();

elasticsearch5 官方文档整理。。。。中

Add your elasticsearch client

elasticsearch5 官方文档整理。。。。中

This method is called just before bulk is executed. You can for example see the numberOfActions with request.numberOfActions()

elasticsearch5 官方文档整理。。。。中

This method is called after bulk execution. You can for example check if there was some failing requests with response.hasFailures()

elasticsearch5 官方文档整理。。。。中

This method is called when the bulk failed and raised a Throwable

elasticsearch5 官方文档整理。。。。中

We want to execute the bulk every 10 000 requests

elasticsearch5 官方文档整理。。。。中

We want to flush the bulk every 5mb

elasticsearch5 官方文档整理。。。。中

We want to flush the bulk every 5 seconds whatever the number of requests

elasticsearch5 官方文档整理。。。。中

Set the number of concurrent requests. A value of 0 means that only a single request will be allowed to be executed. A value of 1 means 1 concurrent request is allowed to be executed while accumulating new bulk requests.

elasticsearch5 官方文档整理。。。。中

Set a custom backoff policy which will initially wait for 100ms, increase exponentially and retries up to three times. A retry is attempted whenever one or more bulk item requests have failed with an EsRejectedExecutionException which indicates that there were too little compute resources available for processing the request. To disable backoff, pass BackoffPolicy.noBackoff().

By default, BulkProcessor:

  • sets bulkActions to 1000
  • sets bulkSize to 5mb
  • does not set flushInterval
  • sets concurrentRequests to 1, which means an asynchronous execution of the flush operation.
  • sets backoffPolicy to an exponential backoff with 8 retries and a start delay of 50ms. The total wait time is roughly 5.1 seconds

 Search API

SearchResponse response = client.prepareSearch("index1", "index2")
        .setTypes("type1", "type2")
        .setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
        .setQuery(QueryBuilders.termQuery("multi", "test"))                 // Query
        .setPostFilter(QueryBuilders.rangeQuery("age").from(12).to(18))     // Filter
        .setFrom(0).setSize(60).setExplain(true)
        .get();

Using scrolls in Java
滚动不适用于实时用户请求,而是用于处理大量数据,例如,以便将一个索引的内容重新索引到具有不同配置的新索引中。

QueryBuilder qb = termQuery("multi", "test");

SearchResponse scrollResp = client.prepareSearch(test)
        .addSort(FieldSortBuilder.DOC_FIELD_NAME, SortOrder.ASC)
        .setScroll(new TimeValue(60000))
        .setQuery(qb)
        .setSize(100).get(); //max of 100 hits will be returned for each scroll
//Scroll until no hits are returned
do {
    for (SearchHit hit : scrollResp.getHits().getHits()) {
        //Handle the hit...
    }

    scrollResp = client.prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(60000)).execute().actionGet();
} while(scrollResp.getHits().getHits().length != 0); // Zero hits mark the end of the scroll and the while loop.