Swoole Redis 连接池的实现
如果这篇文章只实现一个 redis 连接池,篇幅就太少了,所以顺便将前几篇的内容整合一下。
demo 中大概包含这些点:
实现 mysql 连接池
实现 mysql CRUD 方法的定义
实现 redis 连接池
实现 redis 方法的定义
满足 http、tcp、websocket 调用
提供 demo 供测试
调整 目录结构
http 调用:
实现 读取 mysql 中数据的 demo
实现 读取 redis 中数据的 demo
(原文此处为一张图片:640?wx_fmt=png)
tcp 调用:
实现 读取 mysql 中数据的 demo
实现 读取 redis 中数据的 demo
(原文此处为一张图片:640?wx_fmt=png)
websocket 调用:
实现 每秒展示 api 调用量 demo
(原文此处为一张动图:640?wx_fmt=gif)
目录结构
├─ client
│ ├─ http
│ ├── mysql.php //测试 mysql 连接
│ ├── redis.php //测试 redis 连接
│ ├─ tcp
│ ├── mysql.php //测试 mysql 连接
│ ├── redis.php //测试 redis 连接
│ ├─ websocket
│ ├── index.html //实现 api 调用量展示
├─ controller
│ ├─ order.php //实现 mysql CRUD
│ ├─ product.php //实现 redis 调用
│ ├─ statistic.php //模拟 api 调用数据
├─ server
│ ├─ config
│ ├── config.php //默认配置
│ ├── mysql.php //mysql 配置
│ ├── redis.php //redis 配置
│ ├─ core
│ ├── common.php //公共方法
│ ├── core.php //核心文件
│ ├── handlerexception.php //异常处理
│ ├── callback //回调处理
│ │ ├── onrequest.php
│ │ ├── onreceive.php
│ │ ├── ontask.php
│ │ ├── ...
│ ├── mysql
│ │ ├── mysqldb.php
│ │ ├── mysqlpool.php
│ ├── redis
│ │ ├── redisdb.php
│ │ ├── redispool.php
│ ├─ log -- 需要 读/写 权限
│ ├── ...
├─ index.php //入口文件
代码
server/core/redis/redispool.php
<?php
// Stop with "no access" unless the constant server_path has already been defined.
// NOTE(review): the constant is not defined in this file; presumably the entry script defines it - confirm.
if (!defined('server_path')) exit("no access");
/**
 * Shared pool of redisdb connections stored in a `chan` channel.
 *
 * One instance is kept per process in a static property and is obtained
 * through getinstance(); connections are borrowed with get().
 */
class redispool
{
    /** @var static|null The shared pool, created on first getinstance() call. */
    private static $instance;

    /** @var chan Channel holding the idle redisdb connections. */
    private $pool;

    /** @var array Configuration passed to the constructor. */
    private $config;

    /**
     * Return the shared pool, creating it first when none exists yet.
     *
     * @param array|null $config Required only while no instance exists.
     * @return static
     * @throws RuntimeException When a pool has to be created and $config is empty.
     */
    public static function getinstance($config = null)
    {
        if (!empty(self::$instance)) {
            return self::$instance;
        }

        if (empty($config)) {
            throw new RuntimeException("redis config empty");
        }

        self::$instance = new static($config);

        return self::$instance;
    }

    /**
     * Create the channel and start one coroutine per slot that connects a
     * redisdb object and pushes it into the channel.
     *
     * @param array $config Reads $config['master']['pool_size'] for the channel
     *                      capacity; the whole array is handed to redisdb::connect().
     */
    public function __construct($config)
    {
        if (!empty($this->pool)) {
            return;
        }

        $this->config = $config;
        $capacity = $config['master']['pool_size'];
        $this->pool = new chan($capacity);

        $slot = 0;
        while ($slot < $capacity) {
            go(function () use ($config) {
                $connection = new redisdb();
                if ($connection->connect($config) === false) {
                    throw new RuntimeException("failed to connect redis server");
                }
                $this->pool->push($connection);
            });
            $slot++;
        }
    }

