欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Elasticsearch 之(7)并发冲突解决: 剖析悲观锁与乐观锁的并发控制方案

程序员文章站 2022-05-04 15:09:04
...

并发冲突问题

Elasticsearch 之(7)并发冲突解决: 剖析悲观锁与乐观锁的并发控制方案

剖析悲观锁与乐观锁两种并发控制方案

Elasticsearch 之(7)并发冲突解决: 剖析悲观锁与乐观锁的并发控制方案基于_version进行乐观锁并发控制

(1)_version元数据
PUT /test_index/test_type/6
{
  "test_field": "test test"
}

{
  "_index": "test_index",
  "_type": "test_type",
  "_id": "6",
  "_version": 1,
  "result": "created",
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  },
  "created": true
}
第一次创建一个document的时候,它的_version内部版本号就是1;以后,每次对这个document执行修改或者删除操作,都会对这个_version版本号自动加1;哪怕是删除,也会对这条数据的版本号加1
{
  "found": true,
  "_index": "test_index",
  "_type": "test_type",
  "_id": "6",
  "_version": 4,
  "result": "deleted",
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  }
}
我们会发现,在删除一个document之后,可以从一个侧面证明,它不是立即物理删除掉的,因为它的一些版本号等信息还是保留着的。先删除一条document,再重新创建这条document,其实会在delete version基础之上,再把version号加1
Elasticsearch 之(7)并发冲突解决: 剖析悲观锁与乐观锁的并发控制方案
(1)先构造一条数据出来

PUT /test_index/test_type/7
{
  "test_field": "test test"
}
(2)模拟两个客户端,都获取到了同一条数据
GET test_index/test_type/7

{
  "_index": "test_index",
  "_type": "test_type",
  "_id": "7",
  "_version": 1,
  "found": true,
  "_source": {
    "test_field": "test test"
  }
}
(3)其中一个客户端,先更新了一下这个数据

同时带上数据的版本号,确保说,es中的数据的版本号,跟客户端中的数据的版本号是相同的,才能修改
PUT /test_index/test_type/7?version=1 
{
  "test_field": "test client 1"
}

{
  "_index": "test_index",
  "_type": "test_type",
  "_id": "7",
  "_version": 2,
  "result": "updated",
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  },
  "created": false
}
(4)另外一个客户端,尝试基于version=1的数据去进行修改,同样带上version版本号,进行乐观锁的并发控制
PUT /test_index/test_type/7?version=1 
{
  "test_field": "test client 2"
}

{
  "error": {
    "root_cause": [
      {
        "type": "version_conflict_engine_exception",
        "reason": "[test_type][7]: version conflict, current version [2] is different than the one provided [1]",
        "index_uuid": "6m0G7yx7R1KECWWGnfH1sw",
        "shard": "3",
        "index": "test_index"
      }
    ],
    "type": "version_conflict_engine_exception",
    "reason": "[test_type][7]: version conflict, current version [2] is different than the one provided [1]",
    "index_uuid": "6m0G7yx7R1KECWWGnfH1sw",
    "shard": "3",
    "index": "test_index"
  },
  "status": 409
}
(5)在乐观锁成功阻止并发问题之后,尝试正确的完成更新
GET /test_index/test_type/7

{
  "_index": "test_index",
  "_type": "test_type",
  "_id": "7",
  "_version": 2,
  "found": true,
  "_source": {
    "test_field": "test client 1"
  }
}
基于最新的数据和版本号,去进行修改,修改后,带上最新的版本号,可能这个步骤会需要反复执行好几次,才能成功,特别是在多线程并发更新同一条数据很频繁的情况下
PUT /test_index/test_type/7?version=2 
{
  "test_field": "test client 2"
}

{
  "_index": "test_index",
  "_type": "test_type",
  "_id": "7",
  "_version": 3,
  "result": "updated",
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  },
  "created": false
}


基于external version进行乐观锁并发控制

