elasticsearch6.7 05. Document APIs(10)Reindex API
9、reindex api
reindex要求为源索引中的所有文档启用_source。
reindex
不会配置目标索引,不会复制源索引的设置。你需要在reindex之前先指定mapping
,分片数量,副本数量等选项。
_reindex
最常用的一种方式是复制一个索引。下例会将twitter索引中的文档复制到new_twitter索引中:
post _reindex { "source": { "index": "twitter" }, "dest": { "index": "new_twitter" } }
返回结果:
{ "took" : 147, "timed_out": false, "created": 120, "updated": 0, "deleted": 0, "batches": 1, "version_conflicts": 0, "noops": 0, "retries": { "bulk": 0, "search": 0 }, "throttled_millis": 0, "requests_per_second": -1.0, "throttled_until_millis": 0, "total": 120, "failures" : [ ] }
和 _update_by_query
api 一样,_reindex从原索引获取快照,但它的目标索引必须是不同的索引,所以不太可能发生版本冲突。dest元素可以向 index api 一样配置,控制乐观锁。例如,省略version_type(和上述一样)或者将其设置为internal,会导致elasticsearch草率地将文档转储到目标索引,从而恰巧覆盖具有相同type和id的文档:
post _reindex { "source": { "index": "twitter" }, "dest": { "index": "new_twitter", "version_type": "internal" } }
设置version_type
为external
会使elasticsearch保存源文档的version,创建任何不存在目标索引的文档,并更新任何源文档version比目标文档version要大的文档:
post _reindex { "source": { "index": "twitter" }, "dest": { "index": "new_twitter", "version_type": "external" } }
设置op_type
为create
会导致 _reindex
仅在目标索引中创建缺少的文档。索引已存在的文档都会导致版本冲突:
post _reindex { "source": { "index": "twitter" }, "dest": { "index": "new_twitter", "op_type": "create" } }
默认的版本冲突会导致_reindex
进程终止,你可以在请求体中设置"conflicts": "proceed"
来指示_reindex
继续处理版本冲突之后的文档。请务必注意,其他错误类型的处理不受“conflict”
参数的影响。当请求体中设置"conflicts": "proceed"
时,_reindex
进程将继续发生版本冲突并返回遇到的版本冲突数:
post _reindex { "conflicts": "proceed", "source": { "index": "twitter" }, "dest": { "index": "new_twitter", "op_type": "create" } }
你可以通过在source
字段内添加type
字段或query
字段来限制文档。如下例仅会将索引twitter
中的kimchy
复制到索引new_twitter
中:
post _reindex { "source": { "index": "twitter", "type": "_doc", "query": { "term": { "user": "kimchy" } } }, "dest": { "index": "new_twitter" } }
可以在source
字段中同时列出type
和index
,允许在一个请求中复制大量数据。下例会复制来自twitter
索引和blog
索引的_doc
类型和post
类型的文档:
post _reindex { "source": { "index": ["twitter", "blog"], "type": ["_doc", "post"] }, "dest": { "index": "all_together", "type": "_doc" } }
reindex api 不会处理id冲突,以最后写入的文档为准,但reindex的顺序通常是不可预测,因此依赖此行为并不是一个好主意。应该使用脚本确保id是唯一的。
也可以限制复制文档的数量。下例仅仅从twitter
索引复制一个文档到new_twitter
索引:
post _reindex { "size": 1, "source": { "index": "twitter" }, "dest": { "index": "new_twitter" } }
你可以指定设置sort参数指定排序规则。sort会降低scroll的效率,但是有时候必须使用。如果可以的话,最好限制size的大小。下例会从twitter
索引中复制10000个文档到new_twitter
索引:
post _reindex { "size": 10000, "source": { "index": "twitter", "sort": { "date": "desc" } }, "dest": { "index": "new_twitter" } }
可以使用_source参数过滤字段,例如,可以使用source
从twitter
索引中过滤一部分字段,如下所示:
post _reindex { "source": { "index": "twitter", "_source": ["user", "_doc"] }, "dest": { "index": "new_twitter" } }
_redix
和_update_by_query
一样也支持脚本,它可以修改一个文档,但和_update_by_query
有区别,这个script
仅允许修改文档的metadata
:
post _reindex { "source": { "index": "twitter" }, "dest": { "index": "new_twitter", "version_type": "external" }, "script": { "source": "if (ctx._source.foo == 'bar') {ctx._version++; ctx._source.remove('foo')}", "lang": "painless" } }
和_update_by_query
一样,你可以设置ctx.op
改变在目标索引执行的操作:
- noop
- 设置 ctx.op = "noop",如果你的script确定文档不需要索引到目标索引。这个空操作将会反映到响应体的noop计数器。
- delete
- 设置ctx.op = "delete",如果你的script确定必须要从目标索引中删除文档。删除操作会反映到响应体的deleted计数器。
设置ctx.op为其他值会导致错误。
你甚至可以改变如下的元数据,但一定要小心地操作:
- _id
- _type
- _index
- _version
- _routing
- _parent
设置_version =null
或者从ctx
中清空它,等价于没有在index 请求中发送version参数。不管目标索引文档的版本或者你在_reindex请求中指定的版本类型是什么,它都会导致目标索引的文档被覆盖。
默认情况下,如果_reindex查询到一个指定了routing参数的文档,在新文档中也会保留该参数,除非你在script中改变它。你可以在dest
字段设置routing从而改变这个设置:
-
keep
- 文档的路由值在新索引中保持不变。这是默认值。
-
discard
- 原索引中文档的routing值在新索引中变为null
-
=<some text>
- 文档的路由值在新索引中变为指定值。
例如,你可以使用以下请求从源索引中复制所有公司名是cat的文档到目标索引,并且指定它们的routing值都是cat:
post _reindex { "source": { "index": "source", "query": { "match": { "company": "cat" } } }, "dest": { "index": "dest", "routing": "=cat" } }
默认,_reindex每次scroll查询的文档数是1000。你可以在source元素中使用size参数改变这个大小:
post _reindex { "source": { "index": "source", "size": 100 }, "dest": { "index": "dest", "routing": "=cat" } }
你也可以指定pipeline参数使用ingest node节点特性:
post _reindex { "source": { "index": "source" }, "dest": { "index": "dest", "pipeline": "some_ingest_pipeline" } }
9.1 从远程重建索引(reindex from remote)
reindex支持从远程的集群中获取源索引数据:
post _reindex { "source": { "remote": { "host": "http://otherhost:9200", "username": "user", "password": "pass" }, "index": "source", "query": { "match": { "test": "data" } } }, "dest": { "index": "dest" } }
host
参数必须包含协议,主机和端口(例如:)。username
和 password
是可选的,当elasticsearch节点需要basic auth时会使用它们。使用基本身份验证时务必使用https,否则密码将以纯文本格式发送。有一系列可用于配置https
连接的行为。
远程主机必须要在elasticsearch.yaml
使用reindex.remote.whitelist
属性显式指定允许连接的白名单。它可以是一个逗号分隔的列表((e.g. otherhost:9200, another:9200, 127.0.10.*:9200, localhost:*
)。白名单忽略协议,仅仅使用端口和主机。例如:
reindex.remote.whitelist: "otherhost:9200, another:9200, 127.0.10.*:9200, localhost:*"
必须将白名单配置在所有进行reindex的节点上。
这个特性可以和任何版本的远程集群交互。允许你从任何版本的elasticsearch升级到当前版本。
要将查询发送到旧版的elasticsearch,请将查询参数直接发送到远程主机。
从远程群集执行reindex 不支持手动或自动切片。
远程服务器使用一个最大值是100mb的缓冲区。如果远程索引包含非常大的文档,你需要使用一个更小的批量大小(每一批的数量不要超过缓冲区大小),下面的示例将批量大小设置为10:
post _reindex { "source": { "remote": { "host": "http://otherhost:9200" }, "index": "source", "size": 10, "query": { "match": { "test": "data" } } }, "dest": { "index": "dest" } }
你也可以使用socket_timeout
字段指定请求的套接字进行读取操作的超时时间,connect_timeout
字段指定连接超时时间。默认都是30s。下例将套接字读取超时设置为一分钟,将连接超时设置为10秒:
post _reindex { "source": { "remote": { "host": "http://otherhost:9200", "socket_timeout": "1m", "connect_timeout": "10s" }, "index": "source", "query": { "match": { "test": "data" } } }, "dest": { "index": "dest" } }
9.2 url参数(url parameters)
参考上一节
9.3 配置ssl参数(configuring ssl parameters)
略
9.4 响应体(response body)
json响应如下:
{ "took": 639, "timed_out": false, "total": 5, "updated": 0, "created": 5, "deleted": 0, "batches": 1, "noops": 0, "version_conflicts": 2, "retries": { "bulk": 0, "search": 0 }, "throttled_millis": 0, "requests_per_second": 1, "throttled_until_millis": 0, "failures": [ ] }
- took
- 整个操作耗费的毫秒数
- timed_out
- 如果在执行 reindex 操作时出现超时,那么这个标识将会返回 true
- total
- 成功执行操作的文档的数量
- updated
- 成功的更新了多少个文档
- ceated
- 成功的创建了多少个文档
- deleted
- 成功的删除了多少个文档
- batches
- 回滚数
- verison_conflicts
- 操作过程中出现版本冲突的数量
- noops
- 由于 ctx.op=noop 设置造成的忽略的文档数
- retries
- 重复尝试的次数,bulk 是批量更新操作重复尝试的次数,search 是查询的重复尝试次数
- throthled_millis
- requests_per_second 参数引起的请求等待时间
- requests_per_second
- 在操作过程中,每秒执行的请求数
- throttled_until_millis
- 执行
reindex
时这个值始终0,只在在调用task api时该值才有意义,它表示下一次(自纪元以来)为了符合requests_per_second
将再次执行请求的毫秒数。
- 执行
- failures
- 执行失败的数组,包含在执行过程中任何不可恢复的错误。如果这个数组不是空的,那么请求会因为这些失败而中止。reindex 是使用批处理实现的,任何失败都会导致整个执行被中止。可以使用conflicts参数来防止reindex在版本冲突时造成操作中止。
9.5 结合 taskapi 使用(works with the task api)
您可以使用 task api 获取任何正在进行 update_by_query 请求的状态:
get _tasks?detailed=true&actions=*reindex
返回值:
{ "nodes" : { "r1a2worbtwkz516z6nes5a" : { "name" : "r1a2wor", "transport_address" : "127.0.0.1:9300", "host" : "127.0.0.1", "ip" : "127.0.0.1:9300", "attributes" : { "testattr" : "test", "portsfile" : "true" }, "tasks" : { "r1a2worbtwkz516z6nes5a:36619" : { "node" : "r1a2worbtwkz516z6nes5a", "id" : 36619, "type" : "transport", "action" : "indices:data/write/reindex", "status" : { "total" : 6154, "updated" : 3500, "created" : 0, "deleted" : 0, "batches" : 4, "version_conflicts" : 0, "noops" : 0, "retries": { "bulk": 0, "search": 0 }, "throttled_millis": 0, "requests_per_second": -1, "throttled_until_millis": 0 }, "description" : "", "start_time_in_millis": 1535149899665, "running_time_in_nanos": 5926916792, "cancellable": true, "headers": {} } } } } }
status:这个对象包含了当前任务的实际状态。total
字段是本次操作需要重新索引的文档数。你可以通过 updated
, created
, and deleted
字段估计处理进度。当以上几个字段的和等于 total
字段时,请求就执行完毕了。
你可以使用 task id 查看某个任务。下例查看task id为 r1a2worbtwkz516z6nes5a:36619
的任务信息:
get /_tasks/r1a2worbtwkz516z6nes5a:36619
该 api 可以与wait_for_comletion=false
集成使用,可以清晰的查看已完成任务的状态。如果任务已经完成,并且在其上设置了wait_for_completion=false
,那么请求将会返回结果或是错误字段。此功能的代价是当wait_for_completion=false
时会在.tasks/task/${taskid}
目录下会创建文档。您可以根据需要删除该文档。
9.6 取消任务(works with the cancel task api)
任何_update_by_query
操作都可以通过task cancel
api来取消,如:
post _tasks/r1a2worbtwkz516z6nes5a:36619/_cancel
取消应该执行很快,但可能需要几秒钟。在此期间上面的 task status api将继续列出该任务,直到它完全被取消了。
9.7 阈值(rethrottling)
在正在执行的请求中,requests_per_second
的值可以在运行时通过_rethrotted
api进行修改:
post _update_by_query/r1a2worbtwkz516z6nes5a:36619/_rethrottle?requests_per_second=-1
可以使用tasks api找到任务id。
和 requests_per_seconds 参数设置一样,rethrottling 参数可以是 - 1 (禁用限制)或是其他十进制数(如 1.7 或 12 )。rethrottling 参数能提高查询速度且会立即生效,但是降低速度必须等到当前操作执行完后才起作用。这可以防止滚动超时
9.7.1 重建索引以更改字段的名称
_reindex可用于构建具有重命名字段的索引副本。假设您创建一个包含如下所示文档的索引:
post test/_doc/1?refresh { "text": "words words", "flag": "foo" }
但你不喜欢flag
,并希望用tag
替换它。可以如下创建另一个索引:
post _reindex { "source": { "index": "test" }, "dest": { "index": "test2" }, "script": { "source": "ctx._source.tag = ctx._source.remove(\"flag\")" } }
查询新的文档:
get test2/_doc/1
响应结果:
{ "found": true, "_id": "1", "_index": "test2", "_type": "_doc", "_version": 1, "_seq_no": 44, "_primary_term": 1, "_source": { "text": "words words", "tag": "foo" } }
9.8 切片 (slicing)
reindex 支持 来使 reindex 操作并行进行。这能提高效率并且提供了一种将请求分解为较小的部分的便捷方式。
9.8.1 手动切片 (manually slicing)
通过为每个请求提供切片 id 和切片总数,手动将 _reindex 操作进行分解:
post _reindex { "source": { "index": "twitter", "slice": { "id": 0, "max": 2 } }, "dest": { "index": "new_twitter" } } post _reindex { "source": { "index": "twitter", "slice": { "id": 1, "max": 2 } }, "dest": { "index": "new_twitter" } }
你可以这样验证上述 api 的结果:
get _refresh post new_twitter/_search?size=0&filter_path=hits.total
返回如下的的结果:
{ "hits": { "total": 120 } }
9.8.2 自动切片 (automatic slicing)
也可以让 _reindex
自动并行地。使用 slices
指定要使用的切片数:
post _reindex?slices=5&refresh { "source": { "index": "twitter" }, "dest": { "index": "new_twitter" } }
您可以通过下列语句验证运行结果:
post new_twitter/_search?size=0&filter_path=hits.total
返回如下的的结果:
{ "hits": { "total": 120 } }
slices 设置为 auto 将允许 elasticsearch 选择要使用的切片数。此设置将使用一个分片一个切片,直至达到某个限制。如果存在多个源索引,它将根据具有最少分片的那个索引所拥有的分片数来作为切片数。
向_reindex
添加 slices
只会自动执行上一节中使用的手动过程,这意味着它有一些怪癖:
- 您可以在 tasks api 中查看这些请求。这些子请求是具有
slices
请求的任务的 “子 " 任务。 - 仅使用
slices
获取请求的任务状态 (包含已完成切片的状态)。 - 这些子请求可单独寻址,例如取消和重新限制。
- 使用
slices
重新处理请求将按比例重新调整未完成的子请求。 - 使用
slices
取消请求将取消每个子请求。 - 由于
slices
的性质,每个子请求都不会获得完全均匀的文档部分。这些切片文档都会被处理,但某些切片可能比其他切片分到更大的文档。 - 像
requests_per_second
这样的参数和带有size
的slices
请求 (按指定比例分配给每个子请求)。将其与上述关于分布不均匀的点相结合,您应该得出结论,使用slices
的size
可能不会删除指定大小的文档。 - 每个子请求都会获得和源索引略有不同的快照,尽管这些快照几乎同时进行。
9.8.3 选择 slices 的数量(picking the number of slices)
如果 slices 设置为 auto,elasticsearch 将会自动为大多数索引选择一个合理的数量。如果您设置手动切片或以其他方式来调整自动切片,请遵循以下准则:
当切片的数量等于索引的分片数时,查询性能最好。如果这个数量太大(如 500), 请选择一个较小的数字,因为太多的切片会影响性能。设置高于分片数的切片通常不会提高效率反而会增加开销。
indexing 的性能与可用的切片数量呈正相关。
查询或索引是否是影响此时运行时性能的主要原因,这取决于reindexed时的文档和集群的资源。
9.9 重建更多的索引( reindexing many indices)
如果你有很多索引要重新索引,通常最好一次重新索引它们,而不是使用全局模式来获取许多索引。这样的话如果reindex时有任何错误,您可以通过删除部分已完成的索引,然后让该索引重新执行一次reindex,从而恢复之前的流程。它还使得流程的并行化非常简单:将索引列表拆分为reindex并并行运行每个列表。
one-off bash脚本似乎很适合这种操作:
for index in i1 i2 i3 i4 i5; do curl -hcontent-type:application/json -xpost localhost:9200/_reindex?pretty -d'{ "source": { "index": "'$index'" }, "dest": { "index": "'$index'-reindexed" } }' done
9.10 每日重复指数( reindex daily indices)
尽管有上述建议,您可以将_reindex与painless结合使用新模板应用于现有文档以实现每日重建索引。
假设您的索引由以下文档组成:
put metricbeat-2016.05.30/_doc/1?refresh {"system.cpu.idle.pct": 0.908} put metricbeat-2016.05.31/_doc/1?refresh {"system.cpu.idle.pct": 0.105}
metricbement- *索引的新模板已加载到elasticsearch中,但它仅适用于新创建的索引。painless 可用于reindex 已经存在的文档并应用于新模板。
下面的脚本从索引名称中提取日期,并创建一个在原索引上附加了-1的新的索引。下例将metricbeat-2016.05.31
索引中的所有数据都重新编入metricbeat-2016.05.31-1
索引中。
post _reindex { "source": { "index": "metricbeat-*" }, "dest": { "index": "metricbeat" }, "script": { "lang": "painless", "source": "ctx._index = 'metricbeat-' + (ctx._index.substring('metricbeat-'.length(), ctx._index.length())) + '-1'" } }
现在可以在* -1
索引中找到先前metricbeat
索引中的所有文档:
get metricbeat-2016.05.30-1/_doc/1 get metricbeat-2016.05.31-1/_doc/1
以前的方法还可以与结合使用,以仅将现有数据加载到新索引中,并根据需要重命名任何字段。
9.11 提取索引的随机子集(extracting a random subset of an index)
_reindex可用于提取索引的随机子集以进行测试:
post _reindex { "size": 10, "source": { "index": "twitter", "query": { "function_score" : { "query" : { "match_all": {} }, "random_score" : {} } }, "sort": "_score" 【1】 }, "dest": { "index": "random_twitter" } }
【1】:_reindex
默认按_doc
排序,除非您将排序覆盖为_score
,否则random_score将不起作用。
上一篇: docker安装ros2详细步骤介绍
推荐阅读
-
elasticsearch6.7 05. Document APIs(5)Delete By Query API
-
elasticsearch6.7 05. Document APIs(10)Reindex API
-
elasticsearch6.7 05. Document APIs(6)UPDATE API
-
elasticsearch6.7 05. Document APIs(7)Update By Query API
-
elasticsearch6.7 05. Document APIs(9)Bulk API
-
elasticsearch6.7 05. Document APIs(8)Multi Get API
-
elasticsearch6.7 05. Document APIs(6)UPDATE API
-
elasticsearch6.7 05. Document APIs(7)Update By Query API
-
elasticsearch6.7 05. Document APIs(10)Reindex API
-
elasticsearch6.7 05. Document APIs(9)Bulk API