欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

使用NServiceBus开发分布式应用

程序员文章站 2022-06-20 14:49:57
前言 NServiceBus是.Net平台下的开源的消息服务框架,已经支持.Net Core。目前稳定版本7.1。企业开发需要购买License,开发者可在线下载开发者License。 官方网站: "https://particular.net/" 官方示例: "https://docs.parti ......

前言

nservicebus是.net平台下的开源的消息服务框架,已经支持.net core。目前稳定版本7.1。企业开发需要购买license,开发者可在线下载开发者license。
官方网站:
官方示例:

nservicebus入门

使用NServiceBus开发分布式应用
如图所示,项目一共包括4个端点(endpoint),也就是四个单独的项目,端点是nservicebus中的核心概念,发送消息和事件发布订阅的基础都是endpoint。这个项目中包括发送消息和事件的发布订阅。

完整的项目结构如图所示:
使用NServiceBus开发分布式应用

clientui

class program
    {
        private static ilog log = logmanager.getlogger<program>();
        static void main(string[] args)
        {
            mainasync().getawaiter().getresult();
        }


        static async task runasync(iendpointinstance endpointinstance)
        {
            log.info("press 'p' to place an order,press 'q' to quit");
            
            while (true)
            {
                
                var key = console.readkey();
                console.writeline();

                switch (key.key)
                {
                    case consolekey.p:
                    {
                        var command = new placeorder
                        {
                            orderid = guid.newguid().tostring()
                        };
                        
                        log.info($"sending placeorder with orderid:{command.orderid}");
                        //发送到sales端点
                        await endpointinstance.send("sales",command).configureawait(false);
                        break;
                    }
                    
                    case consolekey.q:
                        return;
                    default:
                        log.info("please try again");
                        break;
                }
                
            }
        }
        
        static async task mainasync()
        {
            console.title = "client-ui";
            var config = new endpointconfiguration("clientui");//设置端点名称
            config.usetransport<learningtransport>(); //设置消息管道模式,learningtransport仅仅用来学习,生产慎用
            config.usepersistence<learningpersistence>();//持久化

            var endpointinstance =await endpoint.start(config).configureawait(false);

            await runasync(endpointinstance).configureawait(false); //runasync返回的是task,所以这里使用configureawait()
            
            await endpointinstance.stop().configureawait(false);

        }
    }

sales

class program
{
    static async task main(string[] args)
    {
        console.title = "sales";

        var config = new endpointconfiguration("sales");
        config.usetransport<learningtransport>();
        config.usepersistence<learningpersistence>();

        var endpointinstance = await endpoint.start(config).configureawait(false);

        console.writeline("press enter to quit...");
        console.readline();

        await endpointinstance.stop().configureawait(false);

    }
}

public class placeorderhandler:ihandlemessages<placeorder>
{
   private static ilog log = logmanager.getlogger<placeorderhandler>();
   public task handle(placeorder message, imessagehandlercontext context)
   {
      //接受端点消息
      log.info($"received placeorder ,orderid:{message.orderid}");

      //发布orderplaced事件
      var order=new orderplaced();
      order.orderid = message.orderid;

      return context.publish(order);
   }
}

billing

static async task main(string[] args)
{
    console.title = "sales";

    var config = new endpointconfiguration("billing");
    config.usetransport<learningtransport>();
    config.usepersistence<learningpersistence>();

    var endpointinstance = await endpoint.start(config).configureawait(false);

    console.writeline("press enter to quit...");
    console.readline();

    await endpointinstance.stop().configureawait(false);
}

 public class orderplacedhandler:ihandlemessages<orderplaced>
 {
    private static ilog log = logmanager.getlogger<orderplacedhandler>();
         
    public task handle(orderplaced message, imessagehandlercontext context)
    {
       //订阅orderplaced事件
       log.info($"received orderplaced,orderid {message.orderid} - charging credit card"); 
            
       //发布orderbilled事件
       var order=new orderbilled();
       order.orderid = message.orderid;
       return context.publish(order);
     }
}

shipping

static async task main(string[] args)
{
    console.title = "sales";

    var config = new endpointconfiguration("shipping");
    config.usetransport<learningtransport>();
    config.usepersistence<learningpersistence>();

    var endpointinstance = await endpoint.start(config).configureawait(false);

    console.writeline("press enter to quit...");
    console.readline();

    await endpointinstance.stop().configureawait(false);
}

public class orderbilledhandler:ihandlemessages<orderbilled>
{
  private static ilog log = logmanager.getlogger<orderbilledhandler>();
  //处理orderbilled订阅事件
  public task handle(orderbilled message, imessagehandlercontext context)
  {
      log.info($"received orderbilled,orderid={message.orderid} should we ship now?");
      return task.completedtask;
  }
}

public class orderplacedhandler:ihandlemessages<orderplaced>
{
   private static ilog log = logmanager.getlogger<orderplacedhandler>();
   //处理orderplaced订阅事件
   public task handle(orderplaced message, imessagehandlercontext context)
   {
       log.info($"received orderplaced,orderid={message.orderid} should we ship now?");
       return task.completedtask;
   }
}
    

运行结果

使用NServiceBus开发分布式应用

使用NServiceBus开发分布式应用

使用NServiceBus开发分布式应用

使用NServiceBus开发分布式应用

总结

      nservicebus的核心是在端点之间通信,通信的实体需要实现icommand接口,通信的事件需要实现ievent事件,nservicebus会扫描实现这两个接口的类。每个端点之间的关键配置就是endpointconfiguration。