es提供了一个feature,就是说,你可以不用它提供的内部_version版本号来进行并发控制,可以基于你自己维护的一个版本号来进行并发控制。举个列子,加入你的数据在mysql里也有一份,然后你的应用系统本身就维护了一个版本号,无论是什么自己生成的,程序控制的。这个时候,你进行乐观锁并发控制的时候,可能并不是想要用es内部的_version来进行控制,而是用你自己维护的那个version来进行控制。
?version=1
?version=1&version_type=external
version_type=external,唯一的区别在于,_version,只有当你提供的version与es中的_version一模一样的时候,才可以进行修改,只要不一样,就报错;当version_type=external的时候,只有当你提供的version比es中的_version大的时候,才能完成修改

es,_version=1,?version=1,才能更新成功
es,_version=1,?version>1&version_type=external,才能成功,比如说?version=2&version_type=external

(1)先构造一条数据
PUT /test_index/test_type/8
{
  "test_field": "test"
}

{
  "_index": "test_index",
  "_type": "test_type",
  "_id": "8",
  "_version": 1,
  "result": "created",
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  },
  "created": true
}
(2)模拟两个客户端同时查询到这条数据
GET /test_index/test_type/8

{
  "_index": "test_index",
  "_type": "test_type",
  "_id": "8",
  "_version": 1,
  "found": true,
  "_source": {
    "test_field": "test"
  }
}
(3)第一个客户端先进行修改,此时客户端程序是在自己的数据库中获取到了这条数据的最新版本号,比如说是2
PUT /test_index/test_type/8?version=2&version_type=external
{
  "test_field": "test client 1"
}

{
  "_index": "test_index",
  "_type": "test_type",
  "_id": "8",
  "_version": 2,
  "result": "updated",
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  },
  "created": false
}
(4)模拟第二个客户端,同时拿到了自己数据库中维护的那个版本号,也是2,同时基于version=2发起了修改
PUT /test_index/test_type/8?version=2&version_type=external
{
  "test_field": "test client 2"
}

{
  "error": {
    "root_cause": [
      {
        "type": "version_conflict_engine_exception",
        "reason": "[test_type][8]: version conflict, current version [2] is higher or equal to the one provided [2]",
        "index_uuid": "6m0G7yx7R1KECWWGnfH1sw",
        "shard": "1",
        "index": "test_index"
      }
    ],
    "type": "version_conflict_engine_exception",
    "reason": "[test_type][8]: version conflict, current version [2] is higher or equal to the one provided [2]",
    "index_uuid": "6m0G7yx7R1KECWWGnfH1sw",
    "shard": "1",
    "index": "test_index"
  },
  "status": 409
}
(5)在并发控制成功后,重新基于最新的版本号发起更新
GET /test_index/test_type/8

{
  "_index": "test_index",
  "_type": "test_type",
  "_id": "8",
  "_version": 2,
  "found": true,
  "_source": {
    "test_field": "test client 1"
  }
}

PUT /test_index/test_type/8?version=3&version_type=external
{
  "test_field": "test client 2"
}

{
  "_index": "test_index",
  "_type": "test_type",
  "_id": "8",
  "_version": 3,
  "result": "updated",
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  },
  "created": false
}

悲观锁(全局锁
全局锁,一次性就锁整个index,对这个index的所有增删改操作都会被block住,如果上锁解锁的操作不是频繁,然后每次上锁之后,执行的操作的耗时不会太长,用这种方式,方便。
优点:操作非常简单,非常容易使用,成本低
缺点:你直接就把整个index给上锁了,这个时候对index中所有的doc的操作,都会被block住,导致整个系统的并发能力很低

PUT /fs/lock/global/_create
{}
fs: 你要上锁的那个index
lock: 就是你指定的一个对这个index上全局锁的一个type
global: 就是你上的全局锁对应的这个doc的id
_create:强制必须是创建,如果/fs/lock/global这个doc已经存在,那么创建失败,报错

利用了doc来进行上锁
{
  "_index": "fs",
  "_type": "lock",
  "_id": "global",
  "_version": 1,
  "result": "created",
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  },
  "created": true
}
另外一个线程同时尝试上锁
PUT /fs/lock/global/_create
{}

