欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Axon Framework官方文档(九)

程序员文章站 2022-05-09 13:49:13
...

9. Command Dispatching

显式命令分发机制的使用有很多优点。首先,有一个对象可以清楚地描述客户机的意图。通过记录该命令,您可以将意图和相关数据存储起来以备将来使用。命令处理还可以很容易地将您的命令处理组件公开到远程客户端,例如通过web服务。测试也变得更加容易,您可以通过定义初始情况(given)、命令执行(when)和预期的结果(expect)列出一系列事件和命令(参见测试)来定义测试脚本。最后一个主要优点是,在同步和异步以及本地和分布式命令处理之间切换非常容易。
这并不意味着使用显式命令对象是实现命令分派的惟一方法。Axon的目标不是指定一种特定的工作方式,而是支持您按照自己的方式进行工作,同时提供最佳实践作为默认行为。仍有可能使用一个服务层,你可以通过调用它来执行命令。方法将只需要启动一个Unit of Work(工作单元)(详情参考Unit Of Work)并在方法完成时执行提交或回滚。
下一节将概述与Axon框架建立命令分发的基础设施相关的任务。

9.1 The Command Gateway(命令网关)

命令网关是命令分发机制的一个方便接口。虽然您可能并不需要使用网关来发送命令,但其实这也是很容易的。
有两种方法可以使用命令网关。第一个是使用Axon提供的CommandGateway接口和DefaultCommandGateway实现。命令网关提供了许多方法,允许您发送命令,并以超时或异步方式等待结果。
另一种选择也许是最灵活。你可以使用CommandGatewayFactory把几乎任何一个接口变成一个命令网关。这允许你使用强类型定义你的应用程序的接口,并声明自己的(checked)业务异常。Axon将在运行时自动为该接口生成一个实现。

9.1.1 Configuring the Command Gateway(配置命令网关)

您的自定义的网关和由Axon提供的网关都至少需要配置可以访问到命令总线。此外,命令网关可以配置一个RetryScheduler 、CommandDispatchInterceptors和CommandCallbacks。
RetryScheduler能够在命令执行失败时调度重试。IntervalRetryScheduler就是它的一种实现,它将在指定的间隔内重试给定的命令,直到它成功,或者完成最大的重试次数。当一个命令由于明确非瞬态异常而失败,就不进行重试了。注意,重试调度器只在由于RuntimeException而失败时调用。受检异常被视为“业务异常”,不会触发重试。RetryScheduler的典型用法是在分布式命令总线上调度命令。如果一个节点失败了,重试调度程序将会将一个命令发送给下一个能够处理该命令的节点(请参阅Distributing the Command Bus,本节下面就讲)。
CommandDispatchInterceptors允许先于分发到命令总线之前进行CommandMessages 的修改。与配置在CommandBus上的CommandDispatchInterceptors相反,这些拦截器只是通过这个网关消息被发送时才调用。例如,可以使用拦截器附加元数据到一个命令或做验证。
每个发送了的命令都会调用CommandCallbacks。这就允许我们可以为通过这个网关所发送的命令做一些通用的行为,而不管所发送的命令的类型。

9.1.2 Creating a Custom Command Gateway(创建一个自定义的命令网关)

Axon允许使用自定义接口作为命令网关。接口中声明的每种方法的行为都基于参数类型,返回类型和声明的异常。使用此网关不仅方便,而且允许你在需要时模拟你的接口,从而使测试更加容易。
这就是参数如何影响CommandGateway的行为:
>第一个参数预计将是实际的命令对象
>使用@MetaDataValue注释的参数将把它们的值分配给元数据字段,其中标识符作为被注解的参数传递。
>MetaData类型的参数将与CommandMessage上的元数据合并。如果它们key相等,那么后面的参数定义的元数据将覆盖前面参数的元数据。
>CommandCallback类型的参数将在处理完命令后调用onSuccess或onFailure。您可以传入多个回调,它可能与返回值相结合。在这种情况下,回调函数的调用将总是与返回值(或exception)匹配。
>最后两个参数可以是long(或int)和TimeUnit。在这种情况下,该方法最多会阻塞如这两个参数所表明的时间,超了的话就属于超时了。该方法对于超时的响应取决于在方法上声明的异常(见下文)。注意,如果该方法的其他属性完全阻止了阻塞,超时将永远不会发生。