    /**
     * Borrow a connection from the pool.
     *
     * The connection is handed back to the pool through a defer() callback,
     * so callers do not push it back themselves.
     *
     * @return redisdb
     * @throws RuntimeException When the pool currently holds no connection, or
     *                          when pop() returns false.
     */
    public function get()
    {
        if ($this->pool->length() <= 0) {
            throw new RuntimeException("pool length <= 0");
        }

        $waitSeconds = $this->config['master']['pool_get_timeout'];
        $connection = $this->pool->pop($waitSeconds);
        if ($connection === false) {
            throw new RuntimeException("pop redis timeout");
        }

        // Release: give the connection back to the pool.
        defer(function () use ($connection) {
            $this->pool->push($connection);
        });

        return $connection;
    }
}
server/core/redis/redisdb.php
<?php
// Stop with "no access" unless the constant server_path has already been defined.
// NOTE(review): the constant is not defined in this file; presumably the entry script defines it - confirm.
if (!defined('server_path')) exit("no access");
/**
 * A master client and a slave client behind a single object.
 *
 * Commands called on this object are forwarded by __call() to one of the two
 * Swoole\Coroutine\Redis clients. A client whose `connected` flag is false is
 * replaced by a freshly connected one before the command is forwarded.
 */
class redisdb
{
    /** @var Swoole\Coroutine\Redis Client connected to $config['master']. */
    private $master;

    /** @var Swoole\Coroutine\Redis Client connected to $config['slave']. */
    private $slave;

    /** @var array Configuration given to connect(); reused for reconnects. */
    private $config;

    /**
     * Forward a command to the master or the slave client.
     *
     * @param string $name      Command (method) name.
     * @param array  $arguments Arguments of the command.
     * @return mixed Result of the forwarded call.
     * @throws RuntimeException When a reconnect is needed and fails.
     */
    public function __call($name, $arguments)
    {
        // TODO: commands that run on the master. Only these three are routed
        // there; every other command is sent to the slave.
        $command_master = ['set', 'hset', 'sadd'];

        // PHP method names are case-insensitive, so normalise before matching:
        // ->SET(...) has to reach the master just like ->set(...).
        $type = in_array(strtolower($name), $command_master, true) ? 'master' : 'slave';
        $db = $this->_get_usable_db($type);

        return call_user_func_array([$db, $name], $arguments);
    }

    /**
     * Connect the master client, then the slave client, and remember $config.
     *
     * @param array $config Needs ['master'] and ['slave'], each with 'host' and 'port'.
     * @return bool Always true; failures are reported by exception.
     * @throws RuntimeException When either connection attempt fails.
     */
    public function connect($config)
    {
        // Master
        $this->master = $this->_open($config['master']);
        // Slave
        $this->slave = $this->_open($config['slave']);
        $this->config = $config;

        return true;
    }

    /**
     * Return the client for $type, reconnecting it first when it reports
     * that it is no longer connected.
     *
     * @param string $type 'master' or 'slave'.
     * @return Swoole\Coroutine\Redis|null Null for any other $type.
     * @throws RuntimeException When the reconnect fails.
     */
    private function _get_usable_db($type)
    {
        if ($type == 'master') {
            if (!$this->master->connected) {
                $this->master = $this->_open($this->config['master']);
            }
            return $this->master;
        }

        if ($type == 'slave') {
            if (!$this->slave->connected) {
                $this->slave = $this->_open($this->config['slave']);
            }
            return $this->slave;
        }

        return null;
    }

