
Implementing a Redis Connection Pool with Swoole

程序员文章站 2023-11-10 22:03:58

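Implementing nothing but a Redis connection pool would make for a very short article, so this post also consolidates the previous few articles into one demo.

The demo covers roughly these points:

    A MySQL connection pool

    Definitions of the MySQL CRUD methods

    A Redis connection pool

    Definitions of the Redis methods

    Calls over HTTP, TCP, and WebSocket

    Demos for testing

    An adjusted directory structure

HTTP calls:

    A demo that reads data from MySQL

    A demo that reads data from Redis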


TCP calls:

    A demo that reads data from MySQL

    A demo that reads data from Redis


WebSocket calls:

    A demo that displays the API call volume every second

Directory structure

    ├─ client
    │  ├─ http
    │     ├── mysql.php // tests the MySQL connection
    │     ├── redis.php // tests the Redis connection
    │  ├─ tcp
    │     ├── mysql.php // tests the MySQL connection
    │     ├── redis.php // tests the Redis connection
    │  ├─ websocket
    │     ├── index.html // displays the API call volume
    ├─ controller
    │  ├─ order.php     // MySQL CRUD
    │  ├─ product.php   // Redis calls
    │  ├─ statistic.php // simulates API call data
    ├─ server
    │  ├─ config
    │     ├── config.php // default configuration
    │     ├── mysql.php  // MySQL configuration
    │     ├── redis.php  // Redis configuration
    │  ├─ core
    │     ├── common.php // shared helper functions
    │     ├── core.php   // core file
    │     ├── handlerexception.php // exception handling
    │     ├── callback // callback handlers
    │         ├── onrequest.php
    │         ├── onreceive.php
    │         ├── ontask.php
    │         ├── ...
    │     ├── mysql
    │         ├── mysqldb.php
    │         ├── mysqlpool.php
    │     ├── redis
    │         ├── redisdb.php
    │         ├── redispool.php
    │  ├─ log  -- needs read/write permission
    │     ├── ...
    ├─ index.php // entry file

Code

server/core/redis/redispool.php

    <?php
    if (!defined('SERVER_PATH')) exit("no access");
    class redispool
    {
        private static $instance;
        private $pool;   // channel holding the idle redisdb connections
        private $config;
        // Returns the shared pool; $config is only required on the first call.
        public static function getinstance($config = null)
        {
            if (empty(self::$instance)) {
                if (empty($config)) {
                    throw new RuntimeException("redis config empty");
                }
                self::$instance = new static($config);
            }
            return self::$instance;
        }
        public function __construct($config)
        {
            if (empty($this->pool)) {
                $this->config = $config;
                $this->pool = new chan($config['master']['pool_size']);
                // Open every connection in its own coroutine and queue it once connected.
                for ($i = 0; $i < $config['master']['pool_size']; $i++) {
                    go(function() use ($config) {
                        $redis = new redisdb();
                        $res = $redis->connect($config);
                        if ($res === false) {
                            throw new RuntimeException("failed to connect redis server");
                        } else {
                            $this->pool->push($redis);
                        }
                    });
                }
            }
        }
        public function get()
        {
            // Fail fast when no idle connection is queued.
            if ($this->pool->length() > 0) {
                $redis = $this->pool->pop($this->config['master']['pool_get_timeout']);
                if (false === $redis) {
                    throw new RuntimeException("pop redis timeout");
                }
                // Return the connection to the pool when the current coroutine exits.
                defer(function () use ($redis) {
                    $this->pool->push($redis);
                });
                return $redis;
            } else {
                throw new RuntimeException("pool length <= 0");
            }
        }
    }
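The article does not list server/config/redis.php, so here is a rough sketch of the configuration shape that the pool and the `redisdb` class it instantiates appear to read. Only the key names (`master.host`, `master.port`, `master.pool_size`, `master.pool_get_timeout`, `slave.host`, `slave.port`) come from the code; the variable name `$redisConfig` and every value are assumptions for illustration.

```php
<?php
// Hypothetical values; only the key names are taken from the pool and connection code.
$redisConfig = [
    'master' => [
        'host'             => '127.0.0.1',
        'port'             => 6379,
        'pool_size'        => 5,   // number of pooled redisdb objects created at startup
        'pool_get_timeout' => 0.5, // seconds that pop() waits for an idle connection
    ],
    'slave' => [
        'host' => '127.0.0.1',
        'port' => 6380,
    ],
];
```

Since each pooled `redisdb` object connects to both the master and the slave, a `pool_size` of 5 would open ten sockets in total under this sketch.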

 



server/core/redis/redisdb.php

    <?php
    if (!defined('SERVER_PATH')) exit("no access");
    class redisdb
    {
        private $master;
        private $slave;
        private $config;
        // Forwards any Redis command to the master or the slave connection.
        public function __call($name, $arguments)
        {
            // TODO: commands that must run on the master
            $command_master = ['set', 'hset', 'sadd'];
            if (!in_array($name, $command_master)) {
                $db = $this->_get_usable_db('slave');
            } else {
                $db = $this->_get_usable_db('master');
            }
            $result = call_user_func_array([$db, $name], $arguments);
            return $result;
        }
        public function connect($config)
        {
            // Master
            $master = new Swoole\Coroutine\Redis();
            $res = $master->connect($config['master']['host'], $config['master']['port']);
            if ($res === false) {
                // RuntimeException takes the message first, then the code.
                throw new RuntimeException($master->errMsg, $master->errCode);
            } else {
                $this->master = $master;
            }
            // Slave
            $slave = new Swoole\Coroutine\Redis();
            $res = $slave->connect($config['slave']['host'], $config['slave']['port']);
            if ($res === false) {
                throw new RuntimeException($slave->errMsg, $slave->errCode);
            } else {
                $this->slave = $slave;
            }
            $this->config = $config;
            return $res;
        }
        // Returns the requested connection, reconnecting first if it has dropped.
        private function _get_usable_db($type)
        {
            if ($type == 'master') {
                if (!$this->master->connected) {
                    $master = new Swoole\Coroutine\Redis();
                    $res = $master->connect($this->config['master']['host'], $this->config['master']['port']);
                    if ($res === false) {
                        throw new RuntimeException($master->errMsg, $master->errCode);
                    } else {
                        $this->master = $master;
                    }
                }
                return $this->master;
            } elseif ($type == 'slave') {
                if (!$this->slave->connected) {
                    $slave = new Swoole\Coroutine\Redis();
                    $res = $slave->connect($this->config['slave']['host'], $this->config['slave']['port']);
                    if ($res === false) {
                        throw new RuntimeException($slave->errMsg, $slave->errCode);
                    } else {
                        $this->slave = $slave;
                    }
                }
                return $this->slave;
            }
        }
    }
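The `__call` method above sends only `set`, `hset`, and `sadd` to the master, and its TODO marks that list as unfinished; any other write (for example `del` or `incr`) would currently be routed to the slave. One possible way to fill it in is sketched below. The helper name `pick_redis_role` and the longer command list are assumptions for illustration, not part of the demo, and the list is not exhaustive.

```php
<?php
// Hypothetical helper: decide which connection a Redis command should use.
// The write-command list is an illustrative assumption, not the demo's list.
function pick_redis_role(string $command): string
{
    static $writeCommands = [
        'set', 'setex', 'setnx', 'del', 'expire', 'incr', 'decr',
        'hset', 'hdel', 'sadd', 'srem', 'lpush', 'rpush', 'lpop', 'rpop',
        'zadd', 'zrem',
    ];
    // PHP method names are case-insensitive, so normalise before matching.
    return in_array(strtolower($command), $writeCommands, true) ? 'master' : 'slave';
}

echo pick_redis_role('HSET'), PHP_EOL; // master
echo pick_redis_role('get'), PHP_EOL;  // slave
```

Inside `__call`, the result could be passed straight to `_get_usable_db()`, which already accepts `'master'` and `'slave'`.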

 



client/http/redis.php

    <?php
    // Request envelope: a token plus the controller class, method, and arguments.
    $demo = [
        'type'  => 'sw',
        'token' => 'bb1r3ylipbktp5p0',
        'param' => [
            'class'  => 'product',
            'method' => 'set',
            'param' => [
                'key'   => 'c4649',
                'value' => '订单-c4649' // "订单" means "order"
            ],
        ],
    ];
    $ch = curl_init();
    $options = [
        CURLOPT_URL  => 'http://10.211.55.4:9509/',
        CURLOPT_POST => 1,
        CURLOPT_POSTFIELDS => json_encode($demo),
    ];
    curl_setopt_array($ch, $options);
    curl_exec($ch);
    curl_close($ch);
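The HTTP client above and the TCP client both send the same JSON envelope: `type`, `token`, and a `param` object carrying `class`, `method`, and an inner `param`. The demo's server-side callbacks are not listed in this article, so the following is only a sketch of how such an envelope could be decoded and checked. The function name `parse_envelope` and the error messages are hypothetical, and token verification is left out because the article does not describe it.

```php
<?php
// Hypothetical helper, not taken from the demo: decode the JSON envelope
// sent by the clients and extract the target class, method, and arguments.
function parse_envelope(string $body): array
{
    $data = json_decode($body, true);
    if (!is_array($data)) {
        throw new InvalidArgumentException('request body must be a JSON object');
    }
    foreach (['type', 'token', 'param'] as $key) {
        if (!isset($data[$key])) {
            throw new InvalidArgumentException("missing field: {$key}");
        }
    }
    $param = $data['param'];
    if (!is_array($param) || !isset($param['class'], $param['method'])) {
        throw new InvalidArgumentException('param.class and param.method are required');
    }
    return [
        'class'  => (string) $param['class'],
        'method' => (string) $param['method'],
        'args'   => $param['param'] ?? [], // some calls carry no arguments
    ];
}

$call = parse_envelope('{"type":"sw","token":"bb1r3ylipbktp5p0","param":{"class":"product","method":"get","param":{"code":"c4649"}}}');
echo $call['class'], '::', $call['method'], PHP_EOL; // product::get
```

Rejecting malformed envelopes before any controller is looked up keeps bad input away from the connection pools.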

 

 

client/tcp/redis.php

    <?php
    class client
    {
        private $client;
        public function __construct() {
            // Asynchronous TCP client; the callbacks below are driven by the event loop.
            $this->client = new swoole_client(SWOOLE_SOCK_TCP, SWOOLE_SOCK_ASYNC);
            $this->client->on('connect', [$this, 'onconnect']);
            $this->client->on('receive', [$this, 'onreceive']);
            $this->client->on('close', [$this, 'onclose']);
            $this->client->on('error', [$this, 'onerror']);
        }
        public function connect() {
            if (!$this->client->connect("0.0.0.0", 9510, 1)) {
                // connect() returns false on failure, so read the error from the client itself.
                $code = $this->client->errCode;
                echo "error: " . swoole_strerror($code) . "[{$code}]" . PHP_EOL;
                return;
            }
        }
        public function onconnect() {
            fwrite(STDOUT, "Test RPC (y or n):");
            // Read the answer from standard input without blocking the event loop.
            swoole_event_add(STDIN, function() {
                $msg = trim(fgets(STDIN));
                if ($msg == 'y') {
                    $this->send();
                }
                fwrite(STDOUT, "Test RPC (y or n):");
            });
        }
        public function onreceive($cli, $data) {
            echo '[received]:'.$data;
        }
        public function send() {
            $demo = [
                'type'  => 'sw',
                'token' => 'bb1r3ylipbktp5p0',
                'param' => [
                    'class'  => 'product',
                    'method' => 'get',
                    'param' => [
                        'code' => 'c4649'
                    ],
                ],
            ];
            $this->client->send(json_encode($demo));
        }
        public function onclose() {
            echo "client close connection".PHP_EOL;
        }
        public function onerror() {
        }
    }
    $client = new client();
    $client->connect();

 



client/websocket/index.html

    <!doctype html>    
    <html>    
    <head>    
        <meta charset="utf-8">    
        <meta http-equiv="x-ua-compatible" content="ie=edge">    
        <meta name="viewport" content="width=device-width, initial-scale=1">    
        <meta name="description" content="">    
        <meta name="keywords" content="">    
        <title>demo</title>    
        <script src="https://cdn.bootcss.com/jquery/3.4.1/jquery.js"></script>    
        <script src="http://echarts.baidu.com/gallery/vendors/echarts/echarts.min.js"></script>    
    </head>    
    <body>    
    <!-- A DOM node with an explicit width and height for ECharts -->
    <div id="main" style="width: 900px;height:400px;"></div>    
    <script type="text/javascript">    
        if ("WebSocket" in window) {
            // Initialise the ECharts instance on the prepared DOM node
            var mychart = echarts.init(document.getElementById('main'));
            var wsserver = 'ws://10.211.55.4:9509';
            var ws = new WebSocket(wsserver);
            ws.onopen = function (evt) {
                if (ws.readyState == 1) {
                    console.log('WebSocket connected...');
                    ws.send('开始请求...'); // "start requesting..."
                } else {
                    console.log('WebSocket connection failed...');
                    alert('WebSocket connection failed');
                }
            };
            ws.onmessage = function (evt) {
                console.log('retrieved data from server: ' + evt.data);
                var evt_data = jQuery.parseJSON(evt.data);
                // Replace the x-axis labels and the series data with the latest push
                mychart.setOption({
                    xAxis: {
                        data : evt_data.time
                    },
                    series: [{
                        data: evt_data.value
                    }]
                });
            };
            ws.onerror = function (evt) {
                alert('WebSocket error');
                console.log(evt);
            };
            ws.onclose = function() {
                alert('WebSocket connection closed');
                console.log('WebSocket connection closed...');
            };
            // Fetch the initial data, then set the chart options and data
            $.ajax({
                url      : 'http://10.211.55.4:9509/', // request URL
                type     : "post", // HTTP method
                dataType : "json", // expected response type
                data : {
                    'type'  : 'sw',
                    'token' : 'bb1r3ylipbktp5p0',
                    'param' : {
                        'class'  : 'statistic',
                        'method' : 'init'
                    }
                },
                beforeSend:function() {
                },
                success : function(rs) {
                    if (rs.code != 1) {
                        alert('Failed to fetch data');
                    } else {
                        var option = {
                            title: {
                                text: 'API call volume',
                                x:'center'
                            },
                            tooltip: {
                                trigger: 'axis',
                                axisPointer: {
                                    animation: false
                                }
                            },
                            xAxis: {
                                type : 'category',
                                data : rs.data.time
                            },
                            yAxis: {
                                type: 'value',
                                boundaryGap: [0, '100%'],
                                name: 'Calls',
                                splitLine: {
                                    show: false
                                }
                            },
                            series: [{
                                name: 'Calls',
                                type: 'line',
                                showSymbol: false,
                                hoverAnimation: false,
                                data: rs.data.value
                            }]
                        };
                        // Render the chart with the options and data defined above.
                        if (option && typeof option === "object") {
                            mychart.setOption(option, true);
                        }
                    }
                },
                error : function(){
                    alert('Server request failed');
                }
            });
        } else {
            alert("Your browser does not support WebSocket!");
        }
    </script>    
    </body>    
    </html>
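The page above reads `rs.code`, `rs.data.time`, and `rs.data.value` from the initial HTTP response, and `time` and `value` from every WebSocket message, replacing the chart's data wholesale each time. The controller that produces these values (controller/statistic.php) is not listed, so the sketch below only illustrates payloads with that shape. The function name `build_stat_series`, the ten-point window, and the random values are assumptions.

```php
<?php
// Hypothetical generator: one time label and one random value per second,
// ending at $now. The demo's real statistic controller is not shown.
function build_stat_series(int $now, int $points): array
{
    $time = [];
    $value = [];
    for ($i = $points - 1; $i >= 0; $i--) {
        $time[]  = date('H:i:s', $now - $i);
        $value[] = random_int(0, 100);
    }
    return ['time' => $time, 'value' => $value];
}

$series = build_stat_series(time(), 10);
// Shape read by the initial HTTP request (rs.code, rs.data.time, rs.data.value).
$initResponse = json_encode(['code' => 1, 'data' => $series]);
// Shape read from each WebSocket message (evt_data.time, evt_data.value).
$pushMessage = json_encode($series);
```

Sending the full window on every push matches how the page calls `setOption`, which overwrites the axis labels and series data instead of appending to them.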

 


The demo also involves onmessage.php, ontask.php, onworkerstart.php, and other files; their code is not listed here.