欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

elasticsearch6.7 05. Document APIs(10)Reindex API

程序员文章站 2022-06-25 16:26:33
9、REINDEX API Reindex要求为源索引中的所有文档启用_source。 不会配置目标索引,不会复制源索引的设置。你需要在reindex之前先指定 ,分片数量,副本数量等选项。 最常用的一种方式是复制一个索引。下例会将twitter索引中的文档复制到new_twitter索引中: 返回 ......

9、reindex api

reindex要求为源索引中的所有文档启用_source。

reindex 不会配置目标索引,不会复制源索引的设置。你需要在reindex之前先指定mapping,分片数量,副本数量等选项。

_reindex最常用的一种方式是复制一个索引。下例会将twitter索引中的文档复制到new_twitter索引中:

post _reindex
{
  "source": {
    "index": "twitter"
  },
  "dest": {
    "index": "new_twitter"
  }
}

返回结果:

{
  "took" : 147,
  "timed_out": false,
  "created": 120,
  "updated": 0,
  "deleted": 0,
  "batches": 1,
  "version_conflicts": 0,
  "noops": 0,
  "retries": {
    "bulk": 0,
    "search": 0
  },
  "throttled_millis": 0,
  "requests_per_second": -1.0,
  "throttled_until_millis": 0,
  "total": 120,
  "failures" : [ ]
}

_update_by_query api 一样,_reindex从原索引获取快照,但它的目标索引必须是不同的索引,所以不太可能发生版本冲突。dest元素可以向 index api 一样配置,控制乐观锁。例如,省略version_type(和上述一样)或者将其设置为internal,会导致elasticsearch草率地将文档转储到目标索引,从而恰巧覆盖具有相同type和id的文档:

post _reindex
{
  "source": {
    "index": "twitter"
  },
  "dest": {
    "index": "new_twitter",
    "version_type": "internal"
  }
}

设置version_typeexternal会使elasticsearch保存源文档的version,创建任何不存在目标索引的文档,并更新任何源文档version比目标文档version要大的文档:

post _reindex
{
  "source": {
    "index": "twitter"
  },
  "dest": {
    "index": "new_twitter",
    "version_type": "external"
  }
}

设置op_typecreate 会导致 _reindex仅在目标索引中创建缺少的文档。索引已存在的文档都会导致版本冲突:

post _reindex
{
  "source": {
    "index": "twitter"
  },
  "dest": {
    "index": "new_twitter",
    "op_type": "create"
  }
}

默认的版本冲突会导致_reindex 进程终止,你可以在请求体中设置"conflicts": "proceed"来指示_reindex继续处理版本冲突之后的文档。请务必注意,其他错误类型的处理不受“conflict”参数的影响。当请求体中设置"conflicts": "proceed"时,_reindex进程将继续发生版本冲突并返回遇到的版本冲突数:

post _reindex
{
  "conflicts": "proceed",
  "source": {
    "index": "twitter"
  },
  "dest": {
    "index": "new_twitter",
    "op_type": "create"
  }
}

你可以通过在source字段内添加type字段或query字段来限制文档。如下例仅会将索引twitter中的kimchy复制到索引new_twitter中:

post _reindex
{
  "source": {
    "index": "twitter",
    "type": "_doc",
    "query": {
      "term": {
        "user": "kimchy"
      }
    }
  },
  "dest": {
    "index": "new_twitter"
  }
}

可以在source字段中同时列出typeindex,允许在一个请求中复制大量数据。下例会复制来自twitter索引和blog索引的_doc 类型和post类型的文档:

post _reindex
{
  "source": {
    "index": ["twitter", "blog"],
    "type": ["_doc", "post"]
  },
  "dest": {
    "index": "all_together",
    "type": "_doc"
  }
}

reindex api 不会处理id冲突,以最后写入的文档为准,但reindex的顺序通常是不可预测,因此依赖此行为并不是一个好主意。应该使用脚本确保id是唯一的。

也可以限制复制文档的数量。下例仅仅从twitter索引复制一个文档到new_twitter索引:

post _reindex
{
  "size": 1,
  "source": {
    "index": "twitter"
  },
  "dest": {
    "index": "new_twitter"
  }
}

你可以指定设置sort参数指定排序规则。sort会降低scroll的效率,但是有时候必须使用。如果可以的话,最好限制size的大小。下例会从twitter索引中复制10000个文档到new_twitter索引:

