欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

SpringCloud之Spring Cloud Stream:消息驱动

程序员文章站 2023-10-28 22:55:04
Spring Cloud Stream 是一个构建消息驱动微服务的框架,该框架在Spring Boot的基础上整合了Spring Integrationg来连接消息代理中间件(RabbitMQ, Kafka等),提供了个性化的自动化配置实现,并引入了发布-订阅、消费组、分区这三个核心概念。 应用程... ......

spring cloud stream 是一个构建消息驱动微服务的框架,该框架在spring boot的基础上整合了spring integrationg来连接消息代理中间件(rabbitmq, kafka等),提供了个性化的自动化配置实现,并引入了发布-订阅、消费组、分区这三个核心概念。
应用程序通过input通道或者output通道来与spring cloud stream中binder(绑定器)交互,通过配置来binding. 而spring cloud stream的binder负责与中间件交互。

开发工具:intellij idea 2019.2.3

一、服务器端

1、创建项目

idea中创建一个新的springboot项目,名称为“spring-server”,springboot版本选择2.1.10,在选择dependencies(依赖)的界面勾选spring cloud discovery -> eureka server。
pom.xml完整内容如下:

<?xml version="1.0" encoding="utf-8"?>
<project xmlns="http://maven.apache.org/pom/4.0.0" xmlns:xsi="http://www.w3.org/2001/xmlschema-instance"
         xsi:schemalocation="http://maven.apache.org/pom/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelversion>4.0.0</modelversion>
    <parent>
        <groupid>org.springframework.boot</groupid>
        <artifactid>spring-boot-starter-parent</artifactid>
        <version>2.1.10.release</version>
        <relativepath/> <!-- lookup parent from repository -->
    </parent>
    <groupid>com.example</groupid>
    <artifactid>spring-server</artifactid>
    <version>0.0.1-snapshot</version>
    <name>spring-server</name>
    <description>demo project for spring boot</description>

    <properties>
        <java.version>1.8</java.version>
        <spring-cloud.version>greenwich.sr4</spring-cloud.version>
    </properties>

    <dependencies>
        <dependency>
            <groupid>org.springframework.cloud</groupid>
            <artifactid>spring-cloud-starter-netflix-eureka-server</artifactid>
        </dependency>

        <dependency>
            <groupid>org.springframework.boot</groupid>
            <artifactid>spring-boot-starter-test</artifactid>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <dependencymanagement>
        <dependencies>
            <dependency>
                <groupid>org.springframework.cloud</groupid>
                <artifactid>spring-cloud-dependencies</artifactid>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencymanagement>

    <build>
        <plugins>
            <plugin>
                <groupid>org.springframework.boot</groupid>
                <artifactid>spring-boot-maven-plugin</artifactid>
            </plugin>
        </plugins>
    </build>

</project>

2、修改配置application.yml

修改端口号为8761;取消将自己信息注册到eureka服务器,不从eureka服务器抓取注册信息。

server:
  port: 8761
eureka:
  client:
    register-with-eureka: false
    fetch-registry: false

3、修改启动类代码

增加注解@enableeurekaserver

package com.example.springserver;

import org.springframework.boot.springapplication;
import org.springframework.boot.autoconfigure.springbootapplication;
import org.springframework.cloud.netflix.eureka.server.enableeurekaserver;

@springbootapplication
@enableeurekaserver
public class springserverapplication {

    public static void main(string[] args) {
        springapplication.run(springserverapplication.class, args);
    }

}

二、消息生产者

1、创建项目
idea中创建一个新的springboot项目,名称为“spring-producer”,springboot版本选择2.1.10,在选择dependencies(依赖)的界面勾选web -> spring web,spring cloud discovery -> eureka discovery client。
打开pom.xml,添加依赖spring-cloud-starter-stream-rabbit,会自动引入spring-cloud-stream和spring-cloud-stream-binder。
pom.xml完整内容如下:

<?xml version="1.0" encoding="utf-8"?>
<project xmlns="http://maven.apache.org/pom/4.0.0" xmlns:xsi="http://www.w3.org/2001/xmlschema-instance"
         xsi:schemalocation="http://maven.apache.org/pom/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelversion>4.0.0</modelversion>
    <parent>
        <groupid>org.springframework.boot</groupid>
        <artifactid>spring-boot-starter-parent</artifactid>
        <version>2.1.10.release</version>
        <relativepath/> <!-- lookup parent from repository -->
    </parent>
    <groupid>com.example</groupid>
    <artifactid>spring-producer</artifactid>
    <version>0.0.1-snapshot</version>
    <name>spring-producer</name>
    <description>demo project for spring boot</description>

    <properties>
        <java.version>1.8</java.version>
        <spring-cloud.version>greenwich.sr4</spring-cloud.version>
    </properties>

    <dependencies>
        <dependency>
            <groupid>org.springframework.boot</groupid>
            <artifactid>spring-boot-starter-web</artifactid>
        </dependency>
        <dependency>
            <groupid>org.springframework.cloud</groupid>
            <artifactid>spring-cloud-starter-netflix-eureka-client</artifactid>
        </dependency>
        <dependency>
            <groupid>org.springframework.cloud</groupid>
            <artifactid>spring-cloud-starter-stream-rabbit</artifactid>
        </dependency>

        <dependency>
            <groupid>org.springframework.boot</groupid>
            <artifactid>spring-boot-starter-test</artifactid>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupid>org.springframework.cloud</groupid>
            <artifactid>spring-cloud-stream-test-support</artifactid>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <dependencymanagement>
        <dependencies>
            <dependency>
                <groupid>org.springframework.cloud</groupid>
                <artifactid>spring-cloud-dependencies</artifactid>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencymanagement>

    <build>
        <plugins>
            <plugin>
                <groupid>org.springframework.boot</groupid>
                <artifactid>spring-boot-maven-plugin</artifactid>
            </plugin>
        </plugins>
    </build>

</project>

2、修改配置application.yml

pom.xml使用rabbitmq,默认情况下,连接本地的5672端口。下面这段rabbitmq也可省略。

server:
  port: 8081
spring:
  application:
    name: spring-producer
eureka:
  instance:
    hostname: localhost
  client:
    serviceurl:
      defaultzone: http://localhost:8761/eureka/
rabbitmq:
  host: localhost
  post: 5672
  username: guest
  password: guest

3、编写发送服务

方法sendorder使用@output("myinput")注解表示创建myinput的消息通道。调用该方法后,会向myinput通道投递消息。
如果不使用参数myinput,则使用方法名作为通道名称。

package com.example.springproducer;

import org.springframework.cloud.stream.annotation.output;
import org.springframework.messaging.subscribablechannel;

public interface sendservice {
    @output("myinput")
    subscribablechannel sendorder();
}

4、修改启动类代码

加入注解@enablebinding以开启spring容器的绑定功能,以sendservice.class为参数,spring容器启动时,会自动绑定sendservice接口中定义的通道。

package com.example.springproducer;

import org.springframework.boot.springapplication;
import org.springframework.boot.autoconfigure.springbootapplication;
import org.springframework.cloud.netflix.eureka.enableeurekaclient;
import org.springframework.cloud.stream.annotation.enablebinding;

@springbootapplication
@enableeurekaclient
@enablebinding(sendservice.class)
public class springproducerapplication {

    public static void main(string[] args) {
        springapplication.run(springproducerapplication.class, args);
    }

}

5、添加一个控制器类

调用sendservice的发送方法,往服务器发送消息。

package com.example.springproducer;

import org.springframework.beans.factory.annotation.autowired;
import org.springframework.messaging.message;
import org.springframework.messaging.support.messagebuilder;
import org.springframework.web.bind.annotation.requestmapping;
import org.springframework.web.bind.annotation.requestmethod;
import org.springframework.web.bind.annotation.restcontroller;

@restcontroller
public class producercontroller {

    @autowired
    sendservice sendservice;

    @requestmapping(value="/send",method= requestmethod.get)
    public string sendrequest(){
        //创建消息
        message msg = messagebuilder.withpayload("hello world".getbytes()).build();
        //发送消息
        sendservice.sendorder().send(msg);
        return "success";
    }
}

