【记录】springcloud微服务搭建(三)消息中间件springCloudStream
程序员文章站
2022-05-18 08:01:58
...
一 概要
微服务的消息中间件,是基于现成队列工具的组件。比起手动使用队列,中间件有官方提供的注解、官方提供的消息驱动架构。消息总线也是基于中间件的。用途就是利用队列处理数据,可以解耦、削峰。
二 搭建
这里使用rabbitmq
1 启动rabbitmq(我使用5672为监听端口)
2 涉及的微服务增加依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
增加配置
#队列配置,生产者消费者都要
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
#生产者不配接受,否则报错。消费者需要配接受,发送随意
#spring.cloud.stream.bindings.cus_single_mes_input.destination=channel_1
#spring.cloud.stream.bindings.cus_single_mes_input.binder=rabbit_1
#spring.cloud.stream.bindings.cus_single_mes_input.group=group_1
spring.cloud.stream.bindings.cus_single_mes_out.destination=channel_1
spring.cloud.stream.bindings.cus_single_mes_out.binder=rabbit_1
spring.cloud.stream.bindings.cus_single_mes_out.group=group_1
#绑定的组件为rabbitmq
spring.cloud.stream.binders.rabbit_1.type=rabbit
其中rabbit_1为自定义的别名,必须保持bindings和binder使用的一致;
destination我理解为频道,区分不同用途的队列;
binder为绑定的别名;
group若有多个消费者一致,则会轮流取数,互相不重复;若不一致,则分别都把所有数据取出来。
3 生产者消费者建立通道接口,入口增加注解,注册接口
package com.cus.cus.stream;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.stereotype.Component;
/**
* 消息通道接口
*/
@Component
public interface CusMes {
String inputName = "cus_single_mes_input";
String outputName = "cus_single_mes_out";
@Input(inputName)
SubscribableChannel cusInput();
@Output(outputName)
MessageChannel cusOutput();
}
@EnableBinding(value={CusMes.class})
其中名字与配置保持一致
4 生产者编写发送方法
@Autowired
CusMes c;
public void send1(String m){
Message<String> s = MessageBuilder.withPayload(m).build();
System.out.println("c1 out : "+m);
c.cusOutput().send(s);
}
5 消费者编写接受方法
package com.product.product.stream;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
@Component
public class GetMes {
/*接受inputname的消息,通过outputname发送消息到下一个接受者*/
@StreamListener(CusMes.inputName)
// @SendTo(CusMes.outputName)
public void getMes(String m){
System.out.println("p1 get mes :"+m);
}
}
其中@StreamListener注解表示此方法监听的队列,@SendTo表示此方法处理完后转发给下一个接受者。
若只在本方法处理,不再发送,则不需要sendto,且不能有返回值;若有sendto,则需要有返回值。