方法的声明返回值也会影响其行为:
>void:void返回类型将导致方法立即返回,除非在方法上有其他指示,例如超时或声明异常。
>future:返回值为Future、CompletionStage 、CompletableFuture 类型,将导致方法立即返回。您可以使用从方法返回的CompletableFuture实例访问命令处理程序的结果。在方法上声明的异常和超时被忽略。
>任何其他返回类型将导致该方法阻塞,直到结果可用为止。结果被转换为返回类型(如果类型不匹配,导致ClassCastException)。

异常有以下的影响:
>如果命令处理器(或拦截器)抛出该类型的异常,将抛出任何已声明的受检异常。如果受检异常尚未声明,它被包裹在一个CommandExecutionException中,这是一个RuntimeException。
>当发生超时时,默认行为是从方法返回null。这可以通过声明TimeoutException来改变。如果声明此异常,则抛出TimeoutException。
>当线程在等待结果时被中断时,默认行为是返回null。在这种情况下,中断的标志被设置回线程上。通过在方法上声明一个InterruptedException,该行为被更改为抛出异常。当抛出异常时,中断标志被删除,与java规范一致。
>其他运行时异常可能在方法上声明,但是不会对API用户有任何影响。


最后,有可能使用注解:
>在参数部分指定的参数中,参数的@MetaDataValue注解将具有该参数作为元数据值添加的值。将元数据项的键作为参数提供给注释。
>用@Timeout注解的方法最多会阻塞到指定的时间。如果方法声明超时参数,这个注解将被忽略
>用@Timeout注解的类,将导致这个类中声明的所有方法,最多阻塞到指定的时间。除非他们用自己的@Timeout注解或指定超时参数。

PS:译者住:
但凡是声明的方法不是以void或者Future作为返回值,或者是标识了@TimeOut注解,那么这个方法就是同步阻塞的。除此之外,即是异步的,对于方法的调用会立刻结束。
public interface MyGateway {

    // fire and forget
    void sendCommand(MyPayloadType command);

    // method that attaches meta data and will wait for a result for 10 seconds
    @Timeout(value = 10, unit = TimeUnit.SECONDS)
    ReturnValue sendCommandAndWaitForAResult(MyPayloadType command,
                                             @MetaDataValue("userId") String userId);

    // alternative that throws exceptions on timeout
    @Timeout(value = 20, unit = TimeUnit.SECONDS)
    ReturnValue sendCommandAndWaitForAResult(MyPayloadType command)
                         throws TimeoutException, InterruptedException;

    // this method will also wait, caller decides how long
    void sendCommandAndWait(MyPayloadType command, long timeout, TimeUnit unit)
                         throws TimeoutException, InterruptedException;
}

// To configure a gateway:
CommandGatewayFactory factory = new CommandGatewayFactory(commandBus);
// note that the commandBus can be obtained from the `Configuration` object returned on `configurer.initialize()`.
MyGateway myGateway = factory.createGateway(MyGateway.class);

9.2 The Command Bus(命令总线)

命令总线是将命令发送到各自的命令处理程序的机制。每个命令总是发送到一个命令处理程序。如果对于发出的命令没有可用的命令处理程序,那么就会抛出NoHandlerForCommandException异常。订阅同一个命令类型的多个命令处理程序将导致订阅替换。在这种情况下,最后一次订阅是成功的。

9.2.1Dispatching commands(分发命令)

CommandBus提供了两种方法将命令发送到它们各自的处理程序:dispatch(commandMessage,callback)和dispatch(commandMessage)。第一个参数是包含要分派的实际命令的消息。可选的第二个参数接受一个回调,它允许在命令处理完成时通知分发组件。这个回调有两种方法:onSuccess()和onFailure(),它们分别在命令处理返回时调用,或者当它分别抛出异常时调用。
调用组件可能不会假定回调是在发送命令的同一线程中调用的。如果调用线程在继续之前依赖于结果,则可以使用FutureCallback。它是一个Future(在java.concurrent包中定义)和Axon的CommandCallback的组合。或者,可以考虑使用一个命令网关。
如果应用程序没有直接对命令的结果感兴趣,则可以使用dispatch(commandMessage)方法。

9.2.2 SimpleCommandBus

