异步redis队列实现 数据入库的方法
业务需求
app客户端向服务端接口发送来json 数据 每天 发一次 清空缓存后会再次发送
出问题之前业务逻辑:
php 接口 首先将 json 转为数组 去重 在一张大表中插入不存在的数据
该用户已经存在 和新增的id
入另一种详情表
问题所在:
当用户因特殊情况清除缓存 导致app 发送json串 入库并发高 导致cpu 暴增到88% 并且居高不下
优化思路:
1、异步队列处理
2、redis 过滤(就是只处理当天第一次请求)
3、redis 辅助存储app名称(验证过后批量插入数据app名称表中)
4、拼接插入的以及新增的如详细表中
解决办法:
1、接口修改 redis 过滤 + 如list队列 并将结果存入redis中
首先 redis将之前的历史数据放在redis 哈希里面 中文为键名 id 为键值
<?php /** * created by haiyong. * user: jia * date: 2017/9/18 * time: 20:06 */ namespace app\http\controllers\app; use app\http\controllers\controller; use illuminate\http\request; use illuminate\support\facades\db; use illuminate\support\facades\redis; class otherappcontroller extends controller{ /** * app应用统计接口 * @param request $request * @return string */ public function apptotal(request $request) { // //历史数据入库 //$redis = redis::connection('web_active'); // $app_name = db::connection('phplog')->table('app_set_name')->where("appname", '<>', ' ')->lists('id', 'appname'); // $str = ''; // foreach ($app_name as $key => $val) { // $str.= "{$val} {$key} "; // } // $redis->hmset('app_name', $app_name); // echo $str;exit; $result = $request->input('res'); $list = json_decode($result, true); if (empty ($list) || !is_array($list)) { return json_encode(['result' => 'error', 'msg' => 'parameter error']); } $data['uid'] = isset($list['uid']) ? $list['uid'] : '20001' ; $data['time'] = date('y-m-d'); $redis_key = 'log_app:'.$data['time']; //redis 过滤 $redis = redis::connection('web_active'); //redis 键值过期设置 if (empty($redis->exists($redis_key))) { $redis->hset($redis_key, 1, 'start'); $redis->expireat($redis_key, strtotime($data['time'].'+2 day')); } //值确定 if ($redis->hexists($redis_key, $data['uid'])) { return json_encode(['result' => 'success']); } else { //推入队列 $redis->hset($redis_key, $data['uid'], $result); $redis->rpush('log_app_list', $data['time'] . ':' . $data['uid']); return json_encode(['result' => 'success']); } } }
2、php 脚本循环 监控redis 队列 执行逻辑 防止内存溢出
mget 获取该用户的app id 不存在就会返回null
通过判断null 运用redis 新值作为自增id指针 将null 补齐 之后批量入mysql 并跟新redis 哈希 和指针值 并入库 详情表
<?php namespace app\console\commands; use illuminate\console\command; use illuminate\support\facades\redis; use illuminate\support\facades\db; use illuminate\support\facades\storage; class apptotal extends command { /** * the name and signature of the console command. * * @var string */ protected $signature = 'apptotal:run'; /** * the console command description. * * @var string */ protected $description = 'command description'; /** * create a new command instance. * * @return void */ public function __construct() { parent::__construct(); } /** * execute the console command. * * @return mixed */ public function handle() { //历史数据入库 // $redis = redis::connection('web_active'); // $app_name = db::connection('phplog')->table('app_set_name')->where("appname", '<>', ' ')->lists('id', 'appname'); // $redis->hmset('app_name', $app_name); // exit; while(1) { $redis = redis::connection('web_active'); //队列名称 $res = $redis->lpop('log_app_list'); //开关按钮 $lock = $redis->get('log_app_lock'); if (!empty($res)) { list($date,$uid) = explode(':',$res); $result = $redis->hget('log_app:'.$date, $uid); if (!empty($result)) { $table_name = 'app_total'.date('ym'); $list = json_decode($result, true); $data['uid'] = isset($list['uid']) ? $list['uid'] : '20001' ; $data['sex'] = isset($list['sex']) ? $list['sex'] : '' ; $data['device'] = isset($list['device']) ? $list['device'] : '' ; $data['applist'] = isset($list['list']) ? $list['list'] : '' ; //数据去重 flip比unique更节约性能 $data['applist'] = array_flip($data['applist']); $data['applist'] = array_flip($data['applist']); $data['time'] = date('y-m-d'); //app应用过滤 $app_res = $redis->hmget('app_name', $data['applist']); //新增加app数组 $new_app = []; //mysql 入库数组 $mysql_new_app = []; //获取当前redis 自增指针 $total = $redis->get('app_name_total'); foreach ($app_res as $key =>& $val) { if (is_null($val)) { $total += 1; $new_app[$data['applist'][$key]] = $total; $val = $total; array_push($mysql_new_app,['id' => $total, 'appname'=> $data['applist'][$key]]); } } if (count($new_app)){ $str = "insert ignore into app_set_name (id,appname) values"; foreach ($new_app as $key => $val) { $str.= "(".$val.",'".$key."'),"; } $str = trim($str, ','); //$mysql_res = db::connection('phplog')->table('app_set_name')->insert($mysql_new_app); $mysql_res = db::connection('phplog')->statement($str); if ($mysql_res) { // 设置redis 指针 $redis->set('app_name_total', $total); // redis 数据入库 $redis->hmset('app_name', $new_app); } } // 详情数据入库 $data['applist'] = implode(',', $app_res); //app统计入库 db::connection('phplog')->statement("insert ignore into ".$table_name." (uid,sex,device,`time`,applist) values('".$data['uid']."',".$data['sex'].",'".$data['device']."','".$data['time']."','".$data['applist']."')"); //log 记录 当文件达到123mb的时候产生内存保错 所有这个地方可是利用日志切割 或者 不写入 日志 storage::disk('local')->append(directory_separator.'total'.directory_separator.'loaapptotal.txt', date('y-m-d h:i:s').' success '.$result."\n"); } else { storage::disk('local')->append(directory_separator.'total'.directory_separator.'loaapptotal.txt', date('y-m-d h:i:s').' error '.$result."\n"); } } //执行间隔 sleep(1); //结束按钮 if ($lock == 2) { exit; } //内存检测 if(memory_get_usage()>1000*1024*1024){ exit('内存溢出');//大于100m内存退出程序,防止内存泄漏被系统杀死导致任务终端 } } } }
3、执定 定时任务监控脚本执行情况
crontab -e /2 * * * * /bin/bash /usr/local/nginx/html/test.sh 1>>/usr/local/nginx/html/log.log 2>&1
test.sh 内容 (查看执行命令返回的进程id 如果没有就执行命令开启)
#!/bin/bash alive=`ps -ef | grep apptotal | grep -v grep | awk '{print $2}'` if [ ! $alive ] then /usr/local/php/bin/php /var/ms/artisan apptotal:run > /dev/null & fi
记得授权哦 chmod +x test.sh
笔者用的laravel 框架 将命令激活丢入后台
执行命令
/usr/local/php/bin/php /var/ms/artisan apptotal:run > /dev/null &
完事直接 ctrl -c 结束就行 命令以在后台运行 可以用shell 中的命令查看进程id
这样就实现队列异步入库
还有很多问题需要优化!!大致功能已经实现!!!!!!
优化完成后cpu
以上这篇异步redis队列实现 数据入库的方法就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持。
推荐阅读
-
AJAX Servlet实现数据异步交互的方法
-
Python的Flask框架应用调用Redis队列数据的方法
-
AJAX Servlet实现数据异步交互的方法
-
vue+echarts实现动态绘制图表及异步加载数据的方法
-
Python的Flask框架应用调用Redis队列数据的方法
-
C#实现异步连接Sql Server数据库的方法
-
Android开发实现ListView异步加载数据的方法详解
-
bootstrap jquery dataTable 异步ajax刷新表格数据的实现方法
-
Python cookbook(数据结构与算法)实现优先级队列的方法示例
-
Ajax通过XML异步提交的方法实现从数据库获取省份和城市信息实现二级联动(xml方法)