欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

php redis消息队列

程序员文章站 2022-03-10 08:17:54
...

不专业的点赞消息队列
仅实现了点赞,取消点赞和点赞过程基本相同,只需要explode数据库中的star列内的作品id名
前端传来点赞作品的id,作品的分类(普通作品,动态),用户名userName
redis队列里存的格式为 userNmae=>id
脚本运行addlike_time,将队列数据取出,存入mysql,mysql用了pdo,乐观锁(写的不是很专业)。

addlike_time文件用到了mypdo类

数据库
user
star和dynamicstar为点赞作品储存,最后的version栏没有显示,用于乐观锁。
php redis消息队列
user设计表
php redis消息队列
yijingwork表
php redis消息队列

mypdo.php

<?php
# 基于PDO的二次封装

# 命名空间
// namespace core;

# 引入系统类:基于PDO的实现,需要引入第三个类
// use \PDO,\PDOStatement,\PDOException;

class MyPDO{
	private $pdo;			# 保存PDO类对象
	private $fetch_mode;	# 查询数据的模式:默认为关联数组
	public $error; 			# 记录的错误信息
	
	# 构造方法
	# 默认采用PDO异常和获取关联数组设定
	public function __construct($datebase_info = array(),$drivers = array()){
		$type = isset($database_info['type'])?$database_info['type']:'mysql';
		$host = isset($database_info['host'])?$database_info['host']:'localhost';
		$port = isset($database_info['port'])?$database_info['port']:'3306';
		$user = isset($database_info['user'])?$database_info['user']:'root';
		$pass = isset($database_info['pass'])?$database_info['pass']:'123456';
		$dbname = isset($database_info['dbname'])?$database_info['dbname']:'yijing';
		$charset = isset($database_info['charset'])?$database_info['charset']:'utf8';
		# fetchmode不能在初始化的时候实现,需要在得到PDOStatement类对象时设置
		$this->fetch_mode = isset($drivers[PDO::ATTR_DEFAULT_FETCH_MODE])?($drivers[PDO::ATTR_DEFAULT_FETCH_MODE]):(PDO::FETCH_ASSOC);
		
		#控制属性(增加异常处理模式)
		if(!isset($drivers[PDO::ATTR_ERRMODE])){
			$drivers[PDO::ATTR_ERRMODE] = PDO::ERRMODE_EXCEPTION;
		}		
		
		try{
            $this->pdo = @new PDO($type.':host='.$host.';port='.$port.';dbname='.$dbname.';charset='.$charset,$user,$pass,$drivers);
		}catch(PDOException $e){
            # 调用异常处理方法
			$this->my_exception($e);
		}
	}
	private function my_exception(PDOException $e){
		$this->error['file'] = $e->getFile();
		$this->error['line'] = $e->getLine();
		$this->error['error'] = $e->getMessage();
		# 返回false,让外部处理
		return false;
	}
	
	# 写操作
	public function my_exec($sql){
		try{
			return $this->pdo->exec($sql);
		}catch(PDOException $e){
			return $this->my_exception($e);
		}
	}
	
	# 获取自增长ID
	public function my_last_insert_id(){
		try{
			$id = $this->pdo->lastInsertId();
			
			# 主动抛出异常
			if(!$id) throw new PDOException('自增长ID不存在!');
			return $id;
		}catch(PDOException $e){
			return $this->my_exception($e);
		}
	}
	
	# 读方法:按条件进行单行或多行数据返回
	public function my_query($sql,$only = true){
		try{
			$stmt = $this->pdo->query($sql);
			# 设置查询模式
			$stmt->setFetchMode($this->fetch_mode);
		}catch(PDOException $e){
			return $this->my_exception($e);
		}
		
		# 数据解析
		if($only){
			return $stmt->fetch();
		}else{
			return $stmt->fetchAll();
		}
		
    }
	public function prepare_print($pre_sql,$val = array()){
		try{
			$stmt = $this->pdo->prepare($pre_sql);
			if(!$stmt) die('预处理指令执行失败!');
			$res = $stmt->execute($val);
			while($row=$stmt->fetch(PDO::FETCH_ASSOC)){
				//生成器
				yield $row;
			}
		}catch(PDOException $e){
			//无法return
		}
	}
	public function prepare_noprint($pre_sql,$val = array()){
		try{
			$stmt = $this->pdo->prepare($pre_sql);
			if(!$stmt) die('预处理指令执行失败!');
			$res = $stmt->execute($val);
			if($res){
				return true;
			}else{
				return false;
			}
		}catch(PDOException $e){
			// echo $e;
			return $this->my_exception($e);
		}
	}