post _reindex
{
  "size": 10000,
  "source": {
    "index": "twitter",
    "sort": { "date": "desc" }
  },
  "dest": {
    "index": "new_twitter"
  }
}

可以使用_source参数过滤字段,例如,可以使用sourcetwitter索引中过滤一部分字段,如下所示:

post _reindex
{
  "source": {
    "index": "twitter",
    "_source": ["user", "_doc"]
  },
  "dest": {
    "index": "new_twitter"
  }
}

_redix_update_by_query一样也支持脚本,它可以修改一个文档,但和_update_by_query有区别,这个script仅允许修改文档的metadata

post _reindex
{
  "source": {
    "index": "twitter"
  },
  "dest": {
    "index": "new_twitter",
    "version_type": "external"
  },
  "script": {
    "source": "if (ctx._source.foo == 'bar') {ctx._version++; ctx._source.remove('foo')}",
    "lang": "painless"
  }
}

_update_by_query一样,你可以设置ctx.op改变在目标索引执行的操作:

  • noop
    • 设置 ctx.op = "noop",如果你的script确定文档不需要索引到目标索引。这个空操作将会反映到响应体的noop计数器。
  • delete
    • 设置ctx.op = "delete",如果你的script确定必须要从目标索引中删除文档。删除操作会反映到响应体的deleted计数器。

设置ctx.op为其他值会导致错误。

你甚至可以改变如下的元数据,但一定要小心地操作:

  • _id
  • _type
  • _index
  • _version
  • _routing
  • _parent

设置_version =null或者从ctx中清空它,等价于没有在index 请求中发送version参数。不管目标索引文档的版本或者你在_reindex请求中指定的版本类型是什么,它都会导致目标索引的文档被覆盖。

默认情况下,如果_reindex查询到一个指定了routing参数的文档,在新文档中也会保留该参数,除非你在script中改变它。你可以在dest字段设置routing从而改变这个设置:

  • keep
    • 文档的路由值在新索引中保持不变。这是默认值。
  • discard
    • 原索引中文档的routing值在新索引中变为null
  • =<some text>
    • 文档的路由值在新索引中变为指定值。

例如,你可以使用以下请求从源索引中复制所有公司名是cat的文档到目标索引,并且指定它们的routing值都是cat:

post _reindex
{
  "source": {
    "index": "source",
    "query": {
      "match": {
        "company": "cat"
      }
    }
  },
  "dest": {
    "index": "dest",
    "routing": "=cat"
  }
}

默认,_reindex每次scroll查询的文档数是1000。你可以在source元素中使用size参数改变这个大小:

post _reindex
{
  "source": {
    "index": "source",
    "size": 100
  },
  "dest": {
    "index": "dest",
    "routing": "=cat"
  }
}

你也可以指定pipeline参数使用ingest node节点特性:

post _reindex
{
  "source": {
    "index": "source"
  },
  "dest": {
    "index": "dest",
    "pipeline": "some_ingest_pipeline"
  }
}

9.1 从远程重建索引(reindex from remote)

reindex支持从远程的集群中获取源索引数据:

post _reindex
{
  "source": {
    "remote": {
      "host": "http://otherhost:9200",
      "username": "user",
      "password": "pass"
    },
    "index": "source",
    "query": {
      "match": {
        "test": "data"
      }
    }
  },
  "dest": {
    "index": "dest"
  }
}

host参数必须包含协议,主机和端口(例如:)。usernamepassword是可选的,当elasticsearch节点需要basic auth时会使用它们。使用基本身份验证时务必使用https,否则密码将以纯文本格式发送。有一系列可用于配置https连接的行为。

