欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

【记录】springcloud微服务搭建(三)消息中间件springCloudStream

程序员文章站 2022-05-18 08:01:58
...

一 概要

微服务的消息中间件,是基于现成队列工具的组件。比起手动使用队列,中间件有官方提供的注解、官方提供的消息驱动架构。消息总线也是基于中间件的。用途就是利用队列处理数据,可以解耦、削峰。

二 搭建

这里使用rabbitmq

1 启动rabbitmq(我使用5672为监听端口)

2 涉及的微服务增加依赖

<dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>

增加配置

#队列配置,生产者消费者都要
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
#生产者不配接受,否则报错。消费者需要配接受,发送随意
#spring.cloud.stream.bindings.cus_single_mes_input.destination=channel_1
#spring.cloud.stream.bindings.cus_single_mes_input.binder=rabbit_1
#spring.cloud.stream.bindings.cus_single_mes_input.group=group_1
spring.cloud.stream.bindings.cus_single_mes_out.destination=channel_1
spring.cloud.stream.bindings.cus_single_mes_out.binder=rabbit_1
spring.cloud.stream.bindings.cus_single_mes_out.group=group_1
#绑定的组件为rabbitmq
spring.cloud.stream.binders.rabbit_1.type=rabbit

其中rabbit_1为自定义的别名,必须保持bindings和binder使用的一致;

destination我理解为频道,区分不同用途的队列;

binder为绑定的别名;

group若有多个消费者一致,则会轮流取数,互相不重复;若不一致,则分别都把所有数据取出来。

3 生产者消费者建立通道接口,入口增加注解,注册接口

package com.cus.cus.stream;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.stereotype.Component;

/**
 * 消息通道接口
 */
@Component
public interface CusMes {

    String inputName = "cus_single_mes_input";
    String outputName = "cus_single_mes_out";

    @Input(inputName)
    SubscribableChannel cusInput();

    @Output(outputName)
    MessageChannel cusOutput();

}



@EnableBinding(value={CusMes.class})

其中名字与配置保持一致

4 生产者编写发送方法

    @Autowired
    CusMes c;

    public void send1(String m){
        Message<String> s = MessageBuilder.withPayload(m).build();
        System.out.println("c1 out : "+m);
        c.cusOutput().send(s);
    }

5 消费者编写接受方法

package com.product.product.stream;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;

@Component
public class GetMes {

    /*接受inputname的消息,通过outputname发送消息到下一个接受者*/
    @StreamListener(CusMes.inputName)
//    @SendTo(CusMes.outputName)
    public void getMes(String m){
        System.out.println("p1 get mes :"+m);
    }
}

其中@StreamListener注解表示此方法监听的队列,@SendTo表示此方法处理完后转发给下一个接受者。

若只在本方法处理,不再发送,则不需要sendto,且不能有返回值;若有sendto,则需要有返回值。

相关标签: 工作日记