欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  后端开发

如何处理下面的并发问题

程序员文章站 2022-05-15 20:03:58
...
1. 描述你的问题

工作中存在如下表:
表table_2,其中存在 unique key(device, seq)唯一索引。 其中seq字段是我们程序中维护的一个自增序列。
业务的需求就是:每次推送一条消息,会根据device获取表中最大的seq。
select seq from table_2 where device = ? order by seq desc
然后将获取的 seq+1 插入到table_2中作为当前消息的记录
insert into table_2 (device, seq) values(?, ?)

2 . 贴上相关代码。代码是我简化的结果,方便大家阅读

$device = "x-x";
$this->_db->startTrans();
//从table_2表中读取该device最大的seq,
//然后将该seq+1之后,重新将该device和seq插入的数据表中。
//现在的问题是当并发的时候,并发的select语句获取到了相同的seq,
//所以insert的时候冲突发生了。
$result = $this->_db->getRow(
    “SELECT * FROM table_2 WHERE device = ? ORDER BY seq DESC LIMIT 1”, $device
)
$Seq = $result['seq'] + 1;
$this->_db->execute(
    "INSERT INTO table_2(device, seq) VALUES(?, ?)", array($device, $Seq)
);
$this->_db->commit();

3. 贴上报错信息

失败方法1:
不合理的原因是:直接加上异常处理机制。当冲突时,并发的请求因为抛出异常,直接被捕获,程序继续运行,不做任何处理。结果就是:导致客户端请求失败,客户端需要重新发起请求。

try{
    //代码放这里
} catch(Exception $e){
    if ($e->getCode() == 1062){
        //
    }
}

失败方法2
尝试重试机制.当请求第一次失败时,会多次尝试请求。当数据库唯一索引冲突的时候,catch捕获该异常,然后重试。重试的条件必须是:捕获的异常是数据库插入冲突,返回1062异常code。

for($i=0; $i code() == 1062){
        continue;
    }
    throw $e;
}

失败方法3
通过redis来控制。因为冲突的根源就是于并发导致多个请求读取到了相同的seq导致的。所以考虑使用redis‘锁’的机制来实现。第一个获取到seq的请求会设置一个key,之后的请求都会在这个key的基础上进行获取。

$device = "";
$result = $this->_db->getRow(
    “SELECT * FROM table_2 WHERE device = ? ORDER BY seq DESC LIMIT 1”, $device
)
$Seq = $result['seq'] + 1;

//因为并发其实就是因为上面的sql语句查到了相同的seq,所以这里
//就只获取第一条执行的seq,之后的全部通过对该redis加1操作。使用setnx和incr是为了保证原子操作。
//这个处理方式仍然存在问题,比如在键值过期的时刻存在并发,这个时候$Seq就可能从1开始。
$lock = $redis->setnx($device, $Seq)
$redis->expire($device, 2);
if (!$lock){
    $Seq = $redis->incr($device);
}
$this->_db->execute(
    "INSERT INTO table_2(device, seq) VALUES(?, ?)", array($device, $Seq)
);
$this->_db->commit();

失败方式4
另一种重试机制。上面的重试机制是靠MySQL数据库的插入冲突,现在的重试是通过Redis在Select语句层面实现。

$device = "";
for($i=0;$i_db->getRow(
            “SELECT * FROM table_2 WHERE device = ? ORDER BY seq DESC LIMIT 1”, $device
        )
        $Seq = $result['seq'] + 1;
        $lock = $redis->setnx($device . $Seq, rand());
        if ($lock){
             $success = true;
             $redis->expire($device . $Seq, 2);
             break;
        }
    }
    if (!$success) {
            throw new Exception();
    }
    $this->_db->execute(
        "INSERT INTO table_2(device, seq) VALUES(?, ?)", array($device, $Seq)
    );
    $this->_db->commit();
}

失败方法5
使用事务。使用MySQL机制的事务来实现。首先在Select语句中加上意向排他锁IX, 修改之后的SQL为 “SELECT * FROM table_2 WHERE device = ? ORDER BY seq DESC LIMIT 1 FOR UPDATE”, 后面的语句不变。