远程主机必须要在elasticsearch.yaml使用reindex.remote.whitelist属性显式指定允许连接的白名单。它可以是一个逗号分隔的列表((e.g. otherhost:9200, another:9200, 127.0.10.*:9200, localhost:*)。白名单忽略协议,仅仅使用端口和主机。例如:

reindex.remote.whitelist: "otherhost:9200, another:9200, 127.0.10.*:9200, localhost:*"

必须将白名单配置在所有进行reindex的节点上。

这个特性可以和任何版本的远程集群交互。允许你从任何版本的elasticsearch升级到当前版本。

要将查询发送到旧版的elasticsearch,请将查询参数直接发送到远程主机。

从远程群集执行reindex 不支持手动或自动切片。

远程服务器使用一个最大值是100mb的缓冲区。如果远程索引包含非常大的文档,你需要使用一个更小的批量大小(每一批的数量不要超过缓冲区大小),下面的示例将批量大小设置为10:

post _reindex
{
  "source": {
    "remote": {
      "host": "http://otherhost:9200"
    },
    "index": "source",
    "size": 10,
    "query": {
      "match": {
        "test": "data"
      }
    }
  },
  "dest": {
    "index": "dest"
  }
}

你也可以使用socket_timeout字段指定请求的套接字进行读取操作的超时时间,connect_timeout字段指定连接超时时间。默认都是30s。下例将套接字读取超时设置为一分钟,将连接超时设置为10秒:

post _reindex
{
  "source": {
    "remote": {
      "host": "http://otherhost:9200",
      "socket_timeout": "1m",
      "connect_timeout": "10s"
    },
    "index": "source",
    "query": {
      "match": {
        "test": "data"
      }
    }
  },
  "dest": {
    "index": "dest"
  }
}

9.2 url参数(url parameters)

参考上一节

9.3 配置ssl参数(configuring ssl parameters)

9.4 响应体(response body)

json响应如下:

{
  "took": 639,
  "timed_out": false,
  "total": 5,
  "updated": 0,
  "created": 5,
  "deleted": 0,
  "batches": 1,
  "noops": 0,
  "version_conflicts": 2,
  "retries": {
    "bulk": 0,
    "search": 0
  },
  "throttled_millis": 0,
  "requests_per_second": 1,
  "throttled_until_millis": 0,
  "failures": [ ]
}
  • took
    • 整个操作耗费的毫秒数
  • timed_out
    • 如果在执行 reindex 操作时出现超时,那么这个标识将会返回 true
  • total
    • 成功执行操作的文档的数量
  • updated
    • 成功的更新了多少个文档
  • ceated
    • 成功的创建了多少个文档
  • deleted
    • 成功的删除了多少个文档
  • batches
    • 回滚数
  • verison_conflicts
    • 操作过程中出现版本冲突的数量
  • noops
    • 由于 ctx.op=noop 设置造成的忽略的文档数
  • retries
    • 重复尝试的次数,bulk 是批量更新操作重复尝试的次数,search 是查询的重复尝试次数
  • throthled_millis
    • requests_per_second 参数引起的请求等待时间
  • requests_per_second
    • 在操作过程中,每秒执行的请求数
  • throttled_until_millis
    • 执行reindex时这个值始终0,只在在调用task api时该值才有意义,它表示下一次(自纪元以来)为了符合requests_per_second将再次执行请求的毫秒数。
  • failures
    • 执行失败的数组,包含在执行过程中任何不可恢复的错误。如果这个数组不是空的,那么请求会因为这些失败而中止。reindex 是使用批处理实现的,任何失败都会导致整个执行被中止。可以使用conflicts参数来防止reindex在版本冲突时造成操作中止。

9.5 结合 taskapi 使用(works with the task api)

您可以使用 task api 获取任何正在进行 update_by_query 请求的状态:

get _tasks?detailed=true&actions=*reindex 

返回值:

{
  "nodes" : {
    "r1a2worbtwkz516z6nes5a" : {
      "name" : "r1a2wor",
      "transport_address" : "127.0.0.1:9300",
      "host" : "127.0.0.1",
      "ip" : "127.0.0.1:9300",
      "attributes" : {
        "testattr" : "test",
        "portsfile" : "true"
      },
      "tasks" : {
        "r1a2worbtwkz516z6nes5a:36619" : {
          "node" : "r1a2worbtwkz516z6nes5a",
          "id" : 36619,
          "type" : "transport",
          "action" : "indices:data/write/reindex",
          "status" : {    
            "total" : 6154,
            "updated" : 3500,
            "created" : 0,
            "deleted" : 0,
            "batches" : 4,
            "version_conflicts" : 0,
            "noops" : 0,
            "retries": {
              "bulk": 0,
              "search": 0
            },
            "throttled_millis": 0,
            "requests_per_second": -1,
            "throttled_until_millis": 0
          },
          "description" : "",
          "start_time_in_millis": 1535149899665,
          "running_time_in_nanos": 5926916792,
          "cancellable": true,
          "headers": {}
        }
      }
    }
  }
}

status:这个对象包含了当前任务的实际状态。total 字段是本次操作需要重新索引的文档数。你可以通过 updated, created, and deleted 字段估计处理进度。当以上几个字段的和等于 total 字段时,请求就执行完毕了。

你可以使用 task id 查看某个任务。下例查看task id为 r1a2worbtwkz516z6nes5a:36619的任务信息:

get /_tasks/r1a2worbtwkz516z6nes5a:36619

该 api 可以与wait_for_comletion=false集成使用,可以清晰的查看已完成任务的状态。如果任务已经完成,并且在其上设置了wait_for_completion=false,那么请求将会返回结果或是错误字段。此功能的代价是当wait_for_completion=false时会在.tasks/task/${taskid}目录下会创建文档。您可以根据需要删除该文档。

9.6 取消任务(works with the cancel task api)

任何_update_by_query操作都可以通过task cancel api来取消,如:

post _tasks/r1a2worbtwkz516z6nes5a:36619/_cancel

取消应该执行很快,但可能需要几秒钟。在此期间上面的 task status api将继续列出该任务,直到它完全被取消了。

9.7 阈值(rethrottling)

在正在执行的请求中,requests_per_second的值可以在运行时通过_rethrotted api进行修改:

post _update_by_query/r1a2worbtwkz516z6nes5a:36619/_rethrottle?requests_per_second=-1

可以使用tasks api找到任务id。

和 requests_per_seconds 参数设置一样,rethrottling 参数可以是 - 1 (禁用限制)或是其他十进制数(如 1.7 或 12 )。rethrottling 参数能提高查询速度且会立即生效,但是降低速度必须等到当前操作执行完后才起作用。这可以防止滚动超时

9.7.1 重建索引以更改字段的名称

_reindex可用于构建具有重命名字段的索引副本。假设您创建一个包含如下所示文档的索引:

post test/_doc/1?refresh
{
  "text": "words words",
  "flag": "foo"
}

但你不喜欢flag ,并希望用tag替换它。可以如下创建另一个索引:

post _reindex
{
  "source": {
    "index": "test"
  },
  "dest": {
    "index": "test2"
  },
  "script": {
    "source": "ctx._source.tag = ctx._source.remove(\"flag\")"
  }
}

查询新的文档:

get test2/_doc/1

响应结果:

{
  "found": true,
  "_id": "1",
  "_index": "test2",
  "_type": "_doc",
  "_version": 1,
  "_seq_no": 44,
  "_primary_term": 1,
  "_source": {
    "text": "words words",
    "tag": "foo"
  }
}

9.8 切片 (slicing)

reindex 支持 来使 reindex 操作并行进行。这能提高效率并且提供了一种将请求分解为较小的部分的便捷方式。

9.8.1 手动切片 (manually slicing)

通过为每个请求提供切片 id 和切片总数,手动将 _reindex 操作进行分解:

post _reindex
{
  "source": {
    "index": "twitter",
    "slice": {
      "id": 0,
      "max": 2
    }
  },
  "dest": {
    "index": "new_twitter"
  }
}
post _reindex
{
  "source": {
    "index": "twitter",
    "slice": {
      "id": 1,
      "max": 2
    }
  },
  "dest": {
    "index": "new_twitter"
  }
}

你可以这样验证上述 api 的结果:

get _refresh
post new_twitter/_search?size=0&filter_path=hits.total

返回如下的的结果:

{
  "hits": {
    "total": 120
  }
}

9.8.2 自动切片 (automatic slicing)

也可以让 _reindex 自动并行地。使用 slices 指定要使用的切片数:

post _reindex?slices=5&refresh
{
  "source": {
    "index": "twitter"
  },
  "dest": {
    "index": "new_twitter"
  }
}

您可以通过下列语句验证运行结果:

post new_twitter/_search?size=0&filter_path=hits.total

返回如下的的结果:

{
  "hits": {
    "total": 120
  }
}

slices 设置为 auto 将允许 elasticsearch 选择要使用的切片数。此设置将使用一个分片一个切片,直至达到某个限制。如果存在多个源索引,它将根据具有最少分片的那个索引所拥有的分片数来作为切片数。

_reindex 添加 slices 只会自动执行上一节中使用的手动过程,这意味着它有一些怪癖:

  • 您可以在 tasks api 中查看这些请求。这些子请求是具有 slices 请求的任务的 “子 " 任务。
  • 仅使用 slices 获取请求的任务状态 (包含已完成切片的状态)。
  • 这些子请求可单独寻址,例如取消和重新限制。
  • 使用 slices 重新处理请求将按比例重新调整未完成的子请求。
  • 使用 slices 取消请求将取消每个子请求。
  • 由于 slices 的性质,每个子请求都不会获得完全均匀的文档部分。这些切片文档都会被处理,但某些切片可能比其他切片分到更大的文档。
  • requests_per_second 这样的参数和带有 sizeslices 请求 (按指定比例分配给每个子请求)。将其与上述关于分布不均匀的点相结合,您应该得出结论,使用 slicessize 可能不会删除指定大小的文档。
  • 每个子请求都会获得和源索引略有不同的快照,尽管这些快照几乎同时进行。

9.8.3 选择 slices 的数量(picking the number of slices)

如果 slices 设置为 auto,elasticsearch 将会自动为大多数索引选择一个合理的数量。如果您设置手动切片或以其他方式来调整自动切片,请遵循以下准则:

当切片的数量等于索引的分片数时,查询性能最好。如果这个数量太大(如 500), 请选择一个较小的数字,因为太多的切片会影响性能。设置高于分片数的切片通常不会提高效率反而会增加开销。

indexing 的性能与可用的切片数量呈正相关。

查询或索引是否是影响此时运行时性能的主要原因,这取决于reindexed时的文档和集群的资源。

9.9 重建更多的索引( reindexing many indices)

如果你有很多索引要重新索引,通常最好一次重新索引它们,而不是使用全局模式来获取许多索引。这样的话如果reindex时有任何错误,您可以通过删除部分已完成的索引,然后让该索引重新执行一次reindex,从而恢复之前的流程。它还使得流程的并行化非常简单:将索引列表拆分为reindex并并行运行每个列表。

one-off bash脚本似乎很适合这种操作:

for index in i1 i2 i3 i4 i5; do
  curl -hcontent-type:application/json -xpost localhost:9200/_reindex?pretty -d'{
    "source": {
      "index": "'$index'"
    },
    "dest": {
      "index": "'$index'-reindexed"
    }
  }'
