6-2消息队列基础概念简单案例集成
程序员文章站
2024-01-05 10:28:04
...
文章目录
- 1 消息队列的基础概念
- 2 测试类实现消息发送和接收
- 2.1 增加jar依赖
- 2.2 在/src/test/java创建消息发送类RocketMQSendTest,启动后控制台message验证结果
- 2.3 在/src/test/java创建消息接收类RocketMQReceiveTest,启动看输出,再启动发送类看输出
- 3 简单业务模拟
- 3.1 order产生消息端代码
- 3.1.1 order服务pom文件引入2个mq依赖
- 3.1.2 order服务的yml文件中添加配置
- 3.1.3 下单方法中,发送mq消息`rocketMQTemplate.convertAndSend("orderTopic", order);`
- 3.1.4 启动服务,浏览器访问`http://localhost:8091/order/product/1`后,在mq控制台查看主题为orderTopic的消息
- 3.2 producer消费消息端代码
- 4 mq消息类型演示
1 消息队列的基础概念
producerGroup:生产者组
producer:生产者
consumerGroup:消费者组
consumer:消费者
nameServer(邮局):协调者,broker注册信息,发送者和接收者通过它获取broker信息
broker(邮递员):负责消息的接收、存储、显示
topic(地区):消息主题类型,发送接收都需要创建topic
messageQueue(邮件):消息队列,一个topic可以有多个messageQueue
message(邮件内容):具体消息
2 测试类实现消息发送和接收
在order服务做简单演示
2.1 增加jar依赖
<!--添加rocketmq的依赖-->
<dependency>
<groupId>org.apache.rockemq</groupId>
<artifactId>roketmq-spring-boot-starter</artifactId>
<version>2.0.2</version>
</dependency>
2.2 在/src/test/java创建消息发送类RocketMQSendTest,启动后控制台message验证结果
package cn.hzp;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
/**
* - 1 创建消息生产者DefaultMQProducer,设置组名produceGroup
* - 2 为生产者指定nameserver地址192.168.149.11:9876
* - 3 启动生产者start
* - 4 创建消息对像Message:主题myTopic、标签myTag、消息体String.getBytes();
* - 5 发送消息send
* - 6 关闭生产者shutdown
*/
public class RocketMQSendTest {
public static void main(String[] args) {
try {
DefaultMQProducer produceGroup = new DefaultMQProducer("produceGroup");
produceGroup.setNamesrvAddr("192.168.149.11:9876");
produceGroup.start();
Message message = new Message("myTopic", "myTag", "消息体".getBytes());
SendResult sendResult = produceGroup.send(message, 10000);
System.out.println(sendResult);
produceGroup.shutdown();
} catch (Exception e) {
System.out.println(e.getMessage());
}
}
}
2.3 在/src/test/java创建消息接收类RocketMQReceiveTest,启动看输出,再启动发送类看输出
package cn.hzp;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
/**
* - 1 创建消费者DefaultMQPushConsumer,组名consumerGroup
* - 2 指定nameserver地址192.168.149.11:9876
* - 3 指定消费者订阅subscribe的主题myTopic和标签*
* - 4 设置回调函数registerMessageListener,通过MessageListenerConcurrently对象编写处理消息方法,返回消息状态ConsumeConcurrentlyStatus.CONSUME_SUCCESS
* - 5 启动消费者start
*/
public class RocketMQReceiveTest {
public static void main(String[] args) {
try {
DefaultMQPushConsumer consumerGroup = new DefaultMQPushConsumer("consumerGroup");
consumerGroup.setNamesrvAddr("192.168.149.11:9876");
consumerGroup.subscribe("myTopic", "*");
consumerGroup.registerMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {
System.out.println("消息列表为:" + list);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumerGroup.start();
System.out.println("消费者启动");
} catch (Exception e) {
System.out.println(e.getMessage());
}
}
}
3 简单业务模拟
order服务下单成功发出消息,由product服务监听到消息后,发送短信通知
3.1 order产生消息端代码
3.1.1 order服务pom文件引入2个mq依赖
rocketmq-spring-boot-starter2.0.2和rocketmq-client4.4.0
<!--添加rocketmq的依赖-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.2</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.4.0</version>
</dependency>
3.1.2 order服务的yml文件中添加配置
rocketmq:
# rocketmq服务地址
name-server: 192.168.149.11:9876
# 生产者组
producer:
group: shop-order
3.1.3 下单方法中,发送mq消息rocketMQTemplate.convertAndSend("orderTopic", order);
3.1.4 启动服务,浏览器访问http://localhost:8091/order/product/1
后,在mq控制台查看主题为orderTopic的消息
3.2 producer消费消息端代码
3.2.1 product服务pom文件引入2个mq依赖,同3.1.1
3.2.2 product服务yml文件增加配置
rocketmq:
# rocketmq服务地址
name-server: 192.168.149.11:9876
3.2.3 product服务增加接收类cn.hzp.mq.SmsService
package cn.hzp.mq;
import cn.hzp.domain.Order;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
/**
* - 需要实现RocketMQListener<消息内容>,消息内容为Order
* - 监听注解@RocketMQMessageListener,
* - 指定消费者组名consumerGroup
* - 消费主题topic
* - consumeMode消费模式,ORDERLY顺序,CONCURRENTLY默认同步即没顺序
* - messageModel消息模式,
* 广播BRODCASTING,一个消息被多个消费者多次消费
* 默认集群CLUSTERING,一个消息只能被一个消费者消费
*/
@Slf4j
@Service
@RocketMQMessageListener(
consumerGroup = "productGroup",
topic = "orderTopic",
consumeMode = ConsumeMode.CONCURRENTLY,
messageModel = MessageModel.CLUSTERING
)
public class SmsService implements RocketMQListener<Order> {
@Override
public void onMessage(Order order) {
log.info("接收消息:{},接下来发送短信", order);
}
}
3.2.4 重启product服务,浏览器请求http://localhost:8091/order/product/1
,查看idea的信息输出
4 mq消息类型演示
普通消息(异步、同步、单向)、顺序消息、事务消息
4.1 测试普通消息
包括可靠同步,可靠异步,单向发送
- 可靠同步:发送方等接收方收到消息确认的通知后才发第二条消息,如报名短信通知
- 可靠异步:发送方只管发消息通过回调接口异步处理接收方的响应结果,用于链路耗时较长如视频上传后消费者转码
- 单向发送:只发送,如日志收集
4.1.1 order服务的pom文件引入测试依赖
<!--测试依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
4.1.2 在src/test/java下,新建测试类cn.hzp.RocketMQMessageTypeTest
package cn.hzp;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest(classes = OrderApplication.class)
@Slf4j
public class RocketMQMessageTypeTest {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Test
public void ordinaryMessage() throws Exception {
// 同步消息,第一个参数为topic:tag,tag可以为空,直接写topic;
SendResult sendResult = rocketMQTemplate.syncSend("typeTopicSync:tag1", "这是同步消息", 10000);
log.info("同步消息发送结果为:{}", sendResult);
// 异步消息
rocketMQTemplate.asyncSend("typeTopicAync", "这是异步消息", new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("消息发送成功,{}", sendResult);
}
@Override
public void onException(Throwable throwable) {
log.info("消息发送异常,{}", throwable.getMessage());
}
});
log.info("异步消息准备要发送了");
Thread.sleep(30000);
// 单向消息
rocketMQTemplate.sendOneWay("typeTopicOneWay", "这是单向消息");
}
}
4.1.3 启动测试,查看mq网页控制台消息信息
4.2 顺序消息,在类RocketMQMessageTypeTest中增加测试方法
会将同一个topic的一堆消息发到一个messageQueue,发送方法增加orderly后缀即可,如单向顺序消息
@Test
public void orderMessage() {
// 单向顺序消息,hashkey为分配到哪个队列的key
for (int i = 0; i < 10; i++) {
rocketMQTemplate.sendOneWayOrderly("typeTopicOneWay", "这是单向消息", "xxx");
}
}
测试,在mq网页控制台的topic标签下的状态查看队列的最大位点信息
4.3 事务消息
执行流程:
- 1 发送者(order订单服务)发送半事务消息(订单信息order)到mq服务端,mq服务端回应半事务消息发送成功
- 2 发送者(order订单服务)执行本地事务(下单操作),执行结果告诉mq服务端,mq服务端根据结果对消息commit或rollback
- 3 如果mq服务端没有收到执行结果,就回查本地事务状态,mq收到commit才投送消息出去
4.3.1 order服务发送半事务消息,在类OrderController,增加测试方法
/**
* 测试mq的事务消息
* - 1 向mq服务端发送半事务消息,mq服务端回应消息接收情况
* - 2 执行本地事务下单操作,结果通知mq服务端,成功commit失败rollback
* - 3 mq服务端没有收到通知,回查本地事务,成功commit失败rollback
*/
@RequestMapping("/order/testMQ")
public TransactionSendResult testRocketMQ(){
Order order = Order.builder()
.pid(1)
.uid(1).uname("测试用户")
.number(1)
.build();
// 演示,半事务消息,这里发送半事务消息,可以放在单独service操作
UUID uuid = UUID.randomUUID();
TransactionSendResult transactionSendResult = rocketMQTemplate.sendMessageInTransaction(
"txProducerGroup",
"topicTransaction:tagOrder",
MessageBuilder.withPayload(order).setHeader("txId", uuid).build(),
order);
log.info("发送事务消息,结果为{}", transactionSendResult);
return transactionSendResult;
}
4.3.2 domain服务增加本地事务日志txLog,便于mq服务端回查本地下单操作事务执行结果
@Data
@Entity(name = "shop_txlog")
public class TxLog {
@Id
private String txId;
private Date date;
}
4.3.3 order服务在目录src/main/java下增加事务日志持久层cn.hzp.dao.TxLogDao
public interface TxLogDao extends JpaRepository<TxLog, String> {
}
4.3.4 在orderService和impl中增加下单操作时保存事务日志,便于mq服务端回查
@Transactional(rollbackFor = Exception.class)
@Override
public void saveMQTest(String txId, Order order) {
TxLog txLog = new TxLog();
txLog.setTxId(txId);
txLog.setDate(new Date());
TxLog txLogResult = txLogDao.save(txLog);
log.info("保存日志信息,便于mq回查事务结果,{}", txLogResult);
Order orderResult = orderDao.save(order);
log.info("保存订单信息,{}", orderResult);
}
4.3.5 在src/main/java目录下新建类cn.hzp.mq.MQTranListener,实现本地事务(下单)和消息回查
package cn.hzp.mq;
import cn.hzp.dao.TxLogDao;
import cn.hzp.domain.Order;
import cn.hzp.domain.TxLog;
import cn.hzp.service.OrderService;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@RocketMQTransactionListener(txProducerGroup = "txProducerGroup")
public class MQTranListener implements RocketMQLocalTransactionListener {
@Autowired
private OrderService orderService;
@Autowired
private TxLogDao txLogDao;
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) {
try {
log.info("执行本地事务:进行下单操作");
String txId = (String) message.getHeaders().get("txId");
Order order = (Order) arg;
orderService.saveMQTest(txId, order);
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
return RocketMQLocalTransactionState.ROLLBACK;
}
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
log.info("mq服务端执行消息回查");
String txId = (String) message.getHeaders().get("txId");
TxLog txLog = txLogDao.findById(txId).get();
if (txLog != null) {
log.info("回查结果,下单成功");
return RocketMQLocalTransactionState.COMMIT;
} else {
log.info("回查结果,下单失败");
return RocketMQLocalTransactionState.ROLLBACK;
}
}
}
4.3.6 启动order服务,
- 浏览器请求
http://localhost:8091/order/testMQ
测试本地事务 - 在executeLocalTransaction方法的return前打断点,然后停止服务重启,测试回查
推荐阅读