使用NServiceBus开发分布式应用
程序员文章站
2022-06-20 14:49:57
前言 NServiceBus是.Net平台下的开源的消息服务框架,已经支持.Net Core。目前稳定版本7.1。企业开发需要购买License,开发者可在线下载开发者License。 官方网站: "https://particular.net/" 官方示例: "https://docs.parti ......
前言
nservicebus是.net平台下的开源的消息服务框架,已经支持.net core。目前稳定版本7.1。企业开发需要购买license,开发者可在线下载开发者license。
官方网站:
官方示例:
nservicebus入门
如图所示,项目一共包括4个端点(endpoint),也就是四个单独的项目,端点是nservicebus中的核心概念,发送消息和事件发布订阅的基础都是endpoint。这个项目中包括发送消息和事件的发布订阅。
完整的项目结构如图所示:
clientui
class program { private static ilog log = logmanager.getlogger<program>(); static void main(string[] args) { mainasync().getawaiter().getresult(); } static async task runasync(iendpointinstance endpointinstance) { log.info("press 'p' to place an order,press 'q' to quit"); while (true) { var key = console.readkey(); console.writeline(); switch (key.key) { case consolekey.p: { var command = new placeorder { orderid = guid.newguid().tostring() }; log.info($"sending placeorder with orderid:{command.orderid}"); //发送到sales端点 await endpointinstance.send("sales",command).configureawait(false); break; } case consolekey.q: return; default: log.info("please try again"); break; } } } static async task mainasync() { console.title = "client-ui"; var config = new endpointconfiguration("clientui");//设置端点名称 config.usetransport<learningtransport>(); //设置消息管道模式,learningtransport仅仅用来学习,生产慎用 config.usepersistence<learningpersistence>();//持久化 var endpointinstance =await endpoint.start(config).configureawait(false); await runasync(endpointinstance).configureawait(false); //runasync返回的是task,所以这里使用configureawait() await endpointinstance.stop().configureawait(false); } }
sales
class program { static async task main(string[] args) { console.title = "sales"; var config = new endpointconfiguration("sales"); config.usetransport<learningtransport>(); config.usepersistence<learningpersistence>(); var endpointinstance = await endpoint.start(config).configureawait(false); console.writeline("press enter to quit..."); console.readline(); await endpointinstance.stop().configureawait(false); } } public class placeorderhandler:ihandlemessages<placeorder> { private static ilog log = logmanager.getlogger<placeorderhandler>(); public task handle(placeorder message, imessagehandlercontext context) { //接受端点消息 log.info($"received placeorder ,orderid:{message.orderid}"); //发布orderplaced事件 var order=new orderplaced(); order.orderid = message.orderid; return context.publish(order); } }
billing
static async task main(string[] args) { console.title = "sales"; var config = new endpointconfiguration("billing"); config.usetransport<learningtransport>(); config.usepersistence<learningpersistence>(); var endpointinstance = await endpoint.start(config).configureawait(false); console.writeline("press enter to quit..."); console.readline(); await endpointinstance.stop().configureawait(false); } public class orderplacedhandler:ihandlemessages<orderplaced> { private static ilog log = logmanager.getlogger<orderplacedhandler>(); public task handle(orderplaced message, imessagehandlercontext context) { //订阅orderplaced事件 log.info($"received orderplaced,orderid {message.orderid} - charging credit card"); //发布orderbilled事件 var order=new orderbilled(); order.orderid = message.orderid; return context.publish(order); } }
shipping
static async task main(string[] args) { console.title = "sales"; var config = new endpointconfiguration("shipping"); config.usetransport<learningtransport>(); config.usepersistence<learningpersistence>(); var endpointinstance = await endpoint.start(config).configureawait(false); console.writeline("press enter to quit..."); console.readline(); await endpointinstance.stop().configureawait(false); } public class orderbilledhandler:ihandlemessages<orderbilled> { private static ilog log = logmanager.getlogger<orderbilledhandler>(); //处理orderbilled订阅事件 public task handle(orderbilled message, imessagehandlercontext context) { log.info($"received orderbilled,orderid={message.orderid} should we ship now?"); return task.completedtask; } } public class orderplacedhandler:ihandlemessages<orderplaced> { private static ilog log = logmanager.getlogger<orderplacedhandler>(); //处理orderplaced订阅事件 public task handle(orderplaced message, imessagehandlercontext context) { log.info($"received orderplaced,orderid={message.orderid} should we ship now?"); return task.completedtask; } }
运行结果
总结
nservicebus的核心是在端点之间通信,通信的实体需要实现icommand接口,通信的事件需要实现ievent事件,nservicebus会扫描实现这两个接口的类。每个端点之间的关键配置就是endpointconfiguration。
推荐阅读