欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

6-2消息队列基础概念简单案例集成

程序员文章站 2024-01-05 10:28:04
...

文章目录

1 消息队列的基础概念

producerGroup:生产者组
producer:生产者
consumerGroup:消费者组
consumer:消费者
nameServer(邮局):协调者,broker注册信息,发送者和接收者通过它获取broker信息
broker(邮递员):负责消息的接收、存储、显示
topic(地区):消息主题类型,发送接收都需要创建topic
messageQueue(邮件):消息队列,一个topic可以有多个messageQueue
message(邮件内容):具体消息

2 测试类实现消息发送和接收

在order服务做简单演示

2.1 增加jar依赖

        <!--添加rocketmq的依赖-->
        <dependency>
            <groupId>org.apache.rockemq</groupId>
            <artifactId>roketmq-spring-boot-starter</artifactId>
            <version>2.0.2</version>
        </dependency>

2.2 在/src/test/java创建消息发送类RocketMQSendTest,启动后控制台message验证结果

package cn.hzp;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

/**
 * - 1 创建消息生产者DefaultMQProducer,设置组名produceGroup
 * - 2 为生产者指定nameserver地址192.168.149.11:9876
 * - 3 启动生产者start
 * - 4 创建消息对像Message:主题myTopic、标签myTag、消息体String.getBytes();
 * - 5 发送消息send
 * - 6 关闭生产者shutdown
 */
public class RocketMQSendTest {
    public static void main(String[] args) {
        try {
            DefaultMQProducer produceGroup = new DefaultMQProducer("produceGroup");
            produceGroup.setNamesrvAddr("192.168.149.11:9876");
            produceGroup.start();
            Message message = new Message("myTopic", "myTag", "消息体".getBytes());
            SendResult sendResult = produceGroup.send(message, 10000);
            System.out.println(sendResult);
            produceGroup.shutdown();
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
    }
}

2.3 在/src/test/java创建消息接收类RocketMQReceiveTest,启动看输出,再启动发送类看输出

package cn.hzp;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * - 1 创建消费者DefaultMQPushConsumer,组名consumerGroup
 * - 2 指定nameserver地址192.168.149.11:9876
 * - 3 指定消费者订阅subscribe的主题myTopic和标签*
 * - 4 设置回调函数registerMessageListener,通过MessageListenerConcurrently对象编写处理消息方法,返回消息状态ConsumeConcurrentlyStatus.CONSUME_SUCCESS
 * - 5 启动消费者start
 */
public class RocketMQReceiveTest {
    public static void main(String[] args) {
        try {
            DefaultMQPushConsumer consumerGroup = new DefaultMQPushConsumer("consumerGroup");
            consumerGroup.setNamesrvAddr("192.168.149.11:9876");
            consumerGroup.subscribe("myTopic", "*");
            consumerGroup.registerMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {
                System.out.println("消息列表为:" + list);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            });
            consumerGroup.start();
            System.out.println("消费者启动");
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
    }
}

3 简单业务模拟

order服务下单成功发出消息,由product服务监听到消息后,发送短信通知

3.1 order产生消息端代码

3.1.1 order服务pom文件引入2个mq依赖

rocketmq-spring-boot-starter2.0.2和rocketmq-client4.4.0

        <!--添加rocketmq的依赖-->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.0.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.4.0</version>
        </dependency>

3.1.2 order服务的yml文件中添加配置

rocketmq:
  # rocketmq服务地址
  name-server: 192.168.149.11:9876
  # 生产者组
  producer:
    group: shop-order

3.1.3 下单方法中,发送mq消息rocketMQTemplate.convertAndSend("orderTopic", order);

3.1.4 启动服务,浏览器访问http://localhost:8091/order/product/1后,在mq控制台查看主题为orderTopic的消息

3.2 producer消费消息端代码

3.2.1 product服务pom文件引入2个mq依赖,同3.1.1

3.2.2 product服务yml文件增加配置

rocketmq:
  # rocketmq服务地址
  name-server: 192.168.149.11:9876

3.2.3 product服务增加接收类cn.hzp.mq.SmsService

package cn.hzp.mq;

import cn.hzp.domain.Order;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

/**
 * - 需要实现RocketMQListener<消息内容>,消息内容为Order
 * 	- 监听注解@RocketMQMessageListener,
 * 		- 指定消费者组名consumerGroup
 * 		- 消费主题topic
 * 		- consumeMode消费模式,ORDERLY顺序,CONCURRENTLY默认同步即没顺序
 * 		- messageModel消息模式,
 * 			广播BRODCASTING,一个消息被多个消费者多次消费
 * 			默认集群CLUSTERING,一个消息只能被一个消费者消费
 */
@Slf4j
@Service
@RocketMQMessageListener(
        consumerGroup = "productGroup",
        topic = "orderTopic",
        consumeMode = ConsumeMode.CONCURRENTLY,
        messageModel = MessageModel.CLUSTERING
)
public class SmsService implements RocketMQListener<Order> {
    @Override
    public void onMessage(Order order) {
        log.info("接收消息:{},接下来发送短信", order);
    }
}

3.2.4 重启product服务,浏览器请求http://localhost:8091/order/product/1,查看idea的信息输出

4 mq消息类型演示

普通消息(异步、同步、单向)、顺序消息、事务消息

4.1 测试普通消息

包括可靠同步,可靠异步,单向发送

  • 可靠同步:发送方等接收方收到消息确认的通知后才发第二条消息,如报名短信通知
  • 可靠异步:发送方只管发消息通过回调接口异步处理接收方的响应结果,用于链路耗时较长如视频上传后消费者转码
  • 单向发送:只发送,如日志收集

4.1.1 order服务的pom文件引入测试依赖

