nodejs redis 发布订阅机制封装实现方法及实例代码
程序员文章站
2023-11-01 19:32:10
nodejs redis 发布订阅机制封装
最近项目使用redis,对publish 和 subscribe的使用进行了了解,并进行了封装。 ...
nodejs redis 发布订阅机制封装
最近项目使用redis,对publish 和 subscribe的使用进行了了解,并进行了封装。
var config = require('../config/config'); var log = require("./loghelp"); var redis = require("redis"); function initialclient(param) { var option={ host: config.redis.host, port: config.redis.port}; if(param) { option=object.assign(option,param); } redis.print let client = redis.createclient(option); client.on("error", function(err) { log.error(err); }); return client; }
/*example: * let channel="ryan"; redis.pubsub.registerhandlers("ryan",msg=> console.log(msg)); redis.pubsub.subscribe(channel); redis.pubsub.publish(channel,"hello from chen");*/ class pubsub { constructor(){ this.sub=initialclient(); this.handlers=new map(); this.subaction=(channle,message)=>{ let actions= this.handlers.get(channle)||new set(); for(let action of actions) { action(message); } } this.alredypublishs=[]; this.subconnected=false; } publish(channel,message) { let action=()=>{ let pub=initialclient(); pub.publish(channel,message); }; if(this.subconnected===false) { this.alredypublishs.push(action); } else action(); } registerhandlers(channel,action) { var actions=this.handlers.get(channel)||new set(); actions.add(action); this.handlers.set(channel,actions); } subscribe(channel) { let self=this; this.sub.subscribe(channel,function (err,reply) { if(err) log.error(err); self.subconnected=true; for(let publish of self.alredypublishs) publish(); console.log(reply); }); this.sub.on("message", function (channel, message) { self.subaction(channel,message); }); } teardown() { this.sub.quit(); } }
然后通过exports.pubsub=new pubsub() 将其暴漏,可保证是单例。在程序启动时,调用
registerhandlers 注册特定通道的处理逻辑,然后调用
subscribe 订阅通道。
在合适时机调用publish,这个机制可以实现分布式下所有客户端watch 同一个数据的更改。
本人全手工打造的dotnetcore webapi 框架,可实现快速开发。
地址:http://xiazai.jb51.net/201612/yuanma/webapicore-master(jb51.net).rar。
1 采用ddd模式开发,充血模型 2 添加dapper扩展,默认实现增删改查基本操作。利用automapper 做实体转换,减少重复劳动。 3 依赖注入融合autofac,仓储层和应用层自动注入 4 实现jwt验证 5 加入swagger 文档 6 单元测试添加了xunit,mymvc 可以方便对webapi测试 7 数据库版本控制
感谢阅读,希望能帮助到大家,谢谢大家对本站的支持!