欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

SpringCloud进阶:Spring Cloud Stream核心组件

程序员文章站 2022-06-13 10:27:33
...

 我的博客:程序员笑笑生,欢迎浏览博客!
 回复 “Spring Cloud”、“Spring Boot” 获取 全套 ****!时间有限!可扫描文章下方二维码

 上一章 SpringCloud 进阶:-消息驱动Spring Cloud Stream当中,我们初始的了解Spring Cloud Stream和如何简单的构建消息驱动的服务的。本章我们将了解一下Spring Cloud Stream的组件。

前言

 上文我们通过一个简单的实例,利用Spring Cloud Stream实现了一个发布-订阅的模型。从中我们很容易理解,Spring Cloud Stream有三个角色,分别是,消息发布者,消费者和消息通讯系统。

主要是以消息通讯系统为中心,如下图:

SpringCloud进阶:Spring Cloud Stream核心组件

一 、 Spring Cloud Stream核心组件

1、1 Binder

​ Binder 是Spring Cloud Steram的一个重要的抽象,目前Spring Cloud Stream实现了面向Kafka和RabbitMQ的Binder。有了Binder有很方便的连接中间件了。Binder提供了消费者分组和消息分区的特性。

1、2 Channel

 即通道,是队列Queue的一种抽象,在具体的消息通讯系统中,队列作用就是实现存储和转发的媒介,我们通过Channel对队列进行配置。

1、3 Source和Sink

 我们不是第一次看到Source和Sink了,简单的可理解为输入和输出。注意:这里的参照对象是Spring Cloud Stream自身,从Stream发布消息就是输出,接受消息就是输入。

 在Spring Cloud Stream中,Source组件是使用一个POJO对象发布消息的,该对象需要序列化然后发布到Channel中,Sink反序列化POJO对象。在底层的处理机制上,需要借助Spring Integration这个企业服务总线的组件。

二 Spring Integration简介

 Spring Integration的定位是一种企业服务总线 ESB(Enterprise Service Bus),在Spring Integration中,通道被抽象成两种表现形式:PollableChannel和SubscribableChannel,都是继承了MessageChannel:

MessageChannel

package org.springframework.messaging;

public interface MessageChannel {

	long INDEFINITE_TIMEOUT = -1;

	default boolean send(Message<?> message) {
		return send(message, INDEFINITE_TIMEOUT);
	}

	boolean send(Message<?> message, long timeout);

}

PollableChannel:通过轮询操作主动获取消息,receive方法

public interface PollableChannel extends MessageChannel {
	
	Message<?> receive();

	Message<?> receive(long timeout);
	
	}

SubscribableChannel 发布/订阅模式 ,通过回调函数MessageHandler实现事件响应:

public interface SubscribableChannel extends MessageChannel {

	boolean subscribe(MessageHandler handler);

	boolean unsubscribe(MessageHandler handler);

}

三 、Spring Cloud Stream和Spring Integration

 结合了Integration,我们就更容易理解Source和Sink了,Source和Sink都是接口,Source定义如下,通过MessageChannel发送消息,@Output定义了一个输出通道,消息通过该通道离开应用:

public interface Source {

	String OUTPUT = "output";

	@Output(Source.OUTPUT)
	MessageChannel output();

}

 同样Sink接口,通过SubscribableChannel实现消息的接受,@Input注解定义了一个输入通道,接受来自外部的消息:

public interface Sink {
	String INPUT = "input";
	@Input(Sink.INPUT)
	SubscribableChannel input();

}

@Input和@Output可以使用通道名称作为参数,没有名称,默认是方法名称,也就是"input"、“output”,一个SpringCloudStream应用程序,可以存在任意数量的Input和Output通道。

总结

 本章着重的介绍了Spring Cloud Stream的核心组件,以及Spring Integration的简介,这样有利于更好的理解Spring Cloud Stream。

以就是本期的分享,你还可以关注公众号: 程序员笑笑生,关注更多精彩内容!
SpringCloud进阶:Spring Cloud Stream核心组件

SpringCloud进阶:Spring Cloud Stream核心组件

SpringCloud基础教程(一)-微服务与SpringCloud

SpringCloud基础教程(二)-服务发现 Eureka

SpringCloud基础教程(三)-Eureka进阶

SpringCloud 基础教程(四)-配置中心入门

SpringCloud基础教程(五)-配置中心热生效和高可用

SpringCloud 基础教程(六)-负载均衡Ribbon

SpringCloud 基础教程(七)-Feign声明式服务调用

SpringCloud 基础教程(八)-Hystrix熔断器(上)

SpringCloud 基础教程(九)-Hystrix服务监控(下)

SpringCloud 基础教程(十)-Zull服务网关

SpringCloud 基础教程(十一)- Sleuth 调用链追踪简介

SpringCloud 基础教程(十二)-Zipkin 分布式链路追踪系统搭建

SpringCloud 进阶: 消息驱动(入门) Spring Cloud Stream【Greenwich.SR3】

更多精彩内容,请期待…

本文由博客一文多发平台 OpenWrite 发布!