官方文档是这样介绍的:两个意向排他说并没有冲突,所以并发请求A和并发请求B同时会获取IX,insert操作需要一个X锁。

X IX S IS
X Conflict Conflict Conflict Conflict
IX Conflict Compatible Conflict Compatible
S Conflict Conflict Compatible Compatible
IS Conflict Compatible Compatible Compatible

我的猜测:A请求在获取X锁的时候,B请求也在获取X锁。导致A请求在他遍历的行上加锁,B请求也同时在加锁,造成了循环等待锁释放的情况,产生死锁。

//前一句使用select for update
//然后和insert包裹起来。多进程执行的话,会出现死锁。
//通过降低mysql的隔离水平确实可以实现,但是可能会产生不可重复读的问题.通俗的介绍就是:A请求在操作事务中两次读到的数据可能不一致。

失败方法6
将两条sql放在一条sql里。 INSERT INTO table_2(device, seq) VALUES(?, (select seq + 1 from table_2) where device = ? )", array($device)

//因为前一条sql是我简化版,其实他是一个分表的查询操作。
//这里的分表是一个router操作,所以不适合进行一条sql语句
//就是会依次查询我们这里分的几个表,最近的一个表查不到,查第二个

方法7
修改数据表结构,将seq保存到独立的一个其他表中,每个device对应一个最大的seq。这样在select的时候就可以保证select for update 不会产生间隙锁,而是指在一条数据行上加锁。

  1. 贴上相关截图

图片就如上所示。大部分的代码都是自己手工敲的,所以有些地方大家不用深究,了解清楚意思就可以。
希望大家可以有建设性建议

大神来指点指点吧

我实在是想不出什么好的办法了。将并发请求转化为串行,这个想法没有很好的实现。大神在哪里?

回复内容:

1. 描述你的问题

工作中存在如下表:
表table_2,其中存在 unique key(device, seq)唯一索引。 其中seq字段是我们程序中维护的一个自增序列。
业务的需求就是:每次推送一条消息,会根据device获取表中最大的seq。
select seq from table_2 where device = ? order by seq desc
然后将获取的 seq+1 插入到table_2中作为当前消息的记录
insert into table_2 (device, seq) values(?, ?)

2 . 贴上相关代码。代码是我简化的结果,方便大家阅读

$device = "x-x";
$this->_db->startTrans();
//从table_2表中读取该device最大的seq,
//然后将该seq+1之后,重新将该device和seq插入的数据表中。
//现在的问题是当并发的时候,并发的select语句获取到了相同的seq,
//所以insert的时候冲突发生了。
$result = $this->_db->getRow(
    “SELECT * FROM table_2 WHERE device = ? ORDER BY seq DESC LIMIT 1”, $device
)
$Seq = $result['seq'] + 1;
$this->_db->execute(
    "INSERT INTO table_2(device, seq) VALUES(?, ?)", array($device, $Seq)
);
$this->_db->commit();

3. 贴上报错信息

失败方法1:
不合理的原因是:直接加上异常处理机制。当冲突时,并发的请求因为抛出异常,直接被捕获,程序继续运行,不做任何处理。结果就是:导致客户端请求失败,客户端需要重新发起请求。

try{
    //代码放这里
} catch(Exception $e){
    if ($e->getCode() == 1062){
        //
    }
}

失败方法2
尝试重试机制.当请求第一次失败时,会多次尝试请求。当数据库唯一索引冲突的时候,catch捕获该异常,然后重试。重试的条件必须是:捕获的异常是数据库插入冲突,返回1062异常code。

for($i=0; $i code() == 1062){
        continue;
    }
    throw $e;
}

失败方法3
通过redis来控制。因为冲突的根源就是于并发导致多个请求读取到了相同的seq导致的。所以考虑使用redis‘锁’的机制来实现。第一个获取到seq的请求会设置一个key,之后的请求都会在这个key的基础上进行获取。

$device = "";
$result = $this->_db->getRow(
    “SELECT * FROM table_2 WHERE device = ? ORDER BY seq DESC LIMIT 1”, $device
)
$Seq = $result['seq'] + 1;

//因为并发其实就是因为上面的sql语句查到了相同的seq,所以这里
//就只获取第一条执行的seq,之后的全部通过对该redis加1操作。使用setnx和incr是为了保证原子操作。
//这个处理方式仍然存在问题,比如在键值过期的时刻存在并发,这个时候$Seq就可能从1开始。
$lock = $redis->setnx($device, $Seq)
$redis->expire($device, 2);
if (!$lock){
    $Seq = $redis->incr($device);
}
$this->_db->execute(
    "INSERT INTO table_2(device, seq) VALUES(?, ?)", array($device, $Seq)
);
$this->_db->commit();

失败方式4
另一种重试机制。上面的重试机制是靠MySQL数据库的插入冲突,现在的重试是通过Redis在Select语句层面实现。

$device = "";
for($i=0;$i_db->getRow(
            “SELECT * FROM table_2 WHERE device = ? ORDER BY seq DESC LIMIT 1”, $device
        )
        $Seq = $result['seq'] + 1;
        $lock = $redis->setnx($device . $Seq, rand());
        if ($lock){
             $success = true;
             $redis->expire($device . $Seq, 2);
             break;
        }
    }
    if (!$success) {
            throw new Exception();
    }
    $this->_db->execute(
        "INSERT INTO table_2(device, seq) VALUES(?, ?)", array($device, $Seq)
    );
    $this->_db->commit();
}

失败方法5
使用事务。使用MySQL机制的事务来实现。首先在Select语句中加上意向排他锁IX, 修改之后的SQL为 “SELECT * FROM table_2 WHERE device = ? ORDER BY seq DESC LIMIT 1 FOR UPDATE”, 后面的语句不变。

官方文档是这样介绍的:两个意向排他说并没有冲突,所以并发请求A和并发请求B同时会获取IX,insert操作需要一个X锁。

X IX S IS
X Conflict Conflict Conflict Conflict
IX Conflict Compatible Conflict Compatible
S Conflict Conflict Compatible Compatible
IS Conflict Compatible Compatible Compatible

我的猜测:A请求在获取X锁的时候,B请求也在获取X锁。导致A请求在他遍历的行上加锁,B请求也同时在加锁,造成了循环等待锁释放的情况,产生死锁。

//前一句使用select for update
//然后和insert包裹起来。多进程执行的话,会出现死锁。
//通过降低mysql的隔离水平确实可以实现,但是可能会产生不可重复读的问题.通俗的介绍就是:A请求在操作事务中两次读到的数据可能不一致。

失败方法6
将两条sql放在一条sql里。 INSERT INTO table_2(device, seq) VALUES(?, (select seq + 1 from table_2) where device = ? )", array($device)

//因为前一条sql是我简化版,其实他是一个分表的查询操作。
//这里的分表是一个router操作,所以不适合进行一条sql语句
//就是会依次查询我们这里分的几个表,最近的一个表查不到,查第二个

方法7
修改数据表结构,将seq保存到独立的一个其他表中,每个device对应一个最大的seq。这样在select的时候就可以保证select for update 不会产生间隙锁,而是指在一条数据行上加锁。

  1. 贴上相关截图

图片就如上所示。大部分的代码都是自己手工敲的,所以有些地方大家不用深究,了解清楚意思就可以。
希望大家可以有建设性建议

大神来指点指点吧

我实在是想不出什么好的办法了。将并发请求转化为串行,这个想法没有很好的实现。大神在哪里?

insert into table_2 select max(seq)+1,device from table_2 WHERE device = ?;

直接使用redis的incr来生成自增id,incr是原子操作,不会有并发问题。

你的问题归纳一下就是:如何实现同一个device下seq自增长问题。
解决方法:使用redis的hash保存device与seq的关系,当需要为指定device生产一个唯一seq时使用HINCRBY命令即可。

相关标签: php redis mysql