基于 Hyperf 实现 RabbitMQ + WebSocket 消息推送
程序员文章站
2022-06-21 20:30:34
思路 利用 WebSocket 协议让客户端和服务器端保持有状态的长链接,保存链接上来的客户端 id。订阅发布者发布的消息针对已保存的客户端 id 进行广播消息。 WebSocket 服务 composer require hyperf/websocket-server 配置文件 [config/a ......
思路
利用 websocket 协议让客户端和服务器端保持有状态的长链接,保存链接上来的客户端 id。订阅发布者发布的消息针对已保存的客户端 id 进行广播消息。
websocket 服务
composer require hyperf/websocket-server
配置文件 [config/autoload/server.php]
<?php
return [
'mode' => swoole_process,
'servers' => [
[
'name' => 'http',
'type' => server::server_http,
'host' => '0.0.0.0',
'port' => 11111,
'sock_type' => swoole_sock_tcp,
'callbacks' => [
swooleevent::on_request => [hyperf\httpserver\server::class, 'onrequest'],
],
],
[
'name' => 'ws',
'type' => server::server_websocket,
'host' => '0.0.0.0',
'port' => 12222,
'sock_type' => swoole_sock_tcp,
'callbacks' => [
swooleevent::on_hand_shake => [hyperf\websocketserver\server::class, 'onhandshake'],
swooleevent::on_message => [hyperf\websocketserver\server::class, 'onmessage'],
swooleevent::on_close => [hyperf\websocketserver\server::class, 'onclose'],
],
],
],
websocket 服务器端代码示例
<?php
declare(strict_types=1);
/**
* this file is part of hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf-cloud/hyperf/blob/master/license
*/
namespace app\controller;
use hyperf\contract\oncloseinterface;
use hyperf\contract\onmessageinterface;
use hyperf\contract\onopeninterface;
use swoole\http\request;
use swoole\server;
use swoole\websocket\frame;
use swoole\websocket\server as websocketserver;
class websocketcontroller extends controller implements onmessageinterface, onopeninterface, oncloseinterface
{
/**
* 发送消息
* @param websocketserver $server
* @param frame $frame
*/
public function onmessage(websocketserver $server, frame $frame): void
{
//心跳刷新缓存
$redis = $this->container->get(\redis::class);
//获取所有的客户端id
$fdlist = $redis->smembers('websocket_sjd_1');
//如果当前客户端在客户端集合中,就刷新
if (in_array($frame->fd, $fdlist)) {
$redis->sadd('websocket_sjd_1', $frame->fd);
$redis->expire('websocket_sjd_1', 7200);
}
$server->push($frame->fd, 'recv: ' . $frame->data);
}
/**
* 客户端失去链接
* @param server $server
* @param int $fd
* @param int $reactorid
*/
public function onclose(server $server, int $fd, int $reactorid): void
{
//删掉客户端id
$redis = $this->container->get(\redis::class);
//移除集合中指定的value
$redis->srem('websocket_sjd_1', $fd);
var_dump('closed');
}
/**
* 客户端链接
* @param websocketserver $server
* @param request $request
*/
public function onopen(websocketserver $server, request $request): void
{
//保存客户端id
$redis = $this->container->get(\redis::class);
$res1 = $redis->sadd('websocket_sjd_1', $request->fd);
var_dump($res1);
$res = $redis->expire('websocket_sjd_1', 7200);
var_dump($res);
$server->push($request->fd, 'opened');
}
}
websocket 前端代码
function websockettest() {
if ("websocket" in window) {
console.log("您的浏览器支持 websocket!");
var num = 0
// 打开一个 web socket
var ws = new websocket("ws://127.0.0.1:12222");
ws.onopen = function () {
// web socket 已连接上,使用 send() 方法发送数据
//alert("数据发送中...");
//ws.send("发送数据");
};
window.setinterval(function () { //每隔5秒钟发送一次心跳,避免websocket连接因超时而自动断开
var ping = {"type": "ping"};
ws.send(json.stringify(ping));
}, 5000);
ws.onmessage = function (evt) {
var d = json.parse(evt.data);
console.log(d);
if (d.code == 300) {
$(".address").text(d.address)
}
if (d.code == 200) {
var v = d.data
console.log(v);
num++
var str = `<div class="item">
<p>${v.recordouttime}</p>
<p>${v.useroutname}</p>
<p>${v.useroutnum}</p>
<p>${v.dooroutname}</p>
</div>`
$(".tablehead").after(str)
if (num > 7) {
num--
$(".table .item:nth-last-child(1)").remove()
}
}
};
ws.error = function (e) {
console.log(e)
alert(e)
}
ws.onclose = function () {
// 关闭 websocket
alert("连接已关闭...");
};
} else {
alert("您的浏览器不支持 websocket!");
}
}
amqp 组件
composer require hyperf/amqp
配置文件 [config/autoload/amqp.php]
<?php
return [
'default' => [
'host' => 'localhost',
'port' => 5672,
'user' => 'guest',
'password' => 'guest',
'vhost' => '/',
'pool' => [
'min_connections' => 1,
'max_connections' => 10,
'connect_timeout' => 10.0,
'wait_timeout' => 3.0,
'heartbeat' => -1,
],
'params' => [
'insist' => false,
'login_method' => 'amqplain',
'login_response' => null,
'locale' => 'en_us',
'connection_timeout' => 3.0,
'read_write_timeout' => 6.0,
'context' => null,
'keepalive' => false,
'heartbeat' => 3,
],
],
];
mq 消费者代码
<?php
declare(strict_types=1);
namespace app\amqp\consumer;
use hyperf\amqp\annotation\consumer;
use hyperf\amqp\message\consumermessage;
use hyperf\amqp\result;
use hyperf\server\server;
use hyperf\server\serverfactory;
/**
* @consumer(exchange="hyperf", routingkey="hyperf", queue="hyperf", nums=1)
*/
class democonsumer extends consumermessage
{
/**
* rabbmitmq消费端代码
* @param $data
* @return string
*/
public function consume($data): string
{
print_r($data);
//获取集合中所有的value
$redis = $this->container->get(\redis::class);
$fdlist=$redis->smembers('websocket_sjd_1');
$server=$this->container->get(serverfactory::class)->getserver()->getserver();
foreach($fdlist as $key=>$v){
if(!empty($v)){
$server->push((int)$v, $data);
}
}
return result::ack;
}
}
控制器代码
/**
* test
* @return array
*/
public function test()
{
$data = array(
'code' => 200,
'data' => [
'useroutname' => 'ccflow',
'useroutnum' => '9999',
'recordouttime' => date("y-m-d h:i:s", time()),
'dooroutname' => '教师公寓',
]
);
$data = \guzzlehttp\json_encode($data);
$message = new demoproducer($data);
$producer = applicationcontext::getcontainer()->get(producer::class);
$result = $producer->produce($message);
var_dump($result);
$user = $this->request->input('user', 'hyperf');
$method = $this->request->getmethod();
return [
'method' => $method,
'message' => "{$user}.",
];
}
最终效果
上一篇: C#动态生成按钮及定义按钮事件的方法