{
  "error": {
    "root_cause": [
      {
        "type": "version_conflict_engine_exception",
        "reason": "[lock][global]: version conflict, document already exists (current version [1])",
        "index_uuid": "IYbj0OLGQHmMUpLfbhD4Hw",
        "shard": "2",
        "index": "fs"
      }
    ],
    "type": "version_conflict_engine_exception",
    "reason": "[lock][global]: version conflict, document already exists (current version [1])",
    "index_uuid": "IYbj0OLGQHmMUpLfbhD4Hw",
    "shard": "2",
    "index": "fs"
  },
  "status": 409
}
释放锁
DELETE /fs/lock/global

{
  "found": true,
  "_index": "fs",
  "_type": "lock",
  "_id": "global",
  "_version": 2,
  "result": "deleted",
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  }
}


悲观锁(document锁
细粒度的一个锁,document锁,顾名思义,每次就锁你要操作的,你要执行增删改的那些doc,doc锁了,其他线程就不能对这些doc执行增删改操作了
但是你只是锁了部分doc,其他线程对其他的doc还是可以上锁和执行增删改操作的
document锁,是用脚本进行上锁
POST /fs/lock/1/_update
{
  "upsert": { "process_id": 123 },
  "script": "if ( ctx._source.process_id != process_id ) { assert false }; ctx.op = 'noop';"
  "params": {
    "process_id": 123
  }
}
/fs/lock,是固定的,就是说fs下的lock type,专门用于进行上锁
/fs/lock/id,比如1,id其实就是你要上锁的那个doc的id,代表了某个doc数据对应的lock(也是一个doc)
_update + upsert:执行upsert操作

params,里面有个process_id,process_id,是你的要执行增删改操作的进程的唯一id,比如说可以在java系统,启动的时候,给你的每个线程都用UUID自动生成一个thread id,你的系统进程启动的时候给整个进程也分配一个UUID。process_id + thread_id就代表了某一个进程下的某个线程的唯一标识。可以自己用UUID生成一个唯一ID

process_id很重要,会在lock中,设置对对应的doc加锁的进程的id,这样其他进程过来的时候,才知道,这条数据已经被别人给锁了

assert false,不是当前进程加锁的话,则抛出异常
ctx.op='noop',不做任何修改

如果该document之前没有被锁,/fs/lock/1之前不存在,也就是doc id=1没有被别人上过锁; upsert的语法,那么执行index操作,创建一个/fs/lock/id这条数据,而且用params中的数据作为这个lock的数据。process_id被设置为123,script不执行。这个时候象征着process_id=123的进程已经锁了一个doc了。

如果document被锁了,就是说/fs/lock/1已经存在了,代表doc id=1已经被某个进程给锁了。那么执行update操作,script,此时会比对process_id,如果相同,就是说,某个进程,之前锁了这个doc,然后这次又过来,就可以直接对这个doc执行操作,说明是该进程之前锁的doc,则不报错,不执行任何操作,返回success; 如果process_id比对不上,说明doc被其他doc给锁了,此时报错。
/fs/lock/1
{
  "process_id": 123
}
POST /fs/lock/1/_update
{
  "upsert": { "process_id": 123 },
  "script": "if ( ctx._source.process_id != process_id ) { assert false }; ctx.op = 'noop';"
  "params": {
    "process_id": 123
  }
}
script:ctx._source.process_id,123
process_id:加锁的upsert请求中带过来额proess_id
scripts/judge-lock.groovy: if ( ctx._source.process_id != process_id ) { assert false }; ctx.op = 'noop';

POST /fs/lock/1/_update
{
  "upsert": { "process_id": 123 },
  "script": {
    "lang": "groovy",
    "file": "judge-lock", 
    "params": {
      "process_id": 123
    }
  }
}
{
  "_index": "fs",
  "_type": "lock",
  "_id": "1",
  "_version": 1,
  "result": "created",
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  }
}
GET /fs/lock/1