        <!--测试依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
        </dependency>

4.1.2 在src/test/java下,新建测试类cn.hzp.RocketMQMessageTypeTest

package cn.hzp;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest(classes = OrderApplication.class)
@Slf4j
public class RocketMQMessageTypeTest {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    @Test
    public void ordinaryMessage() throws Exception {
        // 同步消息,第一个参数为topic:tag,tag可以为空,直接写topic;
        SendResult sendResult = rocketMQTemplate.syncSend("typeTopicSync:tag1", "这是同步消息", 10000);
        log.info("同步消息发送结果为:{}", sendResult);

        // 异步消息
        rocketMQTemplate.asyncSend("typeTopicAync", "这是异步消息", new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("消息发送成功,{}", sendResult);
            }

            @Override
            public void onException(Throwable throwable) {
                log.info("消息发送异常,{}", throwable.getMessage());
            }
        });
        log.info("异步消息准备要发送了");
        Thread.sleep(30000);

        // 单向消息
        rocketMQTemplate.sendOneWay("typeTopicOneWay", "这是单向消息");
    }
}

4.1.3 启动测试,查看mq网页控制台消息信息

4.2 顺序消息,在类RocketMQMessageTypeTest中增加测试方法

会将同一个topic的一堆消息发到一个messageQueue,发送方法增加orderly后缀即可,如单向顺序消息

    @Test
    public void orderMessage() {
        // 单向顺序消息,hashkey为分配到哪个队列的key
        for (int i = 0; i < 10; i++) {
            rocketMQTemplate.sendOneWayOrderly("typeTopicOneWay", "这是单向消息", "xxx");
        }
    }

测试,在mq网页控制台的topic标签下的状态查看队列的最大位点信息

4.3 事务消息

执行流程:

  • 1 发送者(order订单服务)发送半事务消息(订单信息order)到mq服务端,mq服务端回应半事务消息发送成功
  • 2 发送者(order订单服务)执行本地事务(下单操作),执行结果告诉mq服务端,mq服务端根据结果对消息commit或rollback
  • 3 如果mq服务端没有收到执行结果,就回查本地事务状态,mq收到commit才投送消息出去

4.3.1 order服务发送半事务消息,在类OrderController,增加测试方法

    /**
     * 测试mq的事务消息
     * - 1 向mq服务端发送半事务消息,mq服务端回应消息接收情况
     * - 2 执行本地事务下单操作,结果通知mq服务端,成功commit失败rollback
     * - 3 mq服务端没有收到通知,回查本地事务,成功commit失败rollback
     */
    @RequestMapping("/order/testMQ")
    public TransactionSendResult testRocketMQ(){
        Order order = Order.builder()
                .pid(1)
                .uid(1).uname("测试用户")
                .number(1)
                .build();
        // 演示,半事务消息,这里发送半事务消息,可以放在单独service操作
        UUID uuid = UUID.randomUUID();
        TransactionSendResult transactionSendResult = rocketMQTemplate.sendMessageInTransaction(
                "txProducerGroup",
                "topicTransaction:tagOrder",
                MessageBuilder.withPayload(order).setHeader("txId", uuid).build(),
                order);
        log.info("发送事务消息,结果为{}", transactionSendResult);
        return transactionSendResult;
    }

4.3.2 domain服务增加本地事务日志txLog,便于mq服务端回查本地下单操作事务执行结果

@Data
@Entity(name = "shop_txlog")
public class TxLog {
    @Id
    private String txId;
    private Date date;
}

4.3.3 order服务在目录src/main/java下增加事务日志持久层cn.hzp.dao.TxLogDao

public interface TxLogDao extends JpaRepository<TxLog, String> {
}

4.3.4 在orderService和impl中增加下单操作时保存事务日志,便于mq服务端回查

    @Transactional(rollbackFor = Exception.class)
    @Override
    public void saveMQTest(String txId, Order order) {
        TxLog txLog = new TxLog();
        txLog.setTxId(txId);
        txLog.setDate(new Date());
        TxLog txLogResult = txLogDao.save(txLog);
        log.info("保存日志信息,便于mq回查事务结果,{}", txLogResult);
        Order orderResult = orderDao.save(order);
        log.info("保存订单信息,{}", orderResult);
    }

4.3.5 在src/main/java目录下新建类cn.hzp.mq.MQTranListener,实现本地事务(下单)和消息回查

package cn.hzp.mq;

import cn.hzp.dao.TxLogDao;
import cn.hzp.domain.Order;
import cn.hzp.domain.TxLog;
import cn.hzp.service.OrderService;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@RocketMQTransactionListener(txProducerGroup = "txProducerGroup")
public class MQTranListener implements RocketMQLocalTransactionListener {
    @Autowired
    private OrderService orderService;
    @Autowired
    private TxLogDao txLogDao;

    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) {
        try {
            log.info("执行本地事务:进行下单操作");
            String txId = (String) message.getHeaders().get("txId");
            Order order = (Order) arg;
            orderService.saveMQTest(txId, order);
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        log.info("mq服务端执行消息回查");
        String txId = (String) message.getHeaders().get("txId");
        TxLog txLog = txLogDao.findById(txId).get();
        if (txLog != null) {
            log.info("回查结果,下单成功");
            return RocketMQLocalTransactionState.COMMIT;
        } else {
            log.info("回查结果,下单失败");
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }
}

4.3.6 启动order服务,

  • 浏览器请求http://localhost:8091/order/testMQ测试本地事务
  • 在executeLocalTransaction方法的return前打断点,然后停止服务重启,测试回查

上一篇:

下一篇: