欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

node.js中TCP Socket多进程间的消息推送示例详解

程序员文章站 2023-11-11 21:23:40
前言 前段时间接到了一个支付中转服务的需求,即支付数据通过http接口传到中转服务器,中转服务器将支付数据发送到异构后台(lua)的指定tcp socket。 一开...

前言

前段时间接到了一个支付中转服务的需求,即支付数据通过http接口传到中转服务器,中转服务器将支付数据发送到异构后台(lua)的指定tcp socket。

node.js中TCP Socket多进程间的消息推送示例详解

一开始评估的时候感觉蛮简单的,就是http server和tcp server间的通信,不是一个event实例就能解决的状态管理问题吗?注册一个事件a用于消息传递,在socket连接时注册唯一的id,然后在http接收到数据时,emit事件a;在监听到事件a时,在tcp server中寻找指定id对应的socket处理该数据即可。

尽管node.js在高并发方面有不错的性能,但是单个tcp server实例的承载能力有限,为避免服务器过载,node.js 单进程的内存有上限(默认2g),能容纳的长连接客户端数不多。但随着业务的扩大,我们需要考虑多机集群部署,客户端可以连接到任一节点,并发送消息。如何做到多节点的同时推送,我们需要建立一套多节点之间的消息分发/订阅架构。常用的第三方消息管理库有 rabbitmq和redis等。在这里,我用的是redis的订阅发布服务。

redis.io有一个比较成熟的redis消息中转库 ()。但我们项目中异构后台用到的并非websocket,而是原生的tcp原生的socket。用原生redis的sub/pubs实现并不难,就手写了。

redis在该项目中主要起到一个消息分发中心(publish/subscribe)的作用。当http请求的支付数据发送过来时,则通过redis的publish功能往所有的channel推送消息,这样所有订阅该channel的socket server就能收到回调,然后推送到指定客户端。在应用层看跟event事件消息的处理差不多。

const redis = require("redis"),
 redisclient = redis.createclient,
 redis_cfg = {
  host: '127.0.0.1',
  port: 6379
 },
 sub = redisclient(redis_cfg),
 pub = redisclient(redis_cfg),
 pay_mq_channel = 'pay_mq_channel';

// 监听频道的消息回调
sub.on('message', function(channel, message) {
 switch (channle){
  case pay_mq_channel:
   console.log('notification received:', message);

   // 广播消息到指定socket

   break;
 }
});
// 订阅频道
sub.subscribe(pay_mq_channel);

// 当接收到支付数据时,推送频道消息
pub.publish(pay_mq_channel, {id: '01', msg: `hello ${pay_mq_channel}!`});

由于redis的sub/pub的channel订阅数有上限,所以建议一类消息使用一个channel,一个channel下使用map、set或数组来存储订阅时的回调函数,在接收到订阅消息时遍历执行回调函数。

下面是我封装好的redis组件(redismqproxy.js):

/*
 * redis 订阅/发布
 */
const _ = require('lodash'),
 redis = require("redis"),
 redis_cfg = {
  host: '127.0.0.1',
  port: 6379
 },
 sub = redisclient(redis_cfg),
 pub = redisclient(redis_cfg);

let sublistenerfuns = {}; // channel的回调函数列表

let redismqproxy = {

 // 订阅channel
 on(channel, cb, errorcb, once = false) {
  sub.subscribe(channel); // 订阅channel消息

  // 将回调函数存放数组中
  sublistenerfuns[channel] = _.isempty( sublistenerfuns[channel] ) ? [] : sublistenerfuns[channel];
  sublistenerfuns[channel].push({
   once, cb, errorcb
  });
 },

 // 监听一次性的channel回调函数
 once(channel, cb, errorcb) {
  this.on(channel, cb, errorcb, true);
 },

 // 发送channel消息
 emit(channel, message) {
  if(!_.isstring(message)) {
   message = json.stringify(message);
  }
  pub.publish(channel, message);
 },

 // 移除channel上的监听函数
 removelistener(channel, func) {
  let channelhandlers = _.isempty( sublistenerfuns[channel] ) ? [] : sublistenerfuns[channel];
  for(let i = 0, l = channelhandlers.length; i < l; i++) {
   let handler = channelhandlers[i] || {};
   let cb = handler.cb;
   if(func && func == cb) {
    channelhandlers.splice(i, 1);
    return false;
   }
  }
 }
};

redismqproxy.sublisteners = sublistenerfuns;

pub.on('error', onerror);
sub.on('error', onerror);

// 监听redis的订阅消息
sub.on("message", function(channel, message) {
 // 遍历执行channel的回调函数
 try {
  message = json.parse(message);
 } catch(e) {}
 broadcasttochannel(channel, message);
});

// 广播消息到指定频道
function broadcasttochannel(channel, message, iserror) {
 let channelhandlers = _.isempty( sublistenerfuns[channel] ) ? [] : sublistenerfuns[channel];
 for(let i = 0, l = channelhandlers.length; i < l; i++) {
  let handler = channelhandlers[i] || {};
  let isonce = handler.once || false;
  let func = handler.cb;
  let errorfunc = handler.errorcb;

  _.isfunction(func) && func(message);
  iserror && _.isfunction(errorfunc) && errorfunc(message);

  isonce && channelhandlers.splice(i, 1); // 移除一次性监听的函数
 }
}

function broadcasttoallchannels(message, iserror) {
 for(let channel in sublistenerfuns) {
  broadcasttochannel(channel, message, iserror);
 }
}

function onerror(err) {
 err = err || {};
 err.msg = err.msg || 'redis sub/pub fail';

 // 通知所有channel执行错误回调函数
 broadcasttoallchannels(err, true);
}

module.exports = redismqproxy;

在使用时就可以比较方便地调用了:

const redismqproxy = require('./redismqproxy'),
 pay_mq_channel = 'pay_mq_channel';

// 订阅channel
redismq.on(pay_mq_channel, function(message) {
 console.log('notification received:', message);
 // 广播消息到指定socket
 // ...
});

// 订阅一次性的channel
redismq.once(pay_mq_channel, function(message) {
 // ...
});

// 当接收到支付数据时,推送频道消息
redismq.emit(pay_mq_channel, {id: '01', msg: `hello ${pay_mq_channel}!`});

目前该项目已经健康运行了一个多月。由于socket server的多进程间消息推送依赖于redis的消息中转,而redis使用的是单进程,未能充分利用cpu。当业务膨胀的时候,redis就要考虑分布集群了。

总结

以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,如果有疑问大家可以留言交流,谢谢大家对的支持。