{
  "_index": "fs",
  "_type": "lock",
  "_id": "1",
  "_version": 1,
  "found": true,
  "_source": {
    "process_id": 123
  }
}
如果两个process_id相同,说明是一个进程先加锁,然后又过来尝试加锁,可能是要执行另外一个操作,此时就不会block,对同一个process_id是不会block,ctx.op= 'noop',什么都不做,返回一个success
POST /fs/lock/1/_update
{
  "upsert": { "process_id": 123 },
  "script": {
    "lang": "groovy",
    "file": "judge-lock", 
    "params": {
      "process_id": 123
    }
  }
}
{
  "_index": "fs",
  "_type": "lock",
  "_id": "1",
  "_version": 1,
  "result": "noop",
  "_shards": {
    "total": 0,
    "successful": 0,
    "failed": 0
  }
}    

如果说已经有一个进程加了锁了,process_id: 234。process_id不相等,说明这个doc之前已经被别人上锁了,process_id=123上锁了; process_id=234过来再次尝试上锁,失败,assert false,就会报错
POST /fs/lock/1/_update
{
  "upsert": { "process_id": 234 },
  "script": {
    "lang": "groovy",
    "file": "judge-lock", 
    "params": {
      "process_id": 234
    }
  }
}
{
  "error": {
    "root_cause": [
      {
        "type": "remote_transport_exception",
        "reason": "[4onsTYV][127.0.0.1:9300][indices:data/write/update[s]]"
      }
    ],
    "type": "illegal_argument_exception",
    "reason": "failed to execute script",
    "caused_by": {
      "type": "script_exception",
      "reason": "error evaluating judge-lock",
      "caused_by": {
        "type": "power_assertion_error",
        "reason": "assert false\n"
      },
      "script_stack": [],
      "script": "",
      "lang": "groovy"
    }
  },
  "status": 400
}
释放锁
POST /fs/_refresh  //刷新到内存中
PUT /fs/lock/_bulk
{ "delete": { "_id": 1}}
{
  "took": 20,
  "errors": false,
  "items": [
    {
      "delete": {
        "found": true,
        "_index": "fs",
        "_type": "lock",
        "_id": "1",
        "_version": 2,
        "result": "deleted",
        "_shards": {
          "total": 2,
          "successful": 1,
          "failed": 0
        },
        "status": 200
      }
    }
  ]
}

共享锁和排他锁
共享锁:这份数据是共享的,然后多个线程过来,都可以获取同一个数据的共享锁,然后对这个数据执行读操作
排他锁:是排他的操作,只能一个线程获取排他锁,然后执行增删改操作

读写锁的分离
如果只是要读取数据的话,那么任意个线程都可以同时进来然后读取数据,每个线程都可以上一个共享锁
但是这个时候,如果有线程要过来修改数据,那么会尝试上排他锁,排他锁会跟共享锁互斥,也就是说,如果有人已经上了共享锁了,那么排他锁就不能上,就得等


如果有人在读数据,就不允许别人来修改数据
反之,也是一样的

如果有人在修改数据,就是加了排他锁
那么其他线程过来要修改数据,也会尝试加排他锁,此时会失败,锁冲突,必须等待,同时只能有一个线程修改数据
如果有人过来同时要读取数据,那么会尝试加共享锁,此时会失败,因为共享锁和排他锁是冲突的

如果有在修改数据,就不允许别人来修改数据,也不允许别人来读取数据

1、有人在读数据,其他人也能过来读数据
judge-lock-2.groovy: if (ctx._source.lock_type == 'exclusive') { assert false }; ctx._source.lock_count++

POST /fs/lock/1/_update 
{
  "upsert": { 
    "lock_type":  "shared",
    "lock_count": 1
  },
  "script": {
  	"lang": "groovy",
  	"file": "judge-lock-2"
  }
}

