ElasticSearch 索引 剖析
elasticsearch index 剖析
在看elasticsearch权威指南基础入门中关于:分片内部原理这一小节内容后,大致对elasticsearch的索引、搜索底层实现有了一个初步的认识。记录一下在看文档的过程中碰到的问题以及我的理解。此外,在文章的末尾,还讨论分布式系统中的主从复制原理,以及采用这种副本复制方案带来的数据一致性问题。
elasticsearch index 操作背后发生了什么?
更具体地,就是执行put操作向elasticsearch添加一篇文档时,底层发生的一系列操作。
put user/profile/10 { "content":"向user索引中添加一篇id为10的文档" }
通过put请求发起了索引新文档的操作,该操作能够执行的前提是:集群中有“一定数量”的活跃 shards。这个配置由wait_for_active_shards
指定。elasticsearch关于分片有2个重要的概念:primary shard 和 replica。在定义索引的时候指定索引有几个主分片,以及每个主分片有多少个副本。比如:
put user { "settings": { "number_of_shards": 3, "number_of_replicas": 2 },
介绍一下集群的环境:elasticsearch6.3.2三节点集群。定义了一个user索引,该索引有三个主分片,每个主分片2个副本。如图,每个节点上有三个shards:一个 primary shard,二个replica
wait_for_active_shards
to improve the resiliency of writes to the system, indexing operations can be configured to wait for a certain number of active shard copies before proceeding with the operation.
在索引一篇文档时,通过wait_for_active_shards
指定有多少个活跃的shards时,才能执行索引文档的操作。默认情况下,只要primary shard 是活跃的就可以索引文档。即wait_for_active_shards
值为1
by default, write operations only wait for the primary shards to be active before proceeding (i.e.
wait_for_active_shards=1
)
来验证一下:在只有一台节点的elasticsearch上:三个primary shard 全部分配在一台节点中,并且存在着未分配的replica
执行:
put user/profile/10 { "content":"向user索引中添加一篇id为10的文档" }
返回结果:
{ "_index": "user", "_type": "profile", "_id": "10", "_version": 1, "result": "created", "_shards": { "total": 3, "successful": 1, "failed": 0 }, "_seq_no": 0, "_primary_term": 1 }
在_shards 中,total 为3,说明该索引操作应该在3个(一个primary shard,二个replica)分片中执行成功;但是successful为1 说明 put操作 在其中一个分片中执行成功了,就返回给client索引成功的确认。这个分片就是primary shard,因为只有一台节点,另外2个replica 属于 unassigned shards,不可能在2个replica 中执行成功。总之,默认情况下,只要primary shard 是活跃的,就能索引文档(index操作)
现在在单节点的集群上,删除原来的user索引,并指定:wait_for_active_shards=2
,这意味着一个索引操作至少要在2个分片上执行成功,才能返回给client acknowledge。
"settings": { "index.write.wait_for_active_shards": "2" }
再次向user索引中put 一篇文档:
put user/profile/10 { "content":"向user索引中添加一篇id为10的文档" }
返回结果:
{ "statuscode": 504, "error": "gateway time-out", "message": "client request timeout" }
由于是单节点的elasticsearch,另外的2个replica无法分配,因此不可能是活跃的。而我们指定的wait_for_active_shards
为2,但现在只有1个primary shard是活跃的,因此无法进行索引操作了。
the primary shard assigned to perform the index operation might not be available when the index operation is executed. some reasons for this might be that the primary shard is currently recovering from a gateway or undergoing relocation. by default, the index operation will wait on the primary shard to become available for up to 1 minute before failing and responding with an error.
索引操作会在1分钟之后超时。
总结一下:由于文档最终是存在在某个elasticsearch shard下面的,而每个shard又设置了副本数。默认情况下,在进行索引文档操作时,elasticsearch会检查活跃的分片数量是否达到wait_for_active_shards
设置的值。若未达到,则索引操作会超时,超时时间为1分钟。另外,值得注意的是:检查活跃分片数量只是在开始索引数据的时候检查,若检查通过后,在索引文档的过程中,集群中又有分片因为某些原因挂掉了,那索引文档操作还是会继续进行。
因为索引文档操作(也即写操作)发生在 检查活跃分片数量 操作之后。试想以下几个问题:
- 问题1:检查活跃分片数量满足
wait_for_active_shards
设置的值之后,在持续 bulk index 文档过程中有 shard 失效了(这里的shard是replica),那 难道不能继续索引文档了? - 问题2:在什么时候检查集群中的活跃分片数量?难道要 每次client发送索引文档请求时就要检查一次吗?还是说周期性地隔多久检查一次?
- 问题3:这里的 check-then-act 并不是原子操作,因此
wait_for_active_shards
这个配置参数又有多大的意义?
因此,官方文档中是这么说的:
it is important to note that this setting greatly reduces the chances of the write operation not writing to the requisite number of shard copies, but it does not completely eliminate the possibility, because this check occurs before the write operation commences. once the write operation is underway, it is still possible for replication to fail on any number of shard copies but still succeed on the primary.
-
该参数只是尽可能地保证新文档能够写入到我们所要求的shard数量中(reduce the chance of ....)。比如:
wait_for_active_shards
设置为2,那也只是尽可能地保证将新文档写入了2个shard中,当然一个是primary shard,另外就是某一个replica - check 操作发生在 write操作之前,在写操作正在进行过程中,有可能某些shard出了问题,只要不是primary shard,那写操作还是会继续进行。
最后,说一下wait_for_active_shards参数的取值:可以设置成 all 或者是 1到 number_of_replicas+1 之间的任何一个整数。
valid values are
all
or any positive integer up to the total number of configured copies per shard in the index (which isnumber_of_replicas+1
)
number_of_replicas 为索引指定的副本的数量,加1是指:再算上primary shard。比如前面user索引的副本数量为2,那么wait_for_active_shards最多设置为3。
好,前面讨论完了elasticsearch能够执行索引操作(写操作)了,接下来是在写操作过程中发生了什么?比如说elasticsearch是如何做到近实时搜索的?在将文档写入elasticsearch时候发生了故障,那文档会不会丢失?
由于elasticsearch底层是lucene,在将一篇文档写入elasticsearch,并最终能被client查询到,涉及到以下几个概念:倒排索引、lucene段、提交点、translog、elasticsearch分片。这里概念都是参考《elasticsearch definitive guide》中相关的描述。
in elasticsearch, the most basic unit of storage of data is a shard. but, looking through the lucene lens makes things a bit different. here, each elasticsearch shard is a lucene index, and each lucene index consists of several lucene segments. a segment is an inverted index of the mapping of terms to the documents containing those terms.
它们之间的关系示意图如下:
一个elasticsearch 索引可由多个 primary shard组成,每个primary shard相当于一个lucene index;一个lucene index 由多个segment组成,每个segment是一个倒排索引结构表
从文档的角度来看:文章会被analyze(比如分词),然后放到倒排索引(posting list)中。倒排索引之于elasticsearch就相当于b+树之于mysql,是存储引擎底层的存储结构。
当文档写入elasticsearch时,文档首先被保存在内存索引缓存中(in-memeory indexing buffer)。而in-memory buffer是每隔1秒钟刷新一次,刷新成一个个的可搜索的段--下图中的绿色圆柱表示(segment),然后这些段是每隔30分钟同步到磁盘中持久化存储,段同步到磁盘的过程称为 提交 commit。
在这里涉及到了两个过程:① in-memory buffer中的文档被刷新成段;②段提交 同步到磁盘 持久化存储。
过程①默认是1秒钟1次,这也能理解为什么elasticsearch中还有段合并操作。另外elasticsearch提供了 refresh api 来控制过程①。refresh操作强制把in-memory buffer中的内容刷新成段。refresh示意图如下:
比如说,你可以在每次index一篇文档之后就调用一次refresh api,也即:每索引一篇文档就强制刷新生成一个段,这会导致系统中存在大量的小段,是有性能开销的。而我们所说的elasticsearch是提供了近实时搜索,指的是:文档的变化并不是立即对搜索可见,但会在一秒之后变为可见,一秒钟之后,我们写入的文档就可以被搜索到了。
对于过程②,就是将段刷新到磁盘中去,默认是每隔30分钟一次,这个刷新过程称为提交。如果还未来得及提交时,发生了故障,那岂不是会丢失大量的文档数据?这个时候,就引入了translog
每篇文档写入到in-memroy buffer中时,同时也会向 translog中写一条记录。in-memory buffer 每秒刷新一次,刷新后生成新段,in-memory被清空,文档可以被搜索。
而translog 默认是每5秒钟刷新一次到磁盘,或者是在每次请求(index、delete、update、bulk)之后就刷新到磁盘。每5秒钟刷新一次就是异步刷新,可以通过如下方式开启:
put /my_index/_settings { "index.translog.durability": "async", "index.translog.sync_interval": "5s" }
这种方式的话,还是有可能会丢失文档数据,比如client发起index操作之后,elasticsearch返回了200响应,但是由于translog要等5秒钟之后才刷新到磁盘,如果在5秒内系统宕机了,那么这几秒钟内写入的文档数据就丢失了。
而在每次请求操作(index、delete、update、bulk)执行后就刷新translog到磁盘,则是translog同步刷新,比如说:当client put一个文档:
put user/profile/10 { "content":"向user索引中添加一篇id为10的文档" }
在前面提到的三节点elasticsearch集群中,该user索引有三个primary shard,每个primary shard2个replica,那么translog需要在某个primary shard中刷新成功,并且在该primary shard的两个replica中也刷新成功,才会给client返回 200 put成功响应。这种方式就保证了,只要client接收到的响应是200,就意味着该文档一定是成功索引到elasticsearch中去了。因为translog是成功持久化到磁盘之后,再给client响应的,系统宕机后在下一次重启elasticsearch时,就会读取translog进行恢复。
by default, elasticsearch
fsync
s and commits the translog every 5 seconds ifindex.translog.durability
is set toasync
or if set torequest
(default) at the end of every , , , or request. more precisely, if set torequest
, elasticsearch will only report success of an index, delete, update, or bulk request to the client after the translog has been successfullyfsync
ed and committed on the primary and on every allocated replica.
这也是为什么,在我们关闭elasticsearch时最好进行一次flush操作,将段刷新到磁盘中。因为这样会清空translog,那么在重启elasticsearch就会很快(不需要恢复大量的translog了)
translog 也被用来提供实时 crud 。当你试着通过id查询、更新、删除一个文档,它会在尝试从相应的段中检索之前, 首先检查 translog 任何最近的变更。这意味着它总是能够实时地获取到文档的最新版本。
放一张总结性的图,如下:
有个问题是:为什么translog可以在每次请求之后刷新到磁盘?难道不会影响性能吗?相比于将 段(segment)刷新到磁盘,刷新translog的代价是要小得多的,因为translog是经过精心设计的数据结构,而段(segment)是倒排索引,我们无法做到每次将段刷新到磁盘;而translog相比于段要轻量级简单得多,因此通过translog机制来保证数据不丢失又不影响查询性能。
changes to lucene are only persisted to disk during a lucene commit, which is a relatively expensive operation and so cannot be performed after every index or delete operation. ...... all index and delete operations are written to the translog after being processed by the internal lucene index but before they are acknowledged. in the event of a crash, recent transactions that have been acknowledged but not yet included in the last lucene commit can instead be recovered from the translog when the shard recovers.
总结一下:
这里一共有三个地方有“刷新操作”:
-
in-memory buffer 刷新 生成segment
每秒一次,文档刷新成segment就可以被搜索到了,elasticsearch提供了refresh api 来控制这个过程
-
translog 刷新到磁盘
由
index.translog.durability
来设置,或者由index.translog.flush_threshold_size
来设置当translog达到一定大小之后刷新到磁盘(默认512mb) -
段(segment) 刷新到磁盘
每30分钟一次,elasticsearch提供了flush api 来控制这个过程。在段被刷新到磁盘(就是通常所说的commit操作)中时,也会清空刷新translog。
存在的一些问题
这个 和 这个 讨论了index.write.wait_for_active_shards
参数的来龙去脉。
以三节点elasticsearch6.3.2集群,索引设置为3个primary shard,每个primary shard 有2个replica 来讨论:
- client向其中一个节点发起index操作索引文档,这个写操作请求当然是发送到primary shard上,但是当client收到200响应时,该文档是否已经复制到另外2个replica上?
- client将一篇文档成功写入到elasticsearch了(收到了200响应),它能在replica所在的节点上 get 到这篇文档吗?client发起查询请求,又能查询到这篇文档吗?(注意:get 和 query 是不一样的)
- 前面提到,当 index 一篇文档时,primary shard 和2个replica 上的translog 要 都刷新 到磁盘,才返回 200 响应,那它是否与参数
index.write.wait_for_active_shards
默认值 矛盾?因为index.write.wait_for_active_shards
默认值为1,即:只要primary shard 是活跃的,就可以进行 index 操作。也就是说:当client收到200的index成功响应,此时primary shard 已经将文档 复制 到2个replica 上了吗?这两个 replica 已经将文档刷新成 segment了吗?还是说这两个 replica 仅仅只是 将索引该文档的 translog 刷新到磁盘上了?
elasticsearch副本复制方式讨论
elasticsearch索引是一个逻辑概念,囊括现实世界中的数据。比如 定义一个 user 索引存储所有的用户资料信息。索引由若干个primary shard组成,就相当于把用户资料信息 分开成 若干个部分存储,每个primary shard存储user索引中的一部分数据。为了保证数据可靠性不丢失,可以为每个primary shard配置副本(replica)。显然,primary shard 和它对应的replica 是不会存储在同一台机器(node)上的,因为如果该机器宕机了,那么primary shard 和 副本(replica) 都会丢失,那整个系统就丢失一部分数据了。
primary shard 和 replica 这种副本备份方案,称为主从备份。primary shard是主(single leader),replica 是 从 (multiple replica)。由于是分布式环境,可能存在多个client同时向elasticsearch发起索引文档的请求,这篇文档会根据 文档id 哈希到某个 primary shard,primary shard写入该文档 并分发给 replica 进行存储。由于采用了哈希,这也是为什么 在定义索引的时候,需要指定primary shard个数,并且 primary shard个数一经指定后不可修改的原因。因为primary shard个数一旦改变,哈希映射 结果就变了。而采用这种主从副本备份方案,这也是为什么 索引操作(写操作、update操作) 只能由 primary shard处理,而读操作既可以从 primary shard读取,也可以从 replica 读取的原因。相对于文档而言,primary shard是single leader,所有的文档修改操作都统一由primary shard处理,能避免一些 并发修改 冲突。但是默认情况下,elasticsearch 副本复制方式 是异步的,也正如前面 index.write.wait_for_active_shards
讨论,只要primary shard 是活跃的就可以进行索引操作,primary shard 将文档 “ 存储 ” 之后,就返回给client 响应,然后primary shard 再将该文档同步给replicas,而这就是异步副本复制方式。在elasticsearch官方讨论论坛里面,也有关于副本复制方式的讨论:提出了一个问题:client向primary shard写入文档成功,primary shard 是通过何种方式将该文档同步到 replica的?
采用异步副本复制方式带来的一个问题是:读操作能读取最新写入的文档吗?如果我们指定读请求去读primary shard(通过elasticsearch 的路由机制),那么是能读到最新数据的。但是如果读请求是由某个 replica 接收处理,那也许就不能读取到刚才最新写入的文档了。因此,从刚才client 读请求的角度来看,elasticsearch能提供 哪种程度的 一致性呢?而出现这种一致性问题的原因在于:为了保证数据可靠性,采用了副本备份,引入了副本,导致副本和primary shard上的数据不一致,即:存在 replication lag 问题。由于这种副本复制延迟带来的问题,系统需要给client 某种数据一致性的 保证,比如说:
-
read your own write
client能够读取到它自己最新写入的数据。比如用户修改了昵称,那ta访问自己的主页时,能看到自己修改了的昵称,但是ta的好友 可能 并不能立即看到 ta 修改后的昵称。好友请求的是某个 replica 上的数据,而 primary shard还未来得及把刚才修改的昵称 同步 到 replica上。
-
monotonic reads
单调读。每次client读取的值,是越来越新的值(站在client角度来看的)。比如说nba篮球比赛,client每10分钟读一次比赛结果。第10分钟读取到的是 1:1,第20分钟读到的是2:2,第30分钟读到的是3:3,假设在第40分钟时,实际比赛结果是4:4,cleint在第40分钟读取的时候,读到的值可以是3:3 这意味着未读取到最新结果而已,读到的值也可以是4:4, 但是不能是2:2 。
-
consistent prefix reads
符合因果关系的一种读操作。比如说,用户1 和 用户2 对话:
用户1:你现在干嘛?
用户2:写代码对于client读:应该是先读取到“你现在干嘛?”,然后再读取到 “写代码。如果读取结果顺序乱了,client就会莫名其妙。
正是由于client 有了系统给予的这种 一致性 保证,那么client(或者说应用程序)就能基于这种保证 来开发功能,为用户提供服务。
那系统又是如何提供这种一致性保证的呢?或者说elasticsearch集群又提供了何种一致性保证?经常听到的有:强一致性(linearizability)、弱一致性、最终一致性。对于强一致性,通俗的理解就是:实际上数据有多份(primary shard 以及多个 replica),但在client看来,表现得就只有一份数据。在多个 client 并发读写情形下,某个client在修改数据a,而又有多个client在同时读数据a,linearizability 就要保证:如果某个client读取到了数据a,那在该client之后的读取请求返回的结果都不能比数据a要 旧,至少是数据a的当前值(不能是数据a的旧值)。不说了,再说,我自己都不明白了。
至于系统如何提供这种一致性,会用到一些分布式共识算法,我也没有深入地去研究过。
参考资料
elasticsearch关于translog的官方文档解释:
- elasticsearch权威指南中文版 基础入门小节中的:分片内部原理(再结合英文原文阅读一遍):
原文:
上一篇: sql server 计算两个时间 相差的 几天几时几分几秒
下一篇: linux开机步骤