PHP利用闭包实现MySQL事务场景下缓存一致性的模型
程序员文章站
2024-03-19 22:10:58
...
前言
为了提高接口响应,在自己开发的PHP框架中中实现了带Redis缓存的Insert、Update、Delete、Query操作的基础Model。而然,这种带缓存的操作是不支持事务下的缓存一致性的(数据回滚了,缓存已生成),并且实践编程中大量用到事务操作,都是在catch里手动清理缓存的。为了可以让开发人员不必关注框架底层,随重新了BaseModel。
问题分析
为什么缓存在事务下会出现不一致呢?原因是事务代码段中出现bug导致数据rollback, 数据回滚了,可是Model中的Insert、Update、Delete操作已经执行,这时Redis缓存中的数据和数据库就不一致了。所以:
- 如果是在事务中,需要延迟缓存操作的执行;
- 如果不在事务中,立即执行缓存操作;
知识要点
- Mysql是不支持嵌套事务的,开启了一个事务的情况下,再开启一个事务,会隐式的提交上一个事务。
- PHP 闭包
项目结构
- DB类,负责与MySQL通讯,管理闭包队列;
- ClosureQueue闭包队列, 注册、执行闭包;
- BaseModel 是所以 ORM类的基类,直接操作DB以及闭包的定义;
代码结构
Factory类
class Factory
{
private static $db = null;
public function getDB(){
if(self::$db == null){
self::$db = new DB;
}
return self::$db;
}
}
ClosureQueue
<?php
/**
*闭包队列
**/
class ClosureQueue
{
private $closureQueue = null; //闭包列表
private $closureArgsQueue = null; //闭包的参数
public function Register(callable $fn, array $args){//闭包的注册
$this->closureQueue[] = $fn;
$this->closureArgsQueue[] = $args;
}
public function Execute(){//闭包的弹出和执行
$fn = array_shift($this->closureQueue);
$args = array_shift($this->closureArgsQueue);
call_user_func_array($fn, $args);
}
public function Reset() {//重置闭包
$this->closureQueue = [];
}
public function Len() {//闭包的数量
return count($this->closureQueue);
}
}
DB类
class DB {
public static $closureQueue = null; //闭包队列
public $pdo = null;
private $_transactions = 0; //事务层数
public $talbeName = '';
public function table($tableName) {
$this->tableName = $tableName;
return $this;
}
public function where($data){
return $this;
}
public function BeginX() {//开启事务
++$this->_transactions;
if($this->_transactions == 1){
$this->query('START TRANSACTION');
self::$closureQueue = new ClosureQueue;
}
}
public function Commit() {//提交事务
if($this->_transactions == 1){
$this->query('COMMIT');
//执行闭包
if(($len = self::$closureQueue->Len()) > 0){
for ($i=0; $i < $len; $i++) {
self::$closureQueue->Execute();
}
}
}
--$this->_transactions;
}
public function Rollback() {//回滚事务
//重置
if($this->_transactions == 1){
$this->_transactions = 0;
$this->query('ROLLBACK');
//移除此次事务的闭包列表
self::$closureQueue = null;
}else{
--$this->_transactions;
}
}
public function Query($sql){
//省略sql执行细节
return true;
}
public function Insert($data, $closure = null){
//此处省略sql拼装
$flag = $this->Query($data);
if($flag){
// $insertId = $this->pdo->lastInsertId(); //实际执行此处代码
$insertId = time();
if($closure && is_callable($closure)){
$args = ['insertId'=>$insertId, 'flag'=>$flag];
if(self::$closureQueue instanceof ClosureQueue && $closure){//处于事务中,并且设置了回调函数
self::$closureQueue->Register($closure, $args);
}else{//非事务,立即执行
call_user_func_array($closure, $args);
}
}
}else{
return false;
}
}
public function Update($data, $closure = null){
//此处省略sql拼装
$sql = "Update";
$flag = $this->Query($data);
if($flag){
if($closure && is_callable($closure)){
$args = ['flag'=>$flag];
if(self::$closureQueue instanceof ClosureQueue && $closure){//处于事务中,并且设置了回调函数
self::$closureQueue->Register($closure, $args);
}else{//非事务,立即执行
call_user_func_array($closure, $args);
}
}
}else{
return false;
}
}
public function Delete($closure = null){
//此处省略sql拼装
$sql = "DELETE";
$flag = $this->Query(null);
if($flag){
if($closure && is_callable($closure)){
$args = ['flag'=>$flag];
if(self::$closureQueue instanceof ClosureQueue && $closure){//处于事务中,并且设置了回调函数
self::$closureQueue->Register($closure, $args);
}else{//非事务,立即执行
call_user_func_array($closure, $args);
}
}
}else{
return false;
}
}
}
BaseModel
class BaseModel
{
public $id = null;
public static function TableName() { //子类继承,标识表面
return '';
}
public static function PrimaryKey() {
return '';
}
public function getAttributes(){
//省略通过反射获取属性
return [];
}
public function Update() {
$closure = function() {
echo "Update Cache For Update#".$this->id."\n";
};
$table_name = static::TableName();
$attributes = $this->getAttributes();
$flag = Factory::getDB()->table($table_name)->update($attributes, $closure);
return $flag;
}
public function Delete() {
$closure = function() {
echo "Delete Cache For Delete #".$this->id."\n";
};
$primary = ['user_id'=>1]; //省略反射获取主键key/value
$table_name = static::TableName();
Factory::getDB()->table($table_name)->where($primary)->Delete($closure);
}
public function Create() {
$closure = function() {
echo "Create Cache For Create #".$this->id."\n";
};
$table_name = static::TableName();
$attributes = $this->getAttributes();
Factory::getDB()->table($table_name)->Insert($attributes, $closure);
}
public function FindByPk($id) {
//优先获取缓存数据
}
}
模型结构测试脚本
$st = microtime(true) * 1e6;
$db = Factory::getDB();
$db->BeginX();
try{
$model = new BaseModel();
$model->id = 1;
$model->Create();
$model->Update();
$model->Delete();
$db->BeginX();
try{
$model = new BaseModel();
$model->id = 2;
$model->Create();
$model->Update();
$model->Delete();
throw new \ErrorException("DB Execute Failed");
$db->Commit();
}catch(\Exception $e){
$db->Rollback();
throw $e;
}
// throw new \ErrorException("DB Execute Failed");
$db->Commit();
}catch(\Exception $e){
$db->Rollback();
}
echo "SpendTime:" . (microtime(true) * 1e6 - $st);
执行结果(成功时)
Create Cache For Create #1
Update Cache For Update#1
Delete Cache For Delete #1
Create Cache For Create #2
Update Cache For Update#2
Delete Cache For Delete #2
SpendTime:166[Finished in 0.0s]
执行结果(异常时)
PHP Fatal error: Uncaught exception 'Exception' with message 'DB Execute Failed' in /Users/liugaoyun/Desktop/1.php:249
Stack trace:
#0 {main}
thrown in /Users/liugaoyun/Desktop/1.php on line 249
Fatal error: Uncaught exception 'Exception' with message 'DB Execute Failed' in /Users/liugaoyun/Desktop/1.php:249
Stack trace:
#0 {main}
thrown in /Users/liugaoyun/Desktop/1.php on line 249
[Finished in 0.0s]
闭包队列没有执行,满足预期。
上一篇: Linux网络分析
下一篇: zab与paxos算法联系与区别