	public function prepare_optimism_lock($sql,$pre_sql,$val1 = array(),$val2 = array(),$version){
		try{
			//语句1:取出版本号
			$stmt1 = $this->pdo->prepare($sql);
			if(!$stmt1) die('预处理指令1执行失败!');
			$res1 = $stmt1->execute($val1);
			if($res1 > 0){
				$row1=$stmt1->fetch(PDO::FETCH_ASSOC);
				$val2[$version]=$row1[$version];
				//语句2:更新语句,无输出
				$stmt2 = $this->pdo->prepare($pre_sql);
				if(!$stmt2) die('预处理指令2执行失败!');
				$res2 = $stmt2->execute($val2);
				if($res2 == 0){
					prepare_optimism_lock($sql,$pre_sql,$val1,$val2,$version);
				}else{
					return true;
				}
			}else{
				return false;
			}
		}catch(PDOException $e){
			return $this->my_exception($e);
		}
	}
	
}




?>

header.php
请求头

<?php
header('Access-Control-Allow-Origin:*');
header('Access-Control-Allow-Headers:x-requested-with,content-type');
header("Access-Control-Allow-Methods: POST, GET, OPTIONS, PUT, DELETE");

addlike_redis.php

include "../header.php";

$res = file_get_contents("php://input");
//获取axios传来的数据
$a = json_decode($res,true);
$Id=$a['Id'];
$LikeClass=$a['LikeClass'];
$userName=$a['userName'];

$redis = new Redis();
$redis->connect('127.0.0.1',6379);
$redis->select(0);

if($LikeClass == "work"){
    $redis->lpush("yijingwork","$userName"."=>"."$Id");
    echo 1;
}else if($LikeClass == "dynamic"){
    $redis->lpush("dynamic","$userName"."=>"."$Id");
    echo 1;
}

addlike_time.php

<?php
include "../header.php";
function my_autoload($classname){
	#直接加载
	if(!class_exists($classname)){
		#内存不存在,尝试加载
        $file='../'.'db/'.$classname.'.php';
		if(file_exists($file)){
			include $file;
		}
		$file='../'.'login/token/'.$classname.'.php';
		if(file_exists($file)){
			include $file;
		}
	}
}
spl_autoload_register('my_autoload');

$m = new MyPDO;

$redis = new Redis();
$redis->connect('127.0.0.1',6379);
$redis->select(0);
function addLike_prepare_optimism_lock($sql,$pre_sql,$val1 = array(),$val2 = array(),$version,$nextStar,$starOrdynamicStar){
    // try{
        //语句1:取出版本号
        $pdo = new PDO('mysql:host=localhost;port=3306;dbname=yijing;charset=utf8','root','123456');
        $stmt1 = $pdo->prepare($sql);
        if(!$stmt1) die('预处理指令1执行失败!');
        $res1 = $stmt1->execute($val1);
        if($res1 > 0){
            $row1=$stmt1->fetch(PDO::FETCH_ASSOC);
            $val2[$version]=$row1[$version];
            $val2['star'] = $row1[$starOrdynamicStar].$nextStar;
            //语句2:更新语句,无输出
            $stmt2 = $pdo->prepare($pre_sql);
            if(!$stmt2) die('预处理指令2执行失败!');
            $res2 = $stmt2->execute($val2);
            if($res2 == 0){
                addLike_prepare_optimism_lock($sql,$pre_sql,$val1,$val2,$version,$nextStar,$starOrdynamicStar);
            }else{
                return true;
            }
        }else{
            return false;
        }
    // }catch(Exception $e){
    //     echo $e;
    // }
}

function addStar($userName="",$nextStar="",$m){
    $db = "yijingWork";
    $starOrdynamicStar = "star";
    $WOrD_id ="workId";
    if($userName == "" || $nextStar == ""){
        return false;
    }else{
        if(true){
            $sql = "select likes,likeVersion from $db where $WOrD_id=:workId";
            $pre_sql =  "update $db set likes= likes+1,likeVersion= likeVersion+1 where  $WOrD_id = :workId and likeVersion = :likeVersion";
            $version = "likeVersion";
            $val1= array('workId'=>$nextStar);
            $val2= array('workId'=>$nextStar);
            $res = $m->prepare_optimism_lock($sql,$pre_sql,$val1,$val2,$version);
            // echo $res;
            if($res){
                $sqlx = "select version,$starOrdynamicStar from user where userName=:userName";
                $pre_sqlx =  "update user set $starOrdynamicStar= :star,version= version+1 where  userName = :userName and version = :version";
                $versionx = "version";
                $val1x= array('userName'=>$userName);
                $val2x= array('userName'=>$userName);
                $nextStar = "[next]".$nextStar;
                $resx = addLike_prepare_optimism_lock($sqlx,$pre_sqlx,$val1x,$val2x,$versionx,$nextStar,$starOrdynamicStar);
                if($resx){
                    return true;
                }else{
                    return false;
                }
            }else{
                echo 0;
            }
        }else{
            echo 0;
        }
    }
}

while(true){    //循环
    $value = $redis->rpop('yijingwork');
    if(!$value){    //判断退出循环
        $l = $redis->lLen('yijingwork');
        if($l <= 1){    //对列长度小于1
            sleep(20);//尽量使用break
        }
        continue;
    }else{
        //已经取出数据
        $arr = explode("=>",$value);
        $res = addStar($arr[0],$arr[1],$m);
    }
    echo $value;
    sleep(0.01);
}
相关标签: php mysql redis