欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

细聊Spring Cloud Bus

程序员文章站 2023-11-13 11:57:40
细聊Spring Cloud Bus Spring 事件驱动模型 因为Spring Cloud Bus的运行机制也是Spring事件驱动模型所以需要先了解相关知识点: 上面图中是Spring事件驱动模型的实现示意图,以下再补充一些图中未提现的实现细节:抽象类abstract class Abstra ......

细聊spring cloud bus

spring 事件驱动模型

因为spring cloud bus的运行机制也是spring事件驱动模型所以需要先了解相关知识点:
细聊Spring Cloud Bus
上面图中是spring事件驱动模型的实现示意图,以下再补充一些图中未提现的实现细节:抽象类abstract class abstractapplicationeventmulticaster中根据事件和事件类型获取对应的观察者的方法是:

    protected collection<applicationlistener<?>> getapplicationlisteners(
            applicationevent event, resolvabletype eventtype)  

该方法内具体检索监听器(观察者的方法)是:

private collection<applicationlistener<?>> retrieveapplicationlisteners(
            resolvabletype eventtype, @nullable class<?> sourcetype, @nullable listenerretriever retriever)
            
            .....
        // add programmatically registered listeners, including ones coming
        // from applicationlistenerdetector (singleton beans and inner beans).
        for (applicationlistener<?> listener : listeners) {
            if (supportsevent(listener, eventtype, sourcetype)) {
                if (retriever != null) {
                    retriever.applicationlisteners.add(listener);
                }
                alllisteners.add(listener);
            }
        }
            .....

此方法内根据传入参数事的件对象遍历所有对应(订阅)的监听者,其中有个很重要的方法boolean supportsevent,此方法用于判断是否是订阅的监听者:

    protected boolean supportsevent(
            applicationlistener<?> listener, resolvabletype eventtype, @nullable class<?> sourcetype) {

        genericapplicationlistener smartlistener = (listener instanceof genericapplicationlistener ?
                (genericapplicationlistener) listener : new genericapplicationlisteneradapter(listener));
        return (smartlistener.supportseventtype(eventtype) && smartlistener.supportssourcetype(sourcetype));
    }

其中接口genericapplicationlistener和genericapplicationlisteneradapter类都是为了定义或实现supportseventtype方法和supportssourcetype方法,通过这两个方法确定是否是事件的监听器(观察者、订阅者)。

public interface genericapplicationlistener extends applicationlistener<applicationevent>, ordered {

    /**
     * determine whether this listener actually supports the given event type.
     * @param eventtype the event type (never {@code null})
     */
    boolean supportseventtype(resolvabletype eventtype);

    /**
     * determine whether this listener actually supports the given source type.
     * <p>the default implementation always returns {@code true}.
     * @param sourcetype the source type, or {@code null} if no source
     */
    default boolean supportssourcetype(@nullable class<?> sourcetype) {
        return true;
    }

    /**
     * determine this listener's order in a set of listeners for the same event.
     * <p>the default implementation returns {@link #lowest_precedence}.
     */
    @override
    default int getorder() {
        return lowest_precedence;
    }

}

其中判断发布事件的来源对象supportssourcetype方法默认就返回true,意味着如果不重写这个接口方法,是否是订阅事件的监听器不以事件来源对象进行判断,只根据事件类型进行筛选,该方法的具体实现可参考genericapplicationlisteneradapter类包装的supportssourcetype方法实现:

public boolean supportssourcetype(@nullable class<?> sourcetype) {
        return !(this.delegate instanceof smartapplicationlistener) ||
                ((smartapplicationlistener) this.delegate).supportssourcetype(sourcetype);
    }

spring cloud bus的事件、发布、订阅

spring cloud bus的事件都继承于remoteapplicationevent类,remoteapplicationevent类继承于spring事件驱动模型的事件抽象类applicationevent,也就说spring cloud bus的事件、发布、订阅也是基于spring的事件驱动模型,例如spring cloud bus的配置刷新事件refreshremoteapplicationevent:

细聊Spring Cloud Bus

同理订阅事件也是标准的spring事件驱动模型,例如配置刷新的监听器源码继承了spring事件驱动模型中的接口applicationlistener<e extends applicationevent>:

public class refreshlistener
        implements applicationlistener<refreshremoteapplicationevent> {

    private static log log = logfactory.getlog(refreshlistener.class);

    private contextrefresher contextrefresher;

    public refreshlistener(contextrefresher contextrefresher) {
        this.contextrefresher = contextrefresher;
    }

    @override
    public void onapplicationevent(refreshremoteapplicationevent event) {
        set<string> keys = this.contextrefresher.refresh();
        log.info("received remote refresh request. keys refreshed " + keys);
    }

}

在busrefreshautoconfiguration类中会将refreshlistener对象注册到spring的beanfactory中(不把监听器类注册到spring的beanfactory中就无法利用spring的事件驱动模型对刷新事件进行处理)。

    @bean
    @conditionalonproperty(value = "spring.cloud.bus.refresh.enabled",matchifmissing = true)
    @conditionalonbean(contextrefresher.class)
    public refreshlistener refreshlistener(contextrefresher contextrefresher) {
        return new refreshlistener(contextrefresher);
    }

也可以使用@eventlistener创建监听器,例如tracelistener类:

    @eventlistener
    public void onack(ackremoteapplicationevent event) {
        map<string, object> trace = getreceivedtrace(event);
        // fixme boot 2 this.repository.add(trace);
    }

    @eventlistener
    public void onsend(sentapplicationevent event) {
        map<string, object> trace = getsenttrace(event);
        // fixme boot 2 this.repository.add(trace);
    }

发布事件也是利用应用程序上下文进行事件发布,比如配置刷新的实现代码:

@endpoint(id = "bus-refresh") // todo: document new id
public class refreshbusendpoint extends abstractbusendpoint {

    public refreshbusendpoint(applicationeventpublisher context, string id) {
        super(context, id);
    }

    @writeoperation
    public void busrefreshwithdestination(@selector string destination) { // todo:
                                                                            // document
                                                                            // destination
        publish(new refreshremoteapplicationevent(this, getinstanceid(), destination));
    }

    @writeoperation
    public void busrefresh() {
        publish(new refreshremoteapplicationevent(this, getinstanceid(), null));
    }

}

注解@writeoperation实现post操作,@endpoint结合management.endpoints.web.exposure.include=* 配置项可实现一个接入点,接入点的url是:/actuator/bus-refresh
父类abstractbusendpoint内用应用程序上下文实现事件的发布:

public class abstractbusendpoint {

    private applicationeventpublisher context;

    private string appid;

    public abstractbusendpoint(applicationeventpublisher context, string appid) {
        this.context = context;
        this.appid = appid;
    }

    protected string getinstanceid() {
        return this.appid;
    }

    protected void publish(applicationevent event) {
        this.context.publishevent(event);
    }

}

spring cloud bus的底层通讯实现(对使用者透明)

spring cloud bus的底层通讯基础是spring cloud stream,定义发送总线事件和接收总线事件监听器的类是busautoconfiguration(在网络上发送和接收其他节点的事件消息),因为继承了applicationeventpublisheraware所以该类也具备发布本地事件的功能(可以查询aware接口作用),发布网络事件消息的方法是:

@eventlistener(classes = remoteapplicationevent.class)
    public void acceptlocal(remoteapplicationevent event) {
        if (this.servicematcher.isfromself(event)
                && !(event instanceof ackremoteapplicationevent)) {
            this.cloudbusoutboundchannel.send(messagebuilder.withpayload(event).build());
        }
    }

如果监听到remoteapplicationevent类事件,首先检查是否是自己发布并且不是ack事件,如果是自己发布的非ack事件就在总线上发送这个事件消息。发送ackremoteapplicationevent(ack事件)已经在接收其他节点发的事件消息时触发了,所以这里不用管发送ack事件的工作了。

接收事件消息:

@streamlistener(springcloudbusclient.input)
    public void acceptremote(remoteapplicationevent event) {
        if (event instanceof ackremoteapplicationevent) {
            if (this.bus.gettrace().isenabled() && !this.servicematcher.isfromself(event)
                    && this.applicationeventpublisher != null) {
                this.applicationeventpublisher.publishevent(event);
            }
            // if it's an ack we are finished processing at this point
            return;
        }
        if (this.servicematcher.isforself(event)
                && this.applicationeventpublisher != null) {
            if (!this.servicematcher.isfromself(event)) {
                this.applicationeventpublisher.publishevent(event);
            }
            if (this.bus.getack().isenabled()) {
                ackremoteapplicationevent ack = new ackremoteapplicationevent(this,
                        this.servicematcher.getserviceid(),
                        this.bus.getack().getdestinationservice(),
                        event.getdestinationservice(), event.getid(), event.getclass());
                this.cloudbusoutboundchannel
                        .send(messagebuilder.withpayload(ack).build());
                this.applicationeventpublisher.publishevent(ack);
            }
        }
        if (this.bus.gettrace().isenabled() && this.applicationeventpublisher != null) {
            // we are set to register sent events so publish it for local consumption,
            // irrespective of the origin
            this.applicationeventpublisher.publishevent(new sentapplicationevent(this,
                    event.getoriginservice(), event.getdestinationservice(),
                    event.getid(), event.getclass()));
        }
    }

接收到其他节点发来的事件消息后会将此事件发布到本地的应用程序上下文中(this.applicationeventpublisher),监听此事件类型的订阅者就会相应的进行处理。

两个跟踪事件ackremoteapplicationevent和sentapplicationevent

从他们的继承关系可以看出,ackremoteapplicationevent可以发送到其他网络节点(继承于remoteapplicationevent),sentapplicationevent只是本地事件(继承于applicationevent),sentapplicationevent事件可以显示收到事件消息的类型,ackremoteapplicationevent事件只显示收到事件消息的id,tracelistener类负责监听和记录他们的内容(配置项要打开spring.cloud.bus.trace.enabled=true):

public class tracelistener {

    @eventlistener
    public void onack(ackremoteapplicationevent event) {
        map<string, object> trace = getreceivedtrace(event);
        // fixme boot 2 this.repository.add(trace);
    }

    @eventlistener
    public void onsend(sentapplicationevent event) {
        map<string, object> trace = getsenttrace(event);
        // fixme boot 2 this.repository.add(trace);
    }

    protected map<string, object> getsenttrace(sentapplicationevent event) {
        .....
    }

    protected map<string, object> getreceivedtrace(ackremoteapplicationevent event) {
        .....
    }

}

在总线事件发送端和总线事件接收端日志的记录流程如下:
细聊Spring Cloud Bus

测试a应用和b应用进行“聊天”

首先准备环境:
创建3个项目:spring-cloud-bus-shared-library、spring-cloud-bus-a、spring-cloud-bus-b

  • spring-cloud-bus-shared-library:负责定义事件和监听器还有配置类
  • spring-cloud-bus-a:扮演a应用负责引用shared-library并利用bus发送消息给b应用(此消息实际为广播消息)
  • spring-cloud-bus-b:扮演b应用负责引用shared-library并利用bus回复a应用发来的消息(此消息非广播消息)

spring-cloud-bus-shared-library的pom的依赖项:

<properties>
        <java.version>1.8</java.version>
        <spring-cloud.version>greenwich.sr3</spring-cloud.version>
    </properties>

    <dependencies>
        <dependency>
            <groupid>org.springframework.cloud</groupid>
            <artifactid>spring-cloud-starter-bus-amqp</artifactid>
        </dependency>
        <dependency>
            <groupid>org.springframework.boot</groupid>
            <artifactid>spring-boot-starter-actuator</artifactid>
        </dependency>
        <dependency>
            <groupid>org.springframework.boot</groupid>
            <artifactid>spring-boot-starter-test</artifactid>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <dependencymanagement>
        <dependencies>
            <dependency>
                <groupid>org.springframework.cloud</groupid>
                <artifactid>spring-cloud-dependencies</artifactid>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>

删除构建的maven插件节点否则构建后其他项目引用不了(格式不对):

<build>
        <plugins>
            <plugin>
                <groupid>org.springframework.boot</groupid>
                <artifactid>spring-boot-maven-plugin</artifactid>
            </plugin>
        </plugins>
    </build>

启动一个rabbitmq:

docker pull rabbitmq:3-management

docker run -d --hostname my-rabbit --name rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management

application.properties配置定义:

spring.application.name=spring-cloud-bus-shared-library
server.port=9007
# 开启消息跟踪
spring.cloud.bus.trace.enabled=true
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

#显示的暴露接入点
management.endpoints.web.exposure.include=*

spring-cloud-bus-a、spring-cloud-bus-b的配置信息除了spring.application.name和server.port不一样,其他都是一样的。

自定义一个聊天事件类:

/**
 * 聊天事件
 */
public class chatremoteapplicationevent extends remoteapplicationevent {

    private string message;

    //for serializers
    private chatremoteapplicationevent(){}

    public chatremoteapplicationevent(object source, string originservice,
            string destinationservice,string message){
        super(source, originservice, destinationservice);

        this.message = message;
    }

    public void setmessage(string message){
        this.message = message;
    }

    public string getmessage(){
        return this.message;
    }
}

自定义聊天事件监听器:

/**
 * 聊天事件监听
 */
public class chatlistener implements applicationlistener<chatremoteapplicationevent> {

    private static log log = logfactory.getlog(chatlistener.class);

    public chatlistener(){}

    @override
    public void onapplicationevent(chatremoteapplicationevent event){
        log.info(string.format("应用%s对应用%s悄悄的说:\"%s\"",
                event.getoriginservice(),
                event.getdestinationservice(),
                event.getmessage()));
    }
}

配置类将监听器注册到beanfactory中,并需要显示的告诉spring cloud bus我们有一个自定义事件:@remoteapplicationeventscan(basepackageclasses=chatremoteapplicationevent.class),否则bus收到消息后无法识别事件类型。

@configuration
@conditionalonclass(chatlistener.class)
@remoteapplicationeventscan(basepackageclasses=chatremoteapplicationevent.class)
public class buschatconfiguration {

    @bean
    public chatlistener chatlistener(){
        return new chatlistener();
    }
}

发布到本地maven仓库:

mvn install

spring-cloud-bus-a、spring-cloud-bus-b的pom依赖:

<properties>
        <java.version>1.8</java.version>
        <spring-cloud.version>greenwich.sr3</spring-cloud.version>
    </properties>

    <dependencies>
        <dependency>
            <groupid>org.springframework.boot</groupid>
            <artifactid>spring-boot-starter-actuator</artifactid>
        </dependency>
        <dependency>
            <groupid>org.springframework.cloud</groupid>
            <artifactid>spring-cloud-bus</artifactid>
        </dependency>

        <dependency>
            <groupid>org.springframework.boot</groupid>
            <artifactid>spring-boot-starter-test</artifactid>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupid>com.bluersw</groupid>
            <artifactid>spring-cloud-bus-shared-library</artifactid>
            <version>0.0.1-snapshot</version>
        </dependency>
    </dependencies>

    <dependencymanagement>
        <dependencies>
            <dependency>
                <groupid>org.springframework.cloud</groupid>
                <artifactid>spring-cloud-dependencies</artifactid>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencymanagement>

在spring-cloud-bus-a、spring-cloud-bus-b的启动main函数上增加@componentscan(value = "com.bluersw")注解,否则不会扫描引用spring-cloud-bus-shared-library项目的配置类(也就加载不了自定义的事件和监听器类型)。

spring-cloud-bus-a:

@springbootapplication
@componentscan(value = "com.bluersw")
public class springcloudbusaapplication {

    public static void main(string[] args) {
        springapplication.run(springcloudbusaapplication.class, args);
    }

}

spring-cloud-bus-b:

@springbootapplication
@componentscan(value = "com.bluersw")
public class springcloudbusbapplication {

    public static void main(string[] args) {
        springapplication.run(springcloudbusbapplication.class, args);
    }

}

spring-cloud-bus-a发送消息给spring-cloud-bus-b(启动spring-cloud-bus-a程序和spring-cloud-bus-b程序):

@runwith(springrunner.class)
@springboottest
public class springcloudbusaapplicationtests {

    @autowired
    private applicationeventpublisher context;

    @autowired
    private busproperties bp;

    @test
    public void achat() {
        context.publishevent(new chatremoteapplicationevent(this,bp.getid(),null,"hi!b应用,我是a应用,。"));
    }

}

执行achat()之后,spring-cloud-bus-b的控制台会输出:
”应用spring-cloud-bus-a