    /**
     * Create a client and connect it to one node.
     *
     * @param array $node Array with 'host' and 'port'.
     * @return Swoole\Coroutine\Redis The connected client.
     * @throws RuntimeException Carrying the client's error message and error code.
     */
    private function _open($node)
    {
        $client = new Swoole\Coroutine\Redis();
        if ($client->connect($node['host'], $node['port']) === false) {
            // Exception signature is (message, code): the text goes first and
            // the numeric code second, otherwise PHP raises a TypeError.
            throw new RuntimeException((string) $client->errMsg, (int) $client->errCode);
        }

        return $client;
    }
}
client/http/redis.php
<?php
// Demo payload: names class "product", method "set" and a key/value pair.
// NOTE(review): how the server dispatches 'class'/'method' is not visible here.
$demo = [
    'type' => 'sw',
    'token' => 'bb1r3ylipbktp5p0',
    'param' => [
        'class' => 'product',
        'method' => 'set',
        'param' => [
            'key' => 'c4649',
            'value' => '订单-c4649'
        ],
    ],
];

$ch = curl_init();

// cURL option constants are case-sensitive and must be written in upper case.
$options = [
    CURLOPT_URL => 'http://10.211.55.4:9509/',
    CURLOPT_POST => 1,
    CURLOPT_POSTFIELDS => json_encode($demo),
];
curl_setopt_array($ch, $options);

// CURLOPT_RETURNTRANSFER is not set, so curl_exec() prints the response body
// itself and returns false on failure.
if (curl_exec($ch) === false) {
    error_log('curl error: ' . curl_error($ch));
}
curl_close($ch);
client/tcp/redis.php
<?php
/**
 * Interactive TCP demo client.
 *
 * After connecting it prompts on standard output; every time the user types
 * "y" it sends one JSON request and prints whatever the server sends back.
 */
class client
{
    /** @var swoole_client Asynchronous TCP client. */
    private $client;

    /**
     * Create the asynchronous client and register the event callbacks.
     */
    public function __construct()
    {
        // Swoole constants are case-sensitive and must be written in upper case.
        $this->client = new swoole_client(SWOOLE_SOCK_TCP, SWOOLE_SOCK_ASYNC);
        $this->client->on('connect', [$this, 'onconnect']);
        $this->client->on('receive', [$this, 'onreceive']);
        $this->client->on('close', [$this, 'onclose']);
        $this->client->on('error', [$this, 'onerror']);
    }

    /**
     * Start connecting to 0.0.0.0:9510 with a timeout argument of 1.
     *
     * @return void
     */
    public function connect()
    {
        if (!$this->client->connect("0.0.0.0", 9510, 1)) {
            // connect() returned a falsy value, so the error details have to
            // be read from the client object, not from the return value.
            $this->reportError();
            return;
        }
    }

    /**
     * Connect callback: show the prompt and watch standard input.
     *
     * @return void
     */
    public function onconnect()
    {
        fwrite(STDOUT, "测试rpc (y or n):");
        swoole_event_add(STDIN, function () {
            $msg = trim(fgets(STDIN));
            if ($msg == 'y') {
                $this->send();
            }
            fwrite(STDOUT, "测试rpc (y or n):");
        });
    }

    /**
     * Receive callback: print the data sent by the server.
     *
     * @param swoole_client $cli  Client that received the data.
     * @param string        $data Received payload.
     * @return void
     */
    public function onreceive($cli, $data)
    {
        echo '[received]:' . $data;
    }

    /**
     * Send the demo request (class "product", method "get") as JSON.
     *
     * @return void
     */
    public function send()
    {
        $demo = [
            'type' => 'sw',
            'token' => 'bb1r3ylipbktp5p0',
            'param' => [
                'class' => 'product',
                'method' => 'get',
                'param' => [
                    'code' => 'c4649'
                ],
            ],
        ];
        $this->client->send(json_encode($demo));
    }

    /**
     * Close callback.
     *
     * @return void
     */
    public function onclose()
    {
        echo "client close connection" . PHP_EOL;
    }

    /**
     * Error callback: report the failure instead of ignoring it.
     *
     * @return void
     */
    public function onerror()
    {
        $this->reportError();
    }

    /**
     * Print the client's current error code together with its description.
     *
     * @return void
     */
    private function reportError()
    {
        $code = $this->client->errCode;
        echo "error: " . swoole_strerror($code) . "[{$code}]" . PHP_EOL;
    }
}

$client = new client();
$client->connect();
client/websocket/index.html
<!doctype html>
<html>
<head>
<meta charset="utf-8">
<meta http-equiv="x-ua-compatible" content="ie=edge">
<meta name="viewport" content="width=device-width, initial-scale=1">
<meta name="description" content="">
<meta name="keywords" content="">
<title>demo</title>
<script src="https://cdn.bootcss.com/jquery/3.4.1/jquery.js"></script>
<script src="http://echarts.baidu.com/gallery/vendors/echarts/echarts.min.js"></script>
</head>
<body>
<!-- 为echarts准备一个具备大小(宽高)的dom -->
<div id="main" style="width: 900px;height:400px;"></div>
<script type="text/javascript">
// Draws a line chart in #main: the initial data is loaded with one AJAX call,
// later updates arrive as JSON messages over a WebSocket connection.
// JavaScript identifiers are case-sensitive, so the browser, jQuery and
// ECharts names below must keep their exact camelCase spelling.
if ("WebSocket" in window) {
    // Initialise the ECharts instance on the prepared DOM element.
    var myChart = echarts.init(document.getElementById('main'));

    var wsServer = 'ws://10.211.55.4:9509';
    var ws = new WebSocket(wsServer);

    ws.onopen = function (evt) {
        if (ws.readyState === WebSocket.OPEN) {
            console.log('websocket 连接成功...');
            ws.send('开始请求...');
        } else {
            console.log('websocket 连接失败...');
            alert('websocket 连接失败');
        }
    };

    ws.onmessage = function (evt) {
        console.log('retrieved data from server: ' + evt.data);
        // Each message is parsed as JSON; its `time` and `value` fields
        // replace the x-axis labels and the series data.
        var evt_data = JSON.parse(evt.data);
        myChart.setOption({
            xAxis: {
                data: evt_data.time
            },
            series: [{
                data: evt_data.value
            }]
        });
    };

    ws.onerror = function (evt) {
        alert('websocket 发生错误');
        console.log(evt);
    };

    ws.onclose = function () {
        alert('websocket 连接关闭');
        console.log('websocket 连接关闭...');
    };

    // Load the initial chart configuration and data.
    $.ajax({
        url: 'http://10.211.55.4:9509/', // request URL
        type: "post",                    // HTTP method
        dataType: "json",                // expected response type
        data: {
            'type': 'sw',
            'token': 'bb1r3ylipbktp5p0',
            'param': {
                'class': 'statistic',
                'method': 'init'
            }
        },
        beforeSend: function () {
        },
        success: function (rs) {
            if (rs.code != 1) {
                alert('获取数据失败');
                return;
            }

            var option = {
                title: {
                    text: 'api 调用量',
                    x: 'center'
                },
                tooltip: {
                    trigger: 'axis',
                    axisPointer: {
                        animation: false
                    }
                },
                xAxis: {
                    type: 'category',
                    data: rs.data.time
                },
                yAxis: {
                    type: 'value',
                    boundaryGap: [0, '100%'],
                    name: '使用量',
                    splitLine: {
                        show: false
                    }
                },
                series: [{
                    name: '使用量',
                    type: 'line',
                    showSymbol: false,
                    hoverAnimation: false,
                    data: rs.data.value
                }]
            };

            // Render the chart with the configuration built above.
            myChart.setOption(option, true);
        },
        error: function () {
            alert('服务器请求异常');
        }
    });
} else {
    alert("您的浏览器不支持 websocket!");
}
</script>
</body>
</html>
此外还涉及 onmessage.php、ontask.php、onworkerstart.php 等文件,这里就不贴代码了。