如何处理下面的并发问题
工作中存在如下表:
表table_2,其中存在unique key(device, seq)
为唯一索引。 其中seq字段是我们程序中维护的一个自增序列。
业务的需求就是:每次推送一条消息,会根据device获取表中最大的seq。select seq from table_2 where device = ? order by seq desc
然后将获取的 seq+1 插入到table_2中作为当前消息的记录insert into table_2 (device, seq) values(?, ?)
2 . 贴上相关代码。代码是我简化的结果,方便大家阅读
$device = "x-x";
$this->_db->startTrans();
//从table_2表中读取该device最大的seq,
//然后将该seq+1之后,重新将该device和seq插入的数据表中。
//现在的问题是当并发的时候,并发的select语句获取到了相同的seq,
//所以insert的时候冲突发生了。
$result = $this->_db->getRow(
“SELECT * FROM table_2 WHERE device = ? ORDER BY seq DESC LIMIT 1”, $device
)
$Seq = $result['seq'] + 1;
$this->_db->execute(
"INSERT INTO table_2(device, seq) VALUES(?, ?)", array($device, $Seq)
);
$this->_db->commit();
3. 贴上报错信息
失败方法1:
不合理的原因是:直接加上异常处理机制。当冲突时,并发的请求因为抛出异常,直接被捕获,程序继续运行,不做任何处理。结果就是:导致客户端请求失败,客户端需要重新发起请求。
try{
//代码放这里
} catch(Exception $e){
if ($e->getCode() == 1062){
//
}
}
失败方法2
尝试重试机制.当请求第一次失败时,会多次尝试请求。当数据库唯一索引冲突的时候,catch捕获该异常,然后重试。重试的条件必须是:捕获的异常是数据库插入冲突,返回1062异常code。
for($i=0; $i code() == 1062){
continue;
}
throw $e;
}
失败方法3
通过redis来控制。因为冲突的根源就是于并发导致多个请求读取到了相同的seq导致的。所以考虑使用redis‘锁’的机制来实现。第一个获取到seq的请求会设置一个key,之后的请求都会在这个key的基础上进行获取。
$device = "";
$result = $this->_db->getRow(
“SELECT * FROM table_2 WHERE device = ? ORDER BY seq DESC LIMIT 1”, $device
)
$Seq = $result['seq'] + 1;
//因为并发其实就是因为上面的sql语句查到了相同的seq,所以这里
//就只获取第一条执行的seq,之后的全部通过对该redis加1操作。使用setnx和incr是为了保证原子操作。
//这个处理方式仍然存在问题,比如在键值过期的时刻存在并发,这个时候$Seq就可能从1开始。
$lock = $redis->setnx($device, $Seq)
$redis->expire($device, 2);
if (!$lock){
$Seq = $redis->incr($device);
}
$this->_db->execute(
"INSERT INTO table_2(device, seq) VALUES(?, ?)", array($device, $Seq)
);
$this->_db->commit();
失败方式4
另一种重试机制。上面的重试机制是靠MySQL数据库的插入冲突,现在的重试是通过Redis在Select语句层面实现。
$device = "";
for($i=0;$i_db->getRow(
“SELECT * FROM table_2 WHERE device = ? ORDER BY seq DESC LIMIT 1”, $device
)
$Seq = $result['seq'] + 1;
$lock = $redis->setnx($device . $Seq, rand());
if ($lock){
$success = true;
$redis->expire($device . $Seq, 2);
break;
}
}
if (!$success) {
throw new Exception();
}
$this->_db->execute(
"INSERT INTO table_2(device, seq) VALUES(?, ?)", array($device, $Seq)
);
$this->_db->commit();
}
失败方法5
使用事务。使用MySQL机制的事务来实现。首先在Select语句中加上意向排他锁IX, 修改之后的SQL为“SELECT * FROM table_2 WHERE device = ? ORDER BY seq DESC LIMIT 1 FOR UPDATE”
, 后面的语句不变。官方文档是这样介绍的:两个意向排他说并没有冲突,所以并发请求A和并发请求B同时会获取IX,insert操作需要一个X锁。
X | IX | S | IS | |
---|---|---|---|---|
X | Conflict | Conflict | Conflict | Conflict |
IX | Conflict | Compatible | Conflict | Compatible |
S | Conflict | Conflict | Compatible | Compatible |
IS | Conflict | Compatible | Compatible | Compatible |
我的猜测:A请求在获取X锁的时候,B请求也在获取X锁。导致A请求在他遍历的行上加锁,B请求也同时在加锁,造成了循环等待锁释放的情况,产生死锁。
//前一句使用select for update
//然后和insert包裹起来。多进程执行的话,会出现死锁。
//通过降低mysql的隔离水平确实可以实现,但是可能会产生不可重复读的问题.通俗的介绍就是:A请求在操作事务中两次读到的数据可能不一致。
失败方法6
将两条sql放在一条sql里。INSERT INTO table_2(device, seq) VALUES(?, (select seq + 1 from table_2) where device = ? )", array($device)
//因为前一条sql是我简化版,其实他是一个分表的查询操作。
//这里的分表是一个router操作,所以不适合进行一条sql语句
//就是会依次查询我们这里分的几个表,最近的一个表查不到,查第二个
方法7
修改数据表结构,将seq保存到独立的一个其他表中,每个device对应一个最大的seq。这样在select的时候就可以保证select for update 不会产生间隙锁,而是指在一条数据行上加锁。
贴上相关截图
图片就如上所示。大部分的代码都是自己手工敲的,所以有些地方大家不用深究,了解清楚意思就可以。
希望大家可以有建设性建议
大神来指点指点吧
我实在是想不出什么好的办法了。将并发请求转化为串行,这个想法没有很好的实现。大神在哪里?
回复内容:
1. 描述你的问题
工作中存在如下表:
表table_2,其中存在unique key(device, seq)
为唯一索引。 其中seq字段是我们程序中维护的一个自增序列。
业务的需求就是:每次推送一条消息,会根据device获取表中最大的seq。select seq from table_2 where device = ? order by seq desc
然后将获取的 seq+1 插入到table_2中作为当前消息的记录insert into table_2 (device, seq) values(?, ?)
2 . 贴上相关代码。代码是我简化的结果,方便大家阅读
$device = "x-x";
$this->_db->startTrans();
//从table_2表中读取该device最大的seq,
//然后将该seq+1之后,重新将该device和seq插入的数据表中。
//现在的问题是当并发的时候,并发的select语句获取到了相同的seq,
//所以insert的时候冲突发生了。
$result = $this->_db->getRow(
“SELECT * FROM table_2 WHERE device = ? ORDER BY seq DESC LIMIT 1”, $device
)
$Seq = $result['seq'] + 1;
$this->_db->execute(
"INSERT INTO table_2(device, seq) VALUES(?, ?)", array($device, $Seq)
);
$this->_db->commit();
3. 贴上报错信息
失败方法1:
不合理的原因是:直接加上异常处理机制。当冲突时,并发的请求因为抛出异常,直接被捕获,程序继续运行,不做任何处理。结果就是:导致客户端请求失败,客户端需要重新发起请求。
try{
//代码放这里
} catch(Exception $e){
if ($e->getCode() == 1062){
//
}
}
失败方法2
尝试重试机制.当请求第一次失败时,会多次尝试请求。当数据库唯一索引冲突的时候,catch捕获该异常,然后重试。重试的条件必须是:捕获的异常是数据库插入冲突,返回1062异常code。
for($i=0; $i code() == 1062){
continue;
}
throw $e;
}
失败方法3
通过redis来控制。因为冲突的根源就是于并发导致多个请求读取到了相同的seq导致的。所以考虑使用redis‘锁’的机制来实现。第一个获取到seq的请求会设置一个key,之后的请求都会在这个key的基础上进行获取。
$device = "";
$result = $this->_db->getRow(
“SELECT * FROM table_2 WHERE device = ? ORDER BY seq DESC LIMIT 1”, $device
)
$Seq = $result['seq'] + 1;
//因为并发其实就是因为上面的sql语句查到了相同的seq,所以这里
//就只获取第一条执行的seq,之后的全部通过对该redis加1操作。使用setnx和incr是为了保证原子操作。
//这个处理方式仍然存在问题,比如在键值过期的时刻存在并发,这个时候$Seq就可能从1开始。
$lock = $redis->setnx($device, $Seq)
$redis->expire($device, 2);
if (!$lock){
$Seq = $redis->incr($device);
}
$this->_db->execute(
"INSERT INTO table_2(device, seq) VALUES(?, ?)", array($device, $Seq)
);
$this->_db->commit();
失败方式4
另一种重试机制。上面的重试机制是靠MySQL数据库的插入冲突,现在的重试是通过Redis在Select语句层面实现。
$device = "";
for($i=0;$i_db->getRow(
“SELECT * FROM table_2 WHERE device = ? ORDER BY seq DESC LIMIT 1”, $device
)
$Seq = $result['seq'] + 1;
$lock = $redis->setnx($device . $Seq, rand());
if ($lock){
$success = true;
$redis->expire($device . $Seq, 2);
break;
}
}
if (!$success) {
throw new Exception();
}
$this->_db->execute(
"INSERT INTO table_2(device, seq) VALUES(?, ?)", array($device, $Seq)
);
$this->_db->commit();
}
失败方法5
使用事务。使用MySQL机制的事务来实现。首先在Select语句中加上意向排他锁IX, 修改之后的SQL为“SELECT * FROM table_2 WHERE device = ? ORDER BY seq DESC LIMIT 1 FOR UPDATE”
, 后面的语句不变。官方文档是这样介绍的:两个意向排他说并没有冲突,所以并发请求A和并发请求B同时会获取IX,insert操作需要一个X锁。
X | IX | S | IS | |
---|---|---|---|---|
X | Conflict | Conflict | Conflict | Conflict |
IX | Conflict | Compatible | Conflict | Compatible |
S | Conflict | Conflict | Compatible | Compatible |
IS | Conflict | Compatible | Compatible | Compatible |
我的猜测:A请求在获取X锁的时候,B请求也在获取X锁。导致A请求在他遍历的行上加锁,B请求也同时在加锁,造成了循环等待锁释放的情况,产生死锁。
//前一句使用select for update
//然后和insert包裹起来。多进程执行的话,会出现死锁。
//通过降低mysql的隔离水平确实可以实现,但是可能会产生不可重复读的问题.通俗的介绍就是:A请求在操作事务中两次读到的数据可能不一致。
失败方法6
将两条sql放在一条sql里。INSERT INTO table_2(device, seq) VALUES(?, (select seq + 1 from table_2) where device = ? )", array($device)
//因为前一条sql是我简化版,其实他是一个分表的查询操作。
//这里的分表是一个router操作,所以不适合进行一条sql语句
//就是会依次查询我们这里分的几个表,最近的一个表查不到,查第二个
方法7
修改数据表结构,将seq保存到独立的一个其他表中,每个device对应一个最大的seq。这样在select的时候就可以保证select for update 不会产生间隙锁,而是指在一条数据行上加锁。
贴上相关截图
图片就如上所示。大部分的代码都是自己手工敲的,所以有些地方大家不用深究,了解清楚意思就可以。
希望大家可以有建设性建议
大神来指点指点吧
我实在是想不出什么好的办法了。将并发请求转化为串行,这个想法没有很好的实现。大神在哪里?
insert into table_2 select max(seq)+1,device from table_2 WHERE device = ?;
直接使用redis的incr来生成自增id,incr是原子操作,不会有并发问题。
你的问题归纳一下就是:如何实现同一个device下seq自增长问题。
解决方法:使用redis的hash保存device与seq的关系,当需要为指定device生产一个唯一seq时使用HINCRBY命令即可。