done

9.10 每日重复指数( reindex daily indices)

尽管有上述建议,您可以将_reindex与painless结合使用新模板应用于现有文档以实现每日重建索引。

假设您的索引由以下文档组成:

put metricbeat-2016.05.30/_doc/1?refresh
{"system.cpu.idle.pct": 0.908}
put metricbeat-2016.05.31/_doc/1?refresh
{"system.cpu.idle.pct": 0.105}

metricbement- *索引的新模板已加载到elasticsearch中,但它仅适用于新创建的索引。painless 可用于reindex 已经存在的文档并应用于新模板。

下面的脚本从索引名称中提取日期,并创建一个在原索引上附加了-1的新的索引。下例将metricbeat-2016.05.31索引中的所有数据都重新编入metricbeat-2016.05.31-1索引中。

post _reindex
{
  "source": {
    "index": "metricbeat-*"
  },
  "dest": {
    "index": "metricbeat"
  },
  "script": {
    "lang": "painless",
    "source": "ctx._index = 'metricbeat-' + (ctx._index.substring('metricbeat-'.length(), ctx._index.length())) + '-1'"
  }
}

现在可以在* -1索引中找到先前metricbeat索引中的所有文档:

get metricbeat-2016.05.30-1/_doc/1
get metricbeat-2016.05.31-1/_doc/1

以前的方法还可以与结合使用,以仅将现有数据加载到新索引中,并根据需要重命名任何字段。

9.11 提取索引的随机子集(extracting a random subset of an index)

_reindex可用于提取索引的随机子集以进行测试:

post _reindex
{
  "size": 10,
  "source": {
    "index": "twitter",
    "query": {
      "function_score" : {
        "query" : { "match_all": {} },
        "random_score" : {}
      }
    },
    "sort": "_score"    【1】
  },
  "dest": {
    "index": "random_twitter"
  }
}

【1】:_reindex默认按_doc排序,除非您将排序覆盖为_score,否则random_score将不起作用。