顾名思义,SimpleCommandBus是一个简单的实现。它直接处理分派它们的线程中的命令。在处理了一个命令之后,修改后的聚合(s)将被保存,并在相同的线程中发布事件。在大多数情况下,例如web应用程序,这个实现将满足您的需要。SimpleCommandBus是配置API中默认使用的实现。
像大多数CommandBus实现一样,SimpleCommandBus允许配置拦截器。在命令总线上分发一个命令后调用CommandDispatchInterceptors。在实际的命令处理器方法之前调用CommandHandlerInterceptors,允许你修改或阻塞命令。有关更多信息,请参考命令处理器拦截器。
由于所有的命令处理都是在同一个线程中完成的,所以这个实现仅限于JVM的边界。这个实现的性能很好,但并不特别。要跨越JVM边界,或者从CPU周期中获得最大的功能,请查看其他CommandBus实现。

9.2.3 AsynchronousCommandBus

顾名思义,AsynchronousCommandBus实现从分发它们的线程异步执行命令。它使用一个Executor在不同的线程来执行实际的处理逻辑。
在默认情况下,AsynchronousCommandBus使用一个未绑定的缓存线程池。这意味着在分发命令时创建一个线程。已完成处理命令的线程将被重新使用以执行新的命令。如果60秒线程没有处理命令,则会停止线程。
或者,可以提供一个Executor实例来配置不同的线程策略。
注意,当停止应用程序时,应该关闭AsynchronousCommandBus,以确保任何等待的线程都被正确地关闭。要关闭,请调用shutdown()方法。这也将关闭任何提供的Executor实例,如果它实现了ExecutorService接口的话。

9.2.4 DisruptorCommandBus(分裂命令总线)