三、消息消费者

1、创建项目

idea中创建一个新的springboot项目,名称为“spring-consumer”,springboot版本选择2.1.10,在选择dependencies(依赖)的界面勾选web -> spring web,spring cloud discovery -> eureka discovery client。
打开pom.xml,添加依赖

<?xml version="1.0" encoding="utf-8"?>
<project xmlns="http://maven.apache.org/pom/4.0.0" xmlns:xsi="http://www.w3.org/2001/xmlschema-instance"
         xsi:schemalocation="http://maven.apache.org/pom/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelversion>4.0.0</modelversion>
    <parent>
        <groupid>org.springframework.boot</groupid>
        <artifactid>spring-boot-starter-parent</artifactid>
        <version>2.1.10.release</version>
        <relativepath/> <!-- lookup parent from repository -->
    </parent>
    <groupid>com.example</groupid>
    <artifactid>spring-consumer</artifactid>
    <version>0.0.1-snapshot</version>
    <name>spring-consumer</name>
    <description>demo project for spring boot</description>

    <properties>
        <java.version>1.8</java.version>
        <spring-cloud.version>greenwich.sr4</spring-cloud.version>
    </properties>

    <dependencies>
        <dependency>
            <groupid>org.springframework.boot</groupid>
            <artifactid>spring-boot-starter-web</artifactid>
        </dependency>
        <dependency>
            <groupid>org.springframework.cloud</groupid>
            <artifactid>spring-cloud-starter-netflix-eureka-client</artifactid>
        </dependency>

        <dependency>
            <groupid>org.springframework.cloud</groupid>
            <artifactid>spring-cloud-starter-stream-rabbit</artifactid>
        </dependency>

        <dependency>
            <groupid>org.springframework.boot</groupid>
            <artifactid>spring-boot-starter-test</artifactid>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <dependencymanagement>
        <dependencies>
            <dependency>
                <groupid>org.springframework.cloud</groupid>
                <artifactid>spring-cloud-dependencies</artifactid>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencymanagement>

    <build>
        <plugins>
            <plugin>
                <groupid>org.springframework.boot</groupid>
                <artifactid>spring-boot-maven-plugin</artifactid>
            </plugin>
        </plugins>
    </build>

</project>

2、修改配置application.yml

server:
  port: 8080
spring:
  application:
    name: spring-consumer
eureka:
  instance:
    hostname: localhost
  client:
    serviceurl:
      defaultzone: http://localhost:8761/eureka/
rabbitmq:
  host: localhost
  post: 5672
  username: guest
  password: guest

3、缩写接受消息的通道接口

package com.example.springconsumer;

import org.springframework.cloud.stream.annotation.input;
import org.springframework.messaging.subscribablechannel;

public interface receiveservice {
    @input("myinput")
    subscribablechannel myinput();
}

4、修改启动类代码

同样绑定消息通道

package com.example.springconsumer;

import org.springframework.boot.springapplication;
import org.springframework.boot.autoconfigure.springbootapplication;
import org.springframework.cloud.stream.annotation.enablebinding;
import org.springframework.cloud.stream.annotation.streamlistener;

@springbootapplication
@enablebinding(receiveservice.class)
public class springconsumerapplication {

    public static void main(string[] args) {
        springapplication.run(springconsumerapplication.class, args);
    }

    //订阅myinput通道的消息
    @streamlistener("myinput")
    public void receive(byte[] msg){
        system.out.println("接收到的消息:" + new string(msg));
    }
}

5、测试

(1)检查服务里面的rabbitmq是否有启动(默认启动);

(2)启动spring-server(8761端口);

(3)启动spring-producer(8081端口);

(4)启动spring-consumer(8080端口);

(5)浏览器访问http://localhost:8081/send,spring-consumer项目的控制台输出:

接收到的消息:hello world

说明消费者已经可以从消息代理中获取到消息。

四、更换绑定器

上面使用了rabbitmq作为消息代理,如果使用kafka,可以更换maven依赖实现。
在生产者和消费者的pom.xml中,将spring-cloud-starter-stream-rabbit修改为spring-cloud-starter-stream-kafka。