欢迎您访问程序员文章站,本站旨在为大家提供并分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

基于 Hyperf 实现 RabbitMQ + WebSocket 消息推送

程序员文章站 2022-06-21 20:30:34
思路 利用 WebSocket 协议让客户端和服务器端保持有状态的长链接,保存链接上来的客户端 id。订阅发布者发布的消息针对已保存的客户端 id 进行广播消息。 WebSocket 服务 composer require hyperf/websocket-server 配置文件 [config/a ......

思路

利用 WebSocket 协议让客户端和服务器端保持有状态的长连接,并保存连接上来的客户端 id。订阅发布者发布的消息,再针对已保存的客户端 id 广播该消息。

 

WebSocket 服务

composer require hyperf/websocket-server

  

配置文件 [config/autoload/server.php]

<?php

declare(strict_types=1);

use Hyperf\Server\Server;
use Hyperf\Server\SwooleEvent;

// Server definitions: one plain HTTP server and one WebSocket server, each
// listening on its own port.
//
// Constants and class constants are case-sensitive in PHP, so the Swoole
// constants (SWOOLE_PROCESS, SWOOLE_SOCK_TCP) and the Hyperf class constants
// (Server::SERVER_*, SwooleEvent::ON_*) must be written in upper case.
return [
    'mode' => SWOOLE_PROCESS,
    'servers' => [
        [
            'name' => 'http',
            'type' => Server::SERVER_HTTP,
            'host' => '0.0.0.0',
            'port' => 11111,
            'sock_type' => SWOOLE_SOCK_TCP,
            'callbacks' => [
                SwooleEvent::ON_REQUEST => [Hyperf\HttpServer\Server::class, 'onRequest'],
            ],
        ],
        [
            'name' => 'ws',
            'type' => Server::SERVER_WEBSOCKET,
            'host' => '0.0.0.0',
            'port' => 12222,
            'sock_type' => SWOOLE_SOCK_TCP,
            'callbacks' => [
                SwooleEvent::ON_HAND_SHAKE => [Hyperf\WebSocketServer\Server::class, 'onHandShake'],
                SwooleEvent::ON_MESSAGE => [Hyperf\WebSocketServer\Server::class, 'onMessage'],
                SwooleEvent::ON_CLOSE => [Hyperf\WebSocketServer\Server::class, 'onClose'],
            ],
        ],
    ],
];

  

WebSocket 服务器端代码示例

<?php

declare(strict_types=1);
/**
 * this file is part of hyperf.
 *
 * @link     https://www.hyperf.io
 * @document https://doc.hyperf.io
 * @contact  group@hyperf.io
 * @license  https://github.com/hyperf-cloud/hyperf/blob/master/license
 */

namespace App\Controller;

use Hyperf\Contract\OnCloseInterface;
use Hyperf\Contract\OnMessageInterface;
use Hyperf\Contract\OnOpenInterface;
use Swoole\Http\Request;
use Swoole\Server;
use Swoole\WebSocket\Frame;
use Swoole\WebSocket\Server as WebSocketServer;

/**
 * WebSocket endpoint that tracks connected client ids (fds) in a Redis set.
 *
 * Other parts of the application can read that set to find the fds to push
 * messages to.
 *
 * NOTE(review): the TTL is applied to the whole set, not to individual
 * members, so a single active client keeps every stored fd alive. Confirm
 * this is the intended expiry behaviour.
 */
class WebSocketController extends Controller implements OnMessageInterface, OnOpenInterface, OnCloseInterface
{
    /** Redis set holding the fd of every connected client. */
    private const CLIENT_SET_KEY = 'websocket_sjd_1';

    /** Lifetime of the client set in seconds; refreshed on open and on each message. */
    private const CLIENT_SET_TTL = 7200;

    /**
     * Handles an incoming frame: refreshes the client set TTL and echoes the data back.
     *
     * @param WebSocketServer $server server used to push the reply
     * @param Frame $frame received frame; its fd identifies the sender
     */
    public function onMessage(WebSocketServer $server, Frame $frame): void
    {
        $redis = $this->container->get(\Redis::class);

        // Ask Redis whether this fd is registered instead of loading the whole
        // set and scanning it in PHP. A registered fd is already a member, so
        // only the TTL needs refreshing; re-adding it would be a no-op.
        if ($redis->sIsMember(self::CLIENT_SET_KEY, $frame->fd)) {
            $redis->expire(self::CLIENT_SET_KEY, self::CLIENT_SET_TTL);
        }

        $server->push($frame->fd, 'recv: ' . $frame->data);
    }

    /**
     * Handles a closed connection by removing its fd from the client set.
     *
     * @param Server $server server that owned the connection
     * @param int $fd id of the closed connection
     * @param int $reactorId id of the reactor thread that reported the close
     */
    public function onClose(Server $server, int $fd, int $reactorId): void
    {
        $redis = $this->container->get(\Redis::class);
        $redis->sRem(self::CLIENT_SET_KEY, $fd);
    }

    /**
     * Handles a new connection: stores its fd, refreshes the TTL and greets the client.
     *
     * @param WebSocketServer $server server used to push the greeting
     * @param Request $request handshake request; its fd identifies the new client
     */
    public function onOpen(WebSocketServer $server, Request $request): void
    {
        $redis = $this->container->get(\Redis::class);

        $redis->sAdd(self::CLIENT_SET_KEY, $request->fd);
        $redis->expire(self::CLIENT_SET_KEY, self::CLIENT_SET_TTL);

        $server->push($request->fd, 'opened');
    }
}

  

WebSocket 前端代码

/**
 * Opens a WebSocket connection to the push server and renders pushed records.
 *
 * - Sends a {"type": "ping"} heartbeat every 5 seconds while the socket is open.
 * - Messages with code 300 update the ".address" element.
 * - Messages with code 200 insert a row after ".tablehead"; at most 7 rows are kept.
 *
 * Relies on jQuery ($) being loaded on the page.
 */
function websockettest() {
    if (!("WebSocket" in window)) {
        alert("您的浏览器不支持 websocket!");
        return;
    }

    console.log("您的浏览器支持 websocket!");
    var num = 0;

    // Browser globals are case-sensitive: WebSocket, JSON and setInterval.
    var ws = new WebSocket("ws://127.0.0.1:12222");

    ws.onopen = function () {
        // Connected; nothing is sent until the first heartbeat.
    };

    // Heartbeat every 5 seconds so the connection is not dropped for being idle.
    // send() throws while the socket is still connecting, so check the state first.
    var heartbeat = window.setInterval(function () {
        if (ws.readyState === WebSocket.OPEN) {
            ws.send(JSON.stringify({"type": "ping"}));
        }
    }, 5000);

    ws.onmessage = function (evt) {
        var d;
        try {
            d = JSON.parse(evt.data);
        } catch (err) {
            // The server also sends plain-text frames ("opened", "recv: ...");
            // log them instead of letting JSON.parse abort the handler.
            console.log(evt.data);
            return;
        }
        console.log(d);
        if (!d || typeof d !== "object") {
            return;
        }

        if (d.code == 300) {
            $(".address").text(d.address);
        }
        if (d.code == 200) {
            var v = d.data || {};
            console.log(v);
            num++;

            // Build the row with .text() so pushed values are never parsed as HTML.
            var row = $('<div class="item"></div>');
            [v.recordouttime, v.useroutname, v.useroutnum, v.dooroutname].forEach(function (value) {
                row.append($("<p></p>").text(value == null ? "" : String(value)));
            });
            $(".tablehead").after(row);

            // Keep only the 7 most recent rows.
            if (num > 7) {
                num--;
                $(".table .item:nth-last-child(1)").remove();
            }
        }
    };

    // The handler property is "onerror"; assigning to "error" registers nothing.
    ws.onerror = function (e) {
        console.log(e);
        alert(e);
    };

    ws.onclose = function () {
        // Stop the heartbeat once the socket is gone.
        window.clearInterval(heartbeat);
        alert("连接已关闭...");
    };
}

  

AMQP 组件

composer require hyperf/amqp

配置文件 [config/autoload/amqp.php]

<?php

// AMQP (RabbitMQ) connection settings for the "default" connection.
return [
    'default' => [
        'host' => 'localhost',
        'port' => 5672,
        'user' => 'guest',
        'password' => 'guest',
        'vhost' => '/',
        // Connection pool limits and timeouts (seconds).
        'pool' => [
            'min_connections' => 1,
            'max_connections' => 10,
            'connect_timeout' => 10.0,
            'wait_timeout' => 3.0,
            'heartbeat' => -1,
        ],
        // Low-level connection parameters.
        'params' => [
            'insist' => false,
            // Case-sensitive protocol values: the login mechanism is named
            // "AMQPLAIN" and the locale is "en_US".
            'login_method' => 'AMQPLAIN',
            'login_response' => null,
            'locale' => 'en_US',
            'connection_timeout' => 3.0,
            'read_write_timeout' => 6.0,
            'context' => null,
            'keepalive' => false,
            'heartbeat' => 3,
        ],
    ],
];

  

MQ 消费者代码

<?php
declare(strict_types=1);

namespace App\Amqp\Consumer;

use Hyperf\Amqp\Annotation\Consumer;
use Hyperf\Amqp\Message\ConsumerMessage;
use Hyperf\Amqp\Result;
use Hyperf\Server\Server;
use Hyperf\Server\ServerFactory;

/**
 * Consumes messages from the "hyperf" queue and pushes each one to every
 * WebSocket client whose fd is stored in the Redis set "websocket_sjd_1".
 *
 * @Consumer(exchange="hyperf", routingKey="hyperf", queue="hyperf", nums=1)
 */
class DemoConsumer extends ConsumerMessage
{
    /**
     * Broadcasts one queue message to all registered WebSocket clients.
     *
     * @param mixed $data message body; strings are pushed as-is, anything else is JSON-encoded
     * @return string Result::ACK after broadcasting, Result::DROP if the body cannot be encoded
     */
    public function consume($data): string
    {
        // push() needs a string; encode other payload types rather than failing inside the loop.
        $payload = is_string($data) ? $data : json_encode($data, JSON_UNESCAPED_UNICODE);
        if ($payload === false) {
            return Result::DROP;
        }

        $redis = $this->container->get(\Redis::class);
        // Fall back to an empty list if Redis returns a falsy value.
        $fdList = $redis->sMembers('websocket_sjd_1') ?: [];

        $server = $this->container->get(ServerFactory::class)->getServer()->getServer();
        foreach ($fdList as $member) {
            $fd = (int) $member;
            // Stored fds can outlive their connection, so skip any that are
            // no longer an established WebSocket connection.
            if ($fd > 0 && $server->isEstablished($fd)) {
                $server->push($fd, $payload);
            }
        }

        return Result::ACK;
    }
}

控制器代码

/**
     * Publishes a sample record to RabbitMQ and returns basic request information.
     *
     * The message is a JSON document with code 200 and a "data" object whose
     * keys are the ones the front-end reads when rendering a row.
     *
     * @return array the request method and a message built from the "user" input
     */
    public function test()
    {
        $payload = \GuzzleHttp\json_encode([
            'code' => 200,
            'data' => [
                'useroutname' => 'ccflow',
                'useroutnum' => '9999',
                // Format characters are case-sensitive: "Y" is the 4-digit year
                // and "H" the 24-hour clock ("y"/"h" give 2-digit year, 12-hour).
                'recordouttime' => date('Y-m-d H:i:s'),
                'dooroutname' => '教师公寓',
            ],
        ]);

        $producer = ApplicationContext::getContainer()->get(Producer::class);
        $producer->produce(new DemoProducer($payload));

        $user = $this->request->input('user', 'hyperf');
        $method = $this->request->getMethod();

        return [
            'method' => $method,
            'message' => "{$user}.",
        ];
    }

  

最终效果

 

 

基于 Hyperf 实现 RabbitMQ + WebSocket 消息推送