POST /fs/lock/1/_update 
{
  "upsert": { 
    "lock_type":  "shared",
    "lock_count": 1
  },
  "script": {
  	"lang": "groovy",
  	"file": "judge-lock-2"
  }
}
GET /fs/lock/1

{
  "_index": "fs",
  "_type": "lock",
  "_id": "1",
  "_version": 3,
  "found": true,
  "_source": {
    "lock_type": "shared",
    "lock_count": 3
  }
}
就给大家模拟了,有人上了共享锁,你还是要上共享锁,直接上就行了,没问题,只是lock_count加1
2、已经有人上了共享锁,然后有人要上排他锁
PUT /fs/lock/1/_create
{ "lock_type": "exclusive" }
排他锁用的不是upsert语法,create语法,要求lock必须不能存在,直接自己是第一个上锁的人,上的是排他锁
{
  "error": {
    "root_cause": [
      {
        "type": "version_conflict_engine_exception",
        "reason": "[lock][1]: version conflict, document already exists (current version [3])",
        "index_uuid": "IYbj0OLGQHmMUpLfbhD4Hw",
        "shard": "3",
        "index": "fs"
      }
    ],
    "type": "version_conflict_engine_exception",
    "reason": "[lock][1]: version conflict, document already exists (current version [3])",
    "index_uuid": "IYbj0OLGQHmMUpLfbhD4Hw",
    "shard": "3",
    "index": "fs"
  },
  "status": 409
}
如果已经有人上了共享锁,明显/fs/lock/1是存在的,create语法去上排他锁,肯定会报错

3、对共享锁进行解锁
unlock-shared.groovy: if (--ctx._source.lock_count == 0) { ctx.op = 'delete' }; 

POST /fs/lock/1/_update
{
  "script": {
  	"lang": "groovy",
  	"file": "unlock-shared"
  }
}
之前上了3次共享锁,连续解锁3次,此时共享锁就彻底没了
每次解锁一个共享锁,就对lock_count先减1,如果减了1之后,是0,那么说明所有的共享锁都解锁完了,此时就就将/fs/lock/1删除,就彻底解锁所有的共享锁

4、上排他锁,再上排他锁
PUT /fs/lock/1/_create
{ "lock_type": "exclusive" }
其他线程
PUT /fs/lock/1/_create
{ "lock_type": "exclusive" }

{
  "error": {
    "root_cause": [
      {
        "type": "version_conflict_engine_exception",
        "reason": "[lock][1]: version conflict, document already exists (current version [7])",
        "index_uuid": "IYbj0OLGQHmMUpLfbhD4Hw",
        "shard": "3",
        "index": "fs"
      }
    ],
    "type": "version_conflict_engine_exception",
    "reason": "[lock][1]: version conflict, document already exists (current version [7])",
    "index_uuid": "IYbj0OLGQHmMUpLfbhD4Hw",
    "shard": "3",
    "index": "fs"
  },
  "status": 409
}
5、上排他锁,上共享锁
POST /fs/lock/1/_update 
{
  "upsert": { 
    "lock_type":  "shared",
    "lock_count": 1
  },
  "script": {
  	"lang": "groovy",
  	"file": "judge-lock-2"
  }
}
{
  "error": {
    "root_cause": [
      {
        "type": "remote_transport_exception",
        "reason": "[4onsTYV][127.0.0.1:9300][indices:data/write/update[s]]"
      }
    ],
    "type": "illegal_argument_exception",
    "reason": "failed to execute script",
    "caused_by": {
      "type": "script_exception",
      "reason": "error evaluating judge-lock-2",
      "caused_by": {
        "type": "power_assertion_error",
        "reason": "assert false\n"
      },
      "script_stack": [],
      "script": "",
      "lang": "groovy"
    }
  },
  "status": 400
}
6、解锁排他锁
DELETE /fs/lock/1