php redis消息队列
程序员文章站
2022-03-10 08:17:54
...
不专业的点赞消息队列
仅实现了点赞,取消点赞和点赞过程基本相同,只需要explode数据库中的star列内的作品id名
前端传来点赞作品的id,作品的分类(普通作品,动态),用户名userName
redis队列里存的格式为 userNmae=>id
脚本运行addlike_time,将队列数据取出,存入mysql,mysql用了pdo,乐观锁(写的不是很专业)。
addlike_time文件用到了mypdo类
数据库
user
star和dynamicstar为点赞作品储存,最后的version栏没有显示,用于乐观锁。
user设计表
yijingwork表
mypdo.php
<?php
# 基于PDO的二次封装
# 命名空间
// namespace core;
# 引入系统类:基于PDO的实现,需要引入第三个类
// use \PDO,\PDOStatement,\PDOException;
class MyPDO{
private $pdo; # 保存PDO类对象
private $fetch_mode; # 查询数据的模式:默认为关联数组
public $error; # 记录的错误信息
# 构造方法
# 默认采用PDO异常和获取关联数组设定
public function __construct($datebase_info = array(),$drivers = array()){
$type = isset($database_info['type'])?$database_info['type']:'mysql';
$host = isset($database_info['host'])?$database_info['host']:'localhost';
$port = isset($database_info['port'])?$database_info['port']:'3306';
$user = isset($database_info['user'])?$database_info['user']:'root';
$pass = isset($database_info['pass'])?$database_info['pass']:'123456';
$dbname = isset($database_info['dbname'])?$database_info['dbname']:'yijing';
$charset = isset($database_info['charset'])?$database_info['charset']:'utf8';
# fetchmode不能在初始化的时候实现,需要在得到PDOStatement类对象时设置
$this->fetch_mode = isset($drivers[PDO::ATTR_DEFAULT_FETCH_MODE])?($drivers[PDO::ATTR_DEFAULT_FETCH_MODE]):(PDO::FETCH_ASSOC);
#控制属性(增加异常处理模式)
if(!isset($drivers[PDO::ATTR_ERRMODE])){
$drivers[PDO::ATTR_ERRMODE] = PDO::ERRMODE_EXCEPTION;
}
try{
$this->pdo = @new PDO($type.':host='.$host.';port='.$port.';dbname='.$dbname.';charset='.$charset,$user,$pass,$drivers);
}catch(PDOException $e){
# 调用异常处理方法
$this->my_exception($e);
}
}
private function my_exception(PDOException $e){
$this->error['file'] = $e->getFile();
$this->error['line'] = $e->getLine();
$this->error['error'] = $e->getMessage();
# 返回false,让外部处理
return false;
}
# 写操作
public function my_exec($sql){
try{
return $this->pdo->exec($sql);
}catch(PDOException $e){
return $this->my_exception($e);
}
}
# 获取自增长ID
public function my_last_insert_id(){
try{
$id = $this->pdo->lastInsertId();
# 主动抛出异常
if(!$id) throw new PDOException('自增长ID不存在!');
return $id;
}catch(PDOException $e){
return $this->my_exception($e);
}
}
# 读方法:按条件进行单行或多行数据返回
public function my_query($sql,$only = true){
try{
$stmt = $this->pdo->query($sql);
# 设置查询模式
$stmt->setFetchMode($this->fetch_mode);
}catch(PDOException $e){
return $this->my_exception($e);
}
# 数据解析
if($only){
return $stmt->fetch();
}else{
return $stmt->fetchAll();
}
}
public function prepare_print($pre_sql,$val = array()){
try{
$stmt = $this->pdo->prepare($pre_sql);
if(!$stmt) die('预处理指令执行失败!');
$res = $stmt->execute($val);
while($row=$stmt->fetch(PDO::FETCH_ASSOC)){
//生成器
yield $row;
}
}catch(PDOException $e){
//无法return
}
}
public function prepare_noprint($pre_sql,$val = array()){
try{
$stmt = $this->pdo->prepare($pre_sql);
if(!$stmt) die('预处理指令执行失败!');
$res = $stmt->execute($val);
if($res){
return true;
}else{
return false;
}
}catch(PDOException $e){
// echo $e;
return $this->my_exception($e);
}
}
public function prepare_optimism_lock($sql,$pre_sql,$val1 = array(),$val2 = array(),$version){
try{
//语句1:取出版本号
$stmt1 = $this->pdo->prepare($sql);
if(!$stmt1) die('预处理指令1执行失败!');
$res1 = $stmt1->execute($val1);
if($res1 > 0){
$row1=$stmt1->fetch(PDO::FETCH_ASSOC);
$val2[$version]=$row1[$version];
//语句2:更新语句,无输出
$stmt2 = $this->pdo->prepare($pre_sql);
if(!$stmt2) die('预处理指令2执行失败!');
$res2 = $stmt2->execute($val2);
if($res2 == 0){
prepare_optimism_lock($sql,$pre_sql,$val1,$val2,$version);
}else{
return true;
}
}else{
return false;
}
}catch(PDOException $e){
return $this->my_exception($e);
}
}
}
?>
header.php
请求头
<?php
header('Access-Control-Allow-Origin:*');
header('Access-Control-Allow-Headers:x-requested-with,content-type');
header("Access-Control-Allow-Methods: POST, GET, OPTIONS, PUT, DELETE");
addlike_redis.php
include "../header.php";
$res = file_get_contents("php://input");
//获取axios传来的数据
$a = json_decode($res,true);
$Id=$a['Id'];
$LikeClass=$a['LikeClass'];
$userName=$a['userName'];
$redis = new Redis();
$redis->connect('127.0.0.1',6379);
$redis->select(0);
if($LikeClass == "work"){
$redis->lpush("yijingwork","$userName"."=>"."$Id");
echo 1;
}else if($LikeClass == "dynamic"){
$redis->lpush("dynamic","$userName"."=>"."$Id");
echo 1;
}
addlike_time.php
<?php
include "../header.php";
function my_autoload($classname){
#直接加载
if(!class_exists($classname)){
#内存不存在,尝试加载
$file='../'.'db/'.$classname.'.php';
if(file_exists($file)){
include $file;
}
$file='../'.'login/token/'.$classname.'.php';
if(file_exists($file)){
include $file;
}
}
}
spl_autoload_register('my_autoload');
$m = new MyPDO;
$redis = new Redis();
$redis->connect('127.0.0.1',6379);
$redis->select(0);
function addLike_prepare_optimism_lock($sql,$pre_sql,$val1 = array(),$val2 = array(),$version,$nextStar,$starOrdynamicStar){
// try{
//语句1:取出版本号
$pdo = new PDO('mysql:host=localhost;port=3306;dbname=yijing;charset=utf8','root','123456');
$stmt1 = $pdo->prepare($sql);
if(!$stmt1) die('预处理指令1执行失败!');
$res1 = $stmt1->execute($val1);
if($res1 > 0){
$row1=$stmt1->fetch(PDO::FETCH_ASSOC);
$val2[$version]=$row1[$version];
$val2['star'] = $row1[$starOrdynamicStar].$nextStar;
//语句2:更新语句,无输出
$stmt2 = $pdo->prepare($pre_sql);
if(!$stmt2) die('预处理指令2执行失败!');
$res2 = $stmt2->execute($val2);
if($res2 == 0){
addLike_prepare_optimism_lock($sql,$pre_sql,$val1,$val2,$version,$nextStar,$starOrdynamicStar);
}else{
return true;
}
}else{
return false;
}
// }catch(Exception $e){
// echo $e;
// }
}
function addStar($userName="",$nextStar="",$m){
$db = "yijingWork";
$starOrdynamicStar = "star";
$WOrD_id ="workId";
if($userName == "" || $nextStar == ""){
return false;
}else{
if(true){
$sql = "select likes,likeVersion from $db where $WOrD_id=:workId";
$pre_sql = "update $db set likes= likes+1,likeVersion= likeVersion+1 where $WOrD_id = :workId and likeVersion = :likeVersion";
$version = "likeVersion";
$val1= array('workId'=>$nextStar);
$val2= array('workId'=>$nextStar);
$res = $m->prepare_optimism_lock($sql,$pre_sql,$val1,$val2,$version);
// echo $res;
if($res){
$sqlx = "select version,$starOrdynamicStar from user where userName=:userName";
$pre_sqlx = "update user set $starOrdynamicStar= :star,version= version+1 where userName = :userName and version = :version";
$versionx = "version";
$val1x= array('userName'=>$userName);
$val2x= array('userName'=>$userName);
$nextStar = "[next]".$nextStar;
$resx = addLike_prepare_optimism_lock($sqlx,$pre_sqlx,$val1x,$val2x,$versionx,$nextStar,$starOrdynamicStar);
if($resx){
return true;
}else{
return false;
}
}else{
echo 0;
}
}else{
echo 0;
}
}
}
while(true){ //循环
$value = $redis->rpop('yijingwork');
if(!$value){ //判断退出循环
$l = $redis->lLen('yijingwork');
if($l <= 1){ //对列长度小于1
sleep(20);//尽量使用break
}
continue;
}else{
//已经取出数据
$arr = explode("=>",$value);
$res = addStar($arr[0],$arr[1],$m);
}
echo $value;
sleep(0.01);
}
推荐阅读
-
mysql - php队列计划任务怎么做呢,用的是ignore_user_abort吗?
-
配置php的redis扩展
-
redis/分布式文件存储系统/数据库 存储session,解决负载均衡集群中session不一致问题,redissession_PHP教程
-
PHP版微信公共平台消息主动推送,突破订阅号一天只能发送一条信息限制
-
redis在windows下安装和PHP中使用,redisphp_PHP教程
-
消息称编程语言PHP即将推出移动版
-
微信接口消息接口指南_PHP教程
-
知名网站分享:PHP代替Perl,Redis置换MySQL,日处理过亿PV
-
使用 PHP 消息队列实现 Android 与 Web 通信
-
PHP微信开发之模板消息回复_php实例