SimpleCommandBus具有合理的性能特征,特别是当您在性能调优中经历了性能提示时。SimpleCommandBus需要锁定,以防止多个线程同时访问相同的聚合,从而导致处理开销和锁争用。
DisruptorCommandBus采用了不同的多线程处理方法。并不是多个线程都做相同的处理工作,而是有多个线程,每个线程都处理进程的一部分。DisruptorCommandBus使用Disruptor(http://lmax-exchange.github.io/disruptor/),一个小的并发编程框架,通过不同的方法对多线程进行处理来实现更好的性能。不是在调用线程中进行处理,而是将任务交给两组线程,每个线程负责处理的一部分。第一组线程将执行命令处理程序,改变聚合状态。第二组将存储并将事件发布到事件存储。
虽然DisruptorCommandBus的速度轻易优于SimpleCommandBus的4倍(!),但有一些限制:
>DisruptorCommandBus仅支持事件溯源聚合。这个命令总线充当由Disruptor处理的聚合的存储库。使用createRepository(AggregateFactory)可以获取一个存储库的引用,。
>一个命令只能导致一个聚合实例状态变化。
>当使用缓存时,它只允许给定的标识符只能为一个唯一的聚合。这意味着你不能使两个聚合有相同的标识符。
>命令一般不会引发需要回滚工作单元的故障。当回滚发生时,DisruptorCommandBus不能保证命令是按照被分发的顺序处理的。此外,它还需要对其他一些命令进行重试,从而导致不必要的计算。
>在创建一个新的聚合实例时,命令更新所创建实例可能并不完全按照所提供的顺序进行。一旦创建了聚合,所有命令将按照它们被分发顺序执行。为了确保顺序,在创建命令上使用回调去等待正在创建的聚合。它不应该耗时超过几毫秒。
要构造一个DisruptorCommandBus实例,您需要一个EventStore。EventStore组件在Repositories and Event Stores(也就是官方文档十一)这一节里讲。
您还可以提供一个DisruptorConfiguration 实例,它允许您调整配置以优化特定环境的性能:
Buffer size:在ringBuffer上注册传入命令的槽数。更高的值可能会增加吞吐量,但也导致更高的延迟。必须是2的次方数,默认为4096。
ProducerType: 表示条目是由单线程或多线程生成的。默认为多线程。
WaitStrategy:当处理器线程(三个线程负责的实际处理)需要等待对方时使用的策略。最好的WaitStrategy取决于机器上可用的处理器数量,和正在运行的其他进程的数量。如果低延迟是至关重要的,DisruptorCommandBus可以自己认领内核,你可以使用BusySpinWaitStrategy。为了使命令总线索取更少的CPU并且允许其他线程处理,使用YieldingWaitStrategy。最后,你可以使用SleepingWaitStrategy和BlockingWaitStrategy允许其他进程共享CPU。如果命令总线不需要进行专职处理,则后者是合适的。默认为BlockingWaitStrategy。
Executor:设置Executor为DisruptorCommandBus提供线程。这个Executor必须能够提供至少4个线程。其中的3个线程,由DisruptorCommandBus的处理组件认领。额外的线程用于调用回调函数,并计划重试以防检测到错误的聚合状态。默认是CachedThreadPool提供线程从一个称为“DisruptorCommandBus”的线程组中。
TransactionManager:定义了事务管理器,应该确保存储和事件发布以事务的方式执行。
InvokerInterceptors:定义了在调用处理中使用的CommandHandlerInterceptors。这个处理调用实际的命令处理器方法。
PublisherInterceptors:定义了在发布处理中使用的CommandHandlerInterceptors。这是存储和发布生成的事件的过程。
RollbackConfiguration:定义工作单元应该回滚的异常。默认配置为回滚未经检查的异常。
RescheduleCommandsOnCorruptState:指示已经执行过命令但损坏的聚合(如:因为一个工作单元是回滚)是否应该重新计划。如果为假,回调的onFailure()方法将被调用。如果为的(默认),命令将被重新计划。
CoolingDownPeriod:设置等待的秒数,以确保所有命令被处理。在冷却期间,不接受新命令,但是现有的命令仍然处理,并在必要时重新计划。冷却期间确保线程可供重新安排命令和调用回调之用。默认为1000(1秒)。
缓存:设置缓存存储从Event Store中恢复的聚合实例。缓存用disruptor存储不活跃的聚合实例。
InvokerThreadCount:给命令处理器的调用分配线程的数量。一个好的起始点是机器内核数量的一半。
PublisherThreadCount:用于发布事件的线程数量。一个好的起始点是一半的内核数量,如果IO上花费大量的时间,可以增加。
SerializerThreadCount:使用pre-serialize事件的线程数量。默认为1,但如果没有配置序列化器将被忽略。
Serializer:用于执行pre-serialization的序列化器。当配置序列化器时,DisruptorCommandBus将包装所有生成的事件在一个SerializationAware消息上。附加有效负载和元数据的序列化形式,在发布到事件存储之前。

9.3 Command Interceptors(命令拦截器)

使用命令总线的优点之一是能够基于所有传入命令进行操作。比如日志记录或身份验证,你可以不需要考虑命令的类型而做到。因为这是可以使用拦截器完成的。
有不同类型的拦截器:分发拦截器和处理器拦截器。前者是在向命令处理程序发送命令之前调用的。在这一点上,它甚至不确定是否有任何处理对应于该命令。后者在调用命令处理程序之前调用。

9.3.1 Message Dispatch Interceptors

当命令在命令总线上分发时,将会调用消息分发拦截器。它们有能力改变命令消息,例如通过添加元数据,或者通过抛出异常来阻塞命令。这些拦截器总是在分发命令的线程上调用。

9.3.1.1 Structural validation(结构验证)

如果命令不以正确的格式包含所有必需的信息,那么处理命令是没有意义的。事实上,一个缺乏信息的命令应该尽早被阻塞,最好是在任何事务启动之前。因此,拦截器应该检查所有传入命令,以获取这些信息。这叫做结构验证。
Axon Framework支持基于JSR 303 Bean Validation的验证。这允许您使用诸如 @NotEmpty和@Pattern这样的注解对字段进行注解。您需要在类路径中包含JSR 303实现(比如Hibernate-Validator)。然后,配置一个BeanValidationInterceptor命令总线,它将自动查找并配置您的验证器实现。虽然它使用了合理的默认值,但您可以对它进行微调以满足您的特定需求。
Tip:
您希望尽可能少地花费在无效的命令上。因此,这个拦截器通常放置在拦截链的最前面。在某些情况下,一个日志或审计拦截器可能需要放在前面,紧跟着它的是验证拦截器。

BeanValidationInterceptor还实现了MessageHandlerInterceptor,也允许你把它配置为一个处理程序(Handler)拦截器。

9.3.2 Message Handler Interceptors

消息处理器拦截器可以在命令处理之前和之后采取行动。拦截器甚至可以完全阻止命令处理,例如出于安全原因。
拦截器必须实现MessageHandlerInterceptor接口。该接口声明了一个handle方法,该方法包含三个参数:命令消息、当前UnitOfWork 和InterceptorChain。InterceptorChain用于继续分发处理。
与分发拦截器不同,处理程序拦截器在命令处理程序的上下文中调用。这意味着它们可以根据正在处理消息的工作单元附上相关数据。然后将这些关联数据附加到在该工作单元上下文中创建的消息中。
处理程序拦截器通常用于管理处理命令的事务。为此,注册一个TransactionManagingInterceptor,使用TransactionManager依次配置启动和提交(或回滚)实际事务。

9.4 Distributing the Command Bus

前面描述的CommandBus实现只允许在单个JVM中发送命令消息。有时,您需要把在不同的jvm上使用的多个命令总线实例作为一个实例来使用。当返回任何结果时,在一个JVM命令总线上发出的命令应该无缝地传到到另一个JVM中的命令处理程序。
这就是DistributedCommandBus的来源。与其他CommandBus实现不同,DistributedCommandBus完全不调用任何处理程序。它所做的只是在不同JVM的命令总线实现之间形成一个“bridge(桥)”。每个JVM上的分布式commandbus的每个实例都称为“Segment(段)”。

Axon Framework官方文档(九)

Tip:
虽然分布式命令总线本身是Axon Framework核心模块的一部分,但它需要的组件,你可以在其中一个以axon-distributed-commandbus-...开头的模块中找到。如果你使用Maven,确保你有适当的依赖集。groupId和version与核心模块相同。

DistributedCommandBus依赖于两个组件:一个CommandBusConnector,它实现了JVM和CommandRouter之间的通信协议,后者为每个传入命令选择一个目的地。该路由器基于路由策略计算的路由键,定义了将命令路由到分布式命令总线哪一段。两个具有相同路由键的命令将始终被路由到相同的区段,只要区段的数目和配置没有改变。通常,目标聚合的标识符用作路由键。
提供的两个RoutingStrategy实现:MetaDataRoutingStrategy,它在命令消息中使用元数据属性来查找路由键,而AnnotationRoutingStrategy,它使用注解在命令消息有效负载上的@TargetAggregateIdentifier来提取路由键。显然,你也可以提供自己的实现。
默认情况下,当命令消息没有键能被解析时,RoutingStrategy实现将抛出一个异常时。这是可以改变的,即通过在MetaDataRoutingStrategy或AnnotationRoutingStrategy的构造函数中提供一个UnresolvedRoutingKeyPolicy。有三个可能的策略:
>ERROR:这是默认值,当没有可用的路由键时,会引发异常
>RANDOM_KEY:当路由的键无法从命令消息中解析时,将返回一个随机值。这实际上意味着这些命令将被路由到命令总线的随机段上。
>STATIC_KEY:将返回一个静态键(被“未解决”)用于未能解析的路由键。这实际上意味着所有这些命令将被路由到相同的区段,只要区段的配置不变。

9.4.1 JGroupsConnector

JGroupsConnector使用(看名字也看出来了)JGroups作为底层发现和调度机制。描述JGroups的特性集对于这个参考指南来说有点太大了,所以请参考JGroups用户指南来了解更多细节。
由于JGroups处理节点的发现和它们之间的通信,所以JGroupsConnector既充当CommandBusConnector也充当CommandRouter。
Tip:
你可以在axon-distributed-commandbus-jgroups模块中,为DistributedCommandBus找到JGroups特定组件。

JGroupsConnector有四个强制配置元素:
>第一个是JChannel,它定义了JGroups协议栈。一般来说,一个JChannel是由一个对JGroups配置文件的引用构造的。JGroups附带了一些默认配置,这些配置可以作为您自己配置的基础。请记住,IP组播一般不适用于云服务,如Amazon。在这样的环境中,TCP流言通常是一个良好的开端。
>集群名定义每个段应该注册到的集群的名称。具有相同集群名称的段最终将相互检测并相互发送命令。
>本地段”是也是一个命令总线实现,分发命令去往本地的JVM。这些命令可能已经被其他jvm的实例或来自本地的实例发送了。
>最后,在将命令消息发送到网络之前,序列化器用于序列化命令消息。
Tip:
当使用缓存时,当ConsistentHash更改以避免潜在的数据损坏时,它应该被清空(例如,当命令没有指定一个@TargetAggregateVersion和新成员快速加入和离开JGroup,修改聚合然而它还要缓存到其他地方)。

最终,JGroupsConnector需要实际连接,以便向其他段发送消息。为此,调用connect()方法。
JChannel channel = new JChannel("path/to/channel/config.xml");
CommandBus localSegment = new SimpleCommandBus();
Serializer serializer = new XStreamSerializer();

JGroupsConnector connector = new JGroupsConnector(channel, "myCommandBus", localSegment, serializer);
DistributedCommandBus commandBus = new DistributedCommandBus(connector, connector);

// on one node:
commandBus.subscribe(CommandType.class.getName(), handler);
connector.connect();

// on another node, with more CPU:
commandBus.subscribe(CommandType.class.getName(), handler);
commandBus.subscribe(AnotherCommandType.class.getName(), handler2);
commandBus.updateLoadFactor(150); // defaults to 100
connector.connect();

// from now on, just deal with commandBus as if it is local...
Note:
注意,并非所有的segment都必需具有相同类型的命令的命令处理器。你完全可以为不同的命令类型使用不同的segment。分布式命令总线总是选择一个节点分发命令到那个支持特定类型的命令。

9.5 Spring Cloud Connector

Spring Cloud连接器装置,使用Spring Cloud描述的服务注册和发现机制来分发命令总线。因此,您可以*选择用于发布命令的Spring云实现。一个示例实现是Eureka发现/ Eureka服务器组合。
Note:
当前版本(Axon 3.0.4)SpringCloudCommandRouter使用ServiceInstance。Metadata 字段来通知所有系统中的节点,通过CommandNameFilter它可以处理命令。这是很重要的,Spring Cloud实现选择支持ServiceInstance.Metadata字段的使用。例如Spring Cloud Consul目前不支持该字段,因此SpringCloudCommandRouter并不是一个可行的解决方案。我们正在研究一个额外的解决方案,从中检索CommandNameFilter 。

提供每个SpringCloud实现的描述将推动本参考指南。因此,我们参考他们各自的文件以获得进一步的信息。

 Spring Cloud连接器装置是结合SpringCloudCommandRouter和SpringHttpCommandBusConnector。分别填补了DistributedCommandBus的CommandRouter 和CommandBusConnector 的位置。
 Note:
 Spring Cloud连接器特定的组件DistributedCommandBus可以在axon-distributed-commandbus-springcloud模块中找到 。

SpringCloudCommandRouter必须通过提供以下内容来创建:
>一个DiscoveryClient类型“discovery client”。这可以通过用@EnableDiscoveryClient注解你的Spring Boot应用程序来提供,将在你的类路径中寻找Spring Cloud 的实现。
>一个RoutingStrategy类型的"routing strategy"。目前axon-core模块提供了一些实现,但是一函数调用也可以满足要求。例如,如果你想路由命令基于“聚合标识符”,你可以使用AnnotationRoutingStrategy和注解有效载荷的字段,用@TargetAggregateIdentifier识别聚合。

SpringHttpCommandBusConnector需要创建三个参数:

>一个CommandBus类型的“local command bus”。这是命令总线实现,它将分发命令到本地的JVM。这些命令可能是由其他JVM上的或本地的实例分发。
>RestOperations对象来执行一个命令消息的发布到另一个实例。
>最后一个Serializer类型的“序列serializer”。序列化器用于在命令发送到网络之前序列化命令消息。
// Simple Spring Boot App providing the `DiscoveryClient` bean
@EnableDiscoveryClient
@SpringBootApplication
public class MyApplication {

    public static void main(String[] args) {
        SpringApplication.run(MyApplication.class, args);
    }

    // Example function providing a Spring Cloud Connector
    @Bean
    public CommandRouter springCloudCommandRouter(DiscoveryClient discoveryClient) {
        return new SpringCloudCommandRouter(discoveryClient, new AnnotationRoutingStrategy());
    }

    @Bean
    public CommandBusConnector springHttpCommandBusConnector(@Qualifier("localSegment") CommandBus localSegment,
                                                             RestOperations restOperations,
                                                             Serializer serializer) {
        return new SpringHttpCommandBusConnector(localSegment, restOperations, serializer);
    }

    @Primary // to make sure this CommandBus implementation is used for autowiring
    @Bean
    public DistributedCommandBus springCloudDistributedCommandBus(CommandRouter commandRouter, 
                                                                  CommandBusConnector commandBusConnector) {
        return new DistributedCommandBus(commandRouter, commandBusConnector);
    }

}
// if you don't use Spring Boot Autoconfiguration, you will need to explicitly define the local segment:
@Bean
@Qualifier("localSegment")
public CommandBus localSegment() {
    return new SimpleCommandBus();
}
 Note:
 注意,不需要所有的段都有相同类型命令的命令处理程序。对于不同的命令类型,可以使用不同的段。分布式命令总线总是选择一个节点分发命令到那个支持特定类型的命令。