Spring Cloud: Spring Cloud Stream for Message-Driven Microservices
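Spring Cloud Stream is a framework for building message-driven microservices. Built on Spring Boot, it uses Spring Integration to connect to message broker middleware (RabbitMQ, Kafka, and others), provides opinionated auto-configuration, and introduces three core concepts: publish-subscribe, consumer groups, and partitions.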
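The three concepts can be pictured with a toy in-memory model. This is only a sketch in plain Java, not Spring Cloud Stream code: the class GroupSketch, the group names, and the round-robin and hash-modulo rules are assumptions chosen for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model (hypothetical) of publish-subscribe with consumer groups and partitions. */
final class GroupSketch {
    // group name -> inboxes of the instances that joined that group
    private final Map<String, List<List<String>>> groups = new LinkedHashMap<>();
    // group name -> number of messages delivered to that group so far
    private final Map<String, Integer> delivered = new LinkedHashMap<>();

    /** Adds one more instance to the group and returns its inbox. */
    List<String> subscribe(String group) {
        List<String> inbox = new ArrayList<>();
        groups.computeIfAbsent(group, g -> new ArrayList<>()).add(inbox);
        delivered.putIfAbsent(group, 0);
        return inbox;
    }

    /** Every group sees the message, but only one instance per group (round robin). */
    void publish(String message) {
        for (Map.Entry<String, List<List<String>>> entry : groups.entrySet()) {
            List<List<String>> instances = entry.getValue();
            int count = delivered.get(entry.getKey());
            instances.get(count % instances.size()).add(message);
            delivered.put(entry.getKey(), count + 1);
        }
    }

    /** Hash-modulo partition choice: equal keys always land on the same partition. */
    static int partitionFor(Object key, int partitionCount) {
        return Math.floorMod(key.hashCode(), partitionCount);
    }

    public static void main(String[] args) {
        GroupSketch destination = new GroupSketch();
        List<String> audit = destination.subscribe("audit");
        List<String> billingA = destination.subscribe("billing");
        List<String> billingB = destination.subscribe("billing");
        destination.publish("m1");
        destination.publish("m2");
        System.out.println(audit + " " + billingA + " " + billingB);
    }
}
```

In this model the single "audit" instance receives both messages, while the two "billing" instances split them between themselves, which is the behaviour consumer groups are meant to give.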
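An application interacts with the Spring Cloud Stream binder through input and output channels, and the bindings between channels and the broker are set up through configuration. The binder, in turn, is the component that talks to the middleware.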
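The division of labour between application, channel, and binder can be sketched in plain Java. The types ToyBinder, InMemoryBinder, and BinderDemo below are hypothetical and are not part of Spring Cloud Stream; the in-memory map merely stands in for a real broker.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical stand-in for a binder: the only piece that knows about the broker. */
interface ToyBinder {
    Consumer<String> bindOutput(String destination);               // application -> broker
    void bindInput(String destination, Consumer<String> handler);  // broker -> application
}

/** In-memory "broker", used here in place of RabbitMQ or Kafka. */
final class InMemoryBinder implements ToyBinder {
    private final Map<String, List<Consumer<String>>> handlers = new HashMap<>();

    @Override
    public Consumer<String> bindOutput(String destination) {
        // Sending on the output channel forwards the payload to every bound handler.
        return payload -> handlers
                .getOrDefault(destination, new ArrayList<>())
                .forEach(handler -> handler.accept(payload));
    }

    @Override
    public void bindInput(String destination, Consumer<String> handler) {
        handlers.computeIfAbsent(destination, d -> new ArrayList<>()).add(handler);
    }
}

final class BinderDemo {
    public static void main(String[] args) {
        // Only this line would change if a different binder implementation were used.
        ToyBinder binder = new InMemoryBinder();
        binder.bindInput("myinput", msg -> System.out.println("received: " + msg));
        binder.bindOutput("myinput").accept("hello world");
    }
}
```

Because the application code touches only the channel-like objects returned by the binder, the broker behind it can be replaced without changing the sending or receiving logic.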
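Development tool: IntelliJ IDEA 2019.2.3
I. Server
1. Create the project
In IDEA, create a new Spring Boot project named "spring-server", select Spring Boot version 2.1.10, and on the dependencies screen check Spring Cloud Discovery -> Eureka Server.
The complete pom.xml is as follows: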
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.10.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>spring-server</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spring-server</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
        <spring-cloud.version>Greenwich.SR4</spring-cloud.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
2. Edit application.yml
Change the port to 8761, and disable both registering this instance with the Eureka server and fetching the registry from it.
server:
  port: 8761
eureka:
  client:
    register-with-eureka: false
    fetch-registry: false
3. Modify the startup class
Add the @EnableEurekaServer annotation.
package com.example.springserver;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.server.EnableEurekaServer;

@SpringBootApplication
@EnableEurekaServer // turns this application into a Eureka registry
public class SpringServerApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringServerApplication.class, args);
    }
}
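II. Message producer
1. Create the project
In IDEA, create a new Spring Boot project named "spring-producer", select Spring Boot version 2.1.10, and on the dependencies screen check Web -> Spring Web and Spring Cloud Discovery -> Eureka Discovery Client.
Open pom.xml and add the spring-cloud-starter-stream-rabbit dependency, which pulls in spring-cloud-stream and spring-cloud-stream-binder-rabbit automatically.
The complete pom.xml is as follows: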
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.10.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>spring-producer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spring-producer</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
        <spring-cloud.version>Greenwich.SR4</spring-cloud.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-test-support</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
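2. Edit application.yml
Because pom.xml pulls in the RabbitMQ binder, the application connects to port 5672 on localhost by default. The rabbitmq section below can therefore be omitted, since the values shown are the defaults.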
server:
  port: 8081
spring:
  application:
    name: spring-producer
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
eureka:
  instance:
    hostname: localhost
  client:
    serviceUrl:
      defaultZone: http://localhost:8761/eureka/
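3. Write the sending service
The sendOrder method is annotated with @Output("myinput"), which declares a message channel named myinput. Messages sent on the channel returned by this method are delivered to the myinput channel.
If the myinput argument is omitted, the method name is used as the channel name.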
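The naming rule described above (explicit annotation value first, method name as the fallback) can be sketched with plain reflection. The annotation ToyOutput, the two interfaces, and the class ChannelNames are hypothetical names invented for this illustration; they only mimic the rule and are not the framework's own types.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

/** Hypothetical annotation that mimics an output-channel marker with an optional name. */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface ToyOutput {
    String value() default "";
}

interface NamedSender {
    @ToyOutput("myinput")
    Object sendOrder();
}

interface DefaultSender {
    @ToyOutput
    Object sendOrder();
}

final class ChannelNames {
    /** Returns the annotation value if present, otherwise the method name. */
    static String resolve(Class<?> type, String methodName) {
        try {
            Method method = type.getMethod(methodName);
            ToyOutput output = method.getAnnotation(ToyOutput.class);
            if (output == null) {
                throw new IllegalArgumentException(methodName + " is not annotated");
            }
            return output.value().isEmpty() ? method.getName() : output.value();
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("No such method: " + methodName, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(resolve(NamedSender.class, "sendOrder"));
        System.out.println(resolve(DefaultSender.class, "sendOrder"));
    }
}
```

With an explicit value the channel is called myinput; without one, the same method would yield a channel called sendOrder, so producer and consumer would no longer share a name unless the consumer is changed to match.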
package com.example.springproducer;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.SubscribableChannel;

public interface SendService {

    // Declares an output channel named "myinput"
    @Output("myinput")
    SubscribableChannel sendOrder();
}
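4. Modify the startup class
Add the @EnableBinding annotation to turn on binding in the Spring container, passing SendService.class as its argument. When the Spring container starts, it automatically binds the channels declared in the SendService interface.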
package com.example.springproducer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.cloud.stream.annotation.EnableBinding;

@SpringBootApplication
@EnableEurekaClient
@EnableBinding(SendService.class) // bind the channels declared in SendService
public class SpringProducerApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringProducerApplication.class, args);
    }
}
5. Add a controller class
The controller calls the send method of SendService to publish a message to the message broker.
package com.example.springproducer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class ProducerController {

    @Autowired
    SendService sendService;

    @RequestMapping(value = "/send", method = RequestMethod.GET)
    public String sendRequest() {
        // Create the message
        Message<byte[]> msg = MessageBuilder.withPayload("hello world".getBytes()).build();
        // Send the message
        sendService.sendOrder().send(msg);
        return "success";
    }
}
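The controller turns the text into bytes with getBytes(), and the consumer later in this article turns the bytes back into text. Both calls rely on the platform default charset, which can differ between machines. As a hedged sketch, the round trip could be made explicit with UTF-8; the helper class PayloadCodec is a hypothetical name, not something the original projects contain.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper that fixes the charset on both sides of the byte[] payload. */
final class PayloadCodec {

    /** Producer side: text to bytes, always UTF-8. */
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    /** Consumer side: bytes back to text, always UTF-8. */
    static String decode(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] payload = encode("hello world");
        System.out.println(decode(payload));
    }
}
```

For the ASCII payload used here the default charset usually gives the same bytes, so this only starts to matter once messages contain non-ASCII text.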
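III. Message consumer
1. Create the project
In IDEA, create a new Spring Boot project named "spring-consumer", select Spring Boot version 2.1.10, and on the dependencies screen check Web -> Spring Web and Spring Cloud Discovery -> Eureka Discovery Client.
Open pom.xml and add the spring-cloud-starter-stream-rabbit dependency. The complete pom.xml is as follows: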
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.10.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>spring-consumer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spring-consumer</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
        <spring-cloud.version>Greenwich.SR4</spring-cloud.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
2. Edit application.yml
server:
  port: 8080
spring:
  application:
    name: spring-consumer
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
eureka:
  instance:
    hostname: localhost
  client:
    serviceUrl:
      defaultZone: http://localhost:8761/eureka/
3. Write the channel interface for receiving messages
package com.example.springconsumer;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface ReceiveService {

    // Declares an input channel named "myinput", matching the producer's channel name
    @Input("myinput")
    SubscribableChannel myInput();
}
4. Modify the startup class
Bind the message channel here as well.
package com.example.springconsumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;

@SpringBootApplication
@EnableBinding(ReceiveService.class)
public class SpringConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringConsumerApplication.class, args);
    }

    // Subscribe to messages on the myinput channel
    @StreamListener("myinput")
    public void receive(byte[] msg) {
        System.out.println("Received message: " + new String(msg));
    }
}
5. Test
(1) Check in the system services that RabbitMQ is running (it starts by default);
(2) Start spring-server (port 8761);
(3) Start spring-producer (port 8081);
(4) Start spring-consumer (port 8080);
(5) Open http://localhost:8081/send in a browser; the console of the spring-consumer project prints:
Received message: hello world
This shows that the consumer can now receive messages from the message broker.
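IV. Switching the binder
The examples above use RabbitMQ as the message broker. To use Kafka instead, swap the Maven dependency.
In the pom.xml of both the producer and the consumer, replace spring-cloud-starter-stream-rabbit with spring-cloud-starter-stream-kafka.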