欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

PHP队列场景以及实现代码实例详解

程序员文章站 2022-04-26 14:01:29
为了降低单点压力,通常会根据业务情况进行分表分库,将表分布在不同的库中(库可能分布在不同的机器上),但是一个业务场景可能会同时处理两个表的操作。在这种场景下,事务的提交会变得相对复杂,因为多个节点(库...

为了降低单点压力,通常会根据业务情况进行分表分库,将表分布在不同的库中(库可能分布在不同的机器上),但是一个业务场景可能会同时处理两个表的操作。在这种场景下,事务的提交会变得相对复杂,因为多个节点(库)的存在,可能存在部分节点提交失败的情况,即事务的acid特性需要在各个不同的数据库实例中保证。比如更新db1库的a表时,必须同步更新db2库的b表,两个更新形成一个事务,要么都成功,要么都失败。

那么我们如何利用mysql实现分布式数据库的事务呢?

mysql是从5.0开始支持分布式事务

这里先声明两个概念:

资源管理器(resource manager):用来管理系统资源,是通向事务资源的途径。数据库就是一种资源管理器。资源管理还应该具有管理事务提交或回滚的能力。
事务管理器(transaction manager):事务管理器是分布式事务的核心管理者。事务管理器与每个资源管理器(resource
manager)进行通信,协调并完成事务的处理。事务的各个分支由唯一命名进行标识。
mysql在执行分布式事务(外部xa)的时候,mysql服务器相当于xa事务资源管理器,与mysql链接的客户端相当于事务管理器。

分布式事务原理:分段式提交
分布式事务通常采用2pc协议,全称two phase commitment protocol。该协议主要为了解决在分布式数据库场景下,所有节点间数据一致性的问题。分布式事务通过2pc协议将提交分成两个阶段:

prepare;commit/rollback

阶段一为准备(prepare)阶段。即所有的参与者准备执行事务并锁住需要的资源。参与者ready时,向transaction manager报告已准备就绪。
阶段二为提交阶段(commit)。当transaction manager确认所有参与者都ready后,向所有参与者发送commit命令。

事务协调者transaction manager
因为xa 事务是基于两阶段提交协议的,所以需要有一个事务协调者(transaction manager)来保证所有的事务参与者都完成了准备工作(第一阶段)。如果事务协调者(transaction manager)收到所有参与者都准备好的消息,就会通知所有的事务都可以提交了(第二阶段)。mysql 在这个xa事务中扮演的是参与者的角色,而不是事务协调者(transaction manager)。

mysql的xa事务分为外部xa和内部xa

外部xa用于跨多mysql实例的分布式事务,需要应用层作为协调者,通俗的说就是比如我们在php中写代码,那么php书写的逻辑就是协调者。应用层负责决定提交还是回滚,崩溃时的悬挂事务。mysql数据库外部xa可以用在分布式数据库代理层,实现对mysql数据库的分布式事务支持,例如开源的代理工具:网易的ddb,淘宝的tddl等等。

内部xa事务用于同一实例下跨多引擎事务,由binlog作为协调者,比如在一个存储引擎提交时,需要将提交信息写入二进制日志,这就是一个分布式内部xa事务,只不过二进制日志的参与者是mysql本身。binlog作为内部xa的协调者,在binlog中出现的内部xid,在crash recover时,由binlog负责提交。(这是因为,binlog不进行prepare,只进行commit,因此在binlog中出现的内部xid,一定能够保证其在底层各存储引擎中已经完成prepare)。

mysql xa事务的语法

1、首先要确保mysql开启xa事务支持

show variables like '%xa%'

如果innodb_support_xa的值是on就说明mysql已经开启对xa事务的支持了。 如果不是就执行:

set innodb_support_xa = on

主要有:

xa start 'any_unique_id'; // 'any_unique_id' 是用户给的,全局唯一在一台mysql中开启一个xa事务
xa end 'any_unique_id '; //标识xa事务的操作结束
xa prepare 'any_unique_id'; //告知mysql 准备提交这个xa事务
xa commit 'any_unique_id'; //告知mysql提交这个xa事务
xa rollback 'any_unique_id'; //告知mysql回滚这个xa事务
xa recover;//查看本机mysql目前有哪些xa事务处于prepare状态

xa事务恢复
如果执行分布式事务的mysql crash了,mysql 按照如下逻辑进行恢复:
a. 如果这个xa事务commit了,那么什么也不用做
b. 如果这个xa事务还没有prepare,那么直接回滚它
c. 如果这个xa事务prepare了,还没commit, 那么把它恢复到prepare的状态,由用户去决定commit或rollback
当mysql crash后重新启动之后,执行“xa recover;”查看当前处于prepare状态的xa事务,然后commit或rollback它们。

使用限制
a. xa事务和本地事务以及锁表操作是互斥的
开启了xa事务就无法使用本地事务和锁表操作

mysql> xa start 't1xa';
query ok, 0 rows affected (0.04 sec)
mysql> begin;
error 1399 (xae07): xaer_rmfail: the command cannot be executed when global transaction is in the active state
mysql> lock table t1 read;
error 1399 (xae07): xaer_rmfail: the command cannot be executed when global transaction is in the active state

开启了本地事务就无法使用xa事务

mysql> begin;
query ok, 0 rows affected (0.00 sec)
mysql> xa start 'rrrr';
error 1400 (xae09): xaer_outside: some work is done outside global transaction

b. xa start 之后必须xa end, 否则不能执行xa commitxa rollback

所以如果在执行xa事务过程中有语句出错了,你也需要先xa end一下,然后才能xarollback

注意事项

a. mysql只是提供了xa事务的接口,分布式事务中的mysql实例之间是互相独立的不感知的。 所以用户必须自己实现分布式事务的调度器
b. xa事务有一些使用上的bug, 参考http://www.mysqlops.com/2012/02/24/mysql-xa-optimize.html
主要是
“mysql数据库的主备数据库的同步,通过binlog的复制完成。而binlog是mysql数据库内部xa事务的协调者,并且mysql数据库为binlog做了优化——binlog不写prepare日志,只写commit日志。
所有的参与节点prepare完成,在进行xa commit前crash。crash recover如果选择commit此事务。由于binlog在prepare阶段未写,因此主库中看来,此分布式事务最终提交了,但是此事务的操作并未 写到binlog中,因此也就未能成功复制到备库,从而导致主备库数据不一致的情况出现。
而crash recover如果选rollback, 那么就会出现全局不一致(该分布式事务对应的节点,部分已经提交,无法回滚,而部分节点回滚。最终导致同一分布式事务,在各参与节点,最终状态不一致)”

参考的那篇blog中给出的办法是修改mysql代码,这个无法在dbscale中使用。 所以可选的替代方案是不使用
主从复制进行备份,而是直接使用xa事务实现同步写来作为备份。

php+mysql实现分布式事务案例

保证数据表是innodb的

//db_finance库下
create table `t_user_account` (
 `id` int(11) not null auto_increment comment 'id',
 `username` varchar(255) not null default '' comment '用户名',
 `money` int(11) not null default '0' comment '账户金额',
 primary key (`id`)
) engine=innodb auto_increment=2 default charset=utf8;
//db_order库下
create table `t_user_orders` (
 `id` int(11) not null auto_increment comment '主键',
 `username` varchar(255) not null default '',
 `money` int(11) not null default '0' comment '订单扣款金额',
 primary key (`id`)
) engine=innodb auto_increment=44 default charset=utf8;

php代码

$username = '公众号php开源社区';
$order_money = 100;

$addorder_success = addorder($username,$order_money);
$upaccount_success = updateaccount($username,$order_money);


if($addorder_success['state'] =="yes" && $upaccount_success['state']=="yes"){
  commitdb($addorder_success['xa']);
  commitdb1($upaccount_success['xa']);
}else{
  rollbackdb($addorder_success['xa']);
  rollbackdb1($upaccount_success['xa']);
}
die;
function addorder ($username, $order_money){

  $xa = uniqid("");

  $sql_xa = "xa start '$xa'";
  $db = yii::app()->dborder_readonly;
  $db->createcommand($sql_xa)->execute();

  $insert_sql = "insert into t_user_orders (`username`,`money`) values ($username,$order_money)";
  $id = $db->createcommand($insert_sql)->execute();

  $db->createcommand("xa end '$xa'")->execute();
  if ($id) {

    $db->createcommand("xa prepare '$xa'")->execute();
    return ['state' => 'yes', 'xa' => $xa];
  }else {
    return ['state' => 'no', 'xa' => $xa];
  }

}

function updateaccount($username, $order_money){
  $xa = uniqid("");
  $sql_xa = "xa start '$xa'";
  $db = yii::app()->db_finance;
  $db->createcommand($sql_xa)->execute();

  $sql = "update t_user_account set money=money-".$order_money." where username='$username'";


  $id = $db->createcommand($sql)->execute();

  $db->createcommand("xa end '$xa'")->execute();
  if ($id) {
    $db->createcommand("xa prepare '$xa'")->execute();
    return ['state' => 'yes', 'xa' => $xa];
  }else {
    return ['state' => 'no', 'xa' => $xa];
  }

}


//提交事务!
function commitdb($xa){

  $db = yii::app()->dborder_readonly;
  return $db->createcommand("xa commit '$xa'")->execute();
}

//回滚事务
function rollbackdb($xa){

  $db = yii::app()->dborder_readonly;
  return $db->createcommand("xa commit '$xa'")->execute();
}

//提交事务!
function commitdb1($xa){

  $db = yii::app()->db_finance;
  return $db->createcommand("xa commit '$xa'")->execute();

}
//回滚事务
function rollbackdb1($xa){
  $db = yii::app()->db_finance;
  return $db->createcommand("xa rollback '$xa'")->execute();

到此这篇关于php队列场景以及实现代码实例详解的文章就介绍到这了,更多相关php队列场景以及实现内容请搜索以前的文章或继续浏览下面的相关文章希望大家以后多多支持!

相关标签: PHP 队列