欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Spring + RocketMQ入门 博客分类: RocketMQspring springRocketMQ

程序员文章站 2024-02-13 12:00:34
...

RocketMQ简单介绍

RocketMQ 是阿里出品的一款MQ,现在已经捐给Apache并成为Apache*项目,更多介绍请 移步
在这里向大家介绍一个学习RocketMQ的好文章:
RocketMQ实战(一)
RocketMQ实战(二)
RocketMQ实战(三)
RocketMQ实战(四)

一些说明

  • 本文给出的代码均为代码片段,并非完整代码
  • 阅读本文前,您需要具备以下知识:
    • 了解RocketMQ是什么并安装它
    • 了解JUnit并会简单使用
    • 了解Spring并会简单使用
    • 了解 maven 并会简单使用

引入相关库

maven 引入:

    <!-- junit5 -->
    <dependency>
        <groupId>org.junit.jupiter</groupId>
        <artifactId>junit-jupiter-api</artifactId>
        <version>${junit5.version}</version>
        <scope>test</scope>
    </dependency>
    
    <dependency>
        <groupId>org.junit.platform</groupId>
        <artifactId>junit-platform-runner</artifactId>
        <version>${junit5-platform.version}</version>
        <scope>test</scope>
    </dependency>

    <dependency>
        <groupId>org.junit.platform</groupId>
        <artifactId>junit-platform-console-standalone</artifactId>
        <version>${junit5-platform.version}</version>
        <scope>test</scope>
    </dependency>
    
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-test</artifactId>
        <version>${spring.version}</version>
    </dependency>
    
    
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context</artifactId>
        <version>${spring.version}</version>
    </dependency>
    
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context-support</artifactId>
        <version>${spring.version}</version>
    </dependency>
    
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>${rocketmq-client.version}</version>
    </dependency>

相关库版本:

<junit5.version>5.1.0</junit5.version>
<junit5-platform.version>1.1.0</junit5-platform.version>
<rocketmq-client.version>4.2.0</rocketmq-client.version>
<spring.version>5.0.4.RELEASE</spring.version>

创建spring配制文件

创建2个spring配制文件,一个用于生产者,一个用于消费者,相关参数含义就不再介绍

applicationContext-producer.xml 生产者
<bean id="rocketmqProduct" class="org.apache.rocketmq.client.producer.DefaultMQProducer" init-method="start" destroy-method="shutdown">
    <property name="producerGroup" value="producer1"/>
    <property name="namesrvAddr" value="127.0.0.1:9876"/>
</bean>
applicationContext-consumer.xml 消费者
<bean id="consumerImplTest" class="org.klw.test.rocketMqTest.spring.ConsumerTestImpl" />

<bean id="rocketmqConsumer" class="org.apache.rocketmq.client.consumer.DefaultMQPushConsumer" init-method="start" destroy-method="shutdown">
    <property name="consumerGroup" value="concurrent_consumer"/>
    <property name="namesrvAddr" value="127.0.0.1:9876"/>
    <property name="messageListener" ref="consumerImplTest"/>
    <property name="subscription">
        <map>
            <entry key="TopicTest">
                <value>*</value>
            </entry>
        </map>
    </property>
</bean>

大家应该注意到了消费者的配制中有一个org.klw.test.rocketMqTest.spring.ConsumerTestImpl,现在我们去实现它:

public class ConsumerImplTest implements MessageListenerConcurrently {
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
        //返回消费状态
        //CONSUME_SUCCESS 消费成功
        //RECONSUME_LATER 消费失败,需要稍后重新消费
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}

到这里,一个简单的生产者就配制好了,一个简单的消费者也配制并实现了,下面来使用它们
接下来我们要创建2个JUnit测试类,一个是生产者一个是消费者
生产者:

@RunWith(JUnitPlatform.class)  // org.junit.platform.runner.JUnitPlatform
@ExtendWith(SpringExtension.class)  // org.springframework.test.context.junit.jupiter.SpringExtension
@ContextConfiguration({"classpath*:applicationContext-producer.xml"})
public class JUnitProducer {

    @Autowired
    private DefaultMQProducer producer;
    
    @Test
    public void producerData() throws InterruptedException {
    for (int i = 0; i < 10; i++) {  // 发10条消息
        try {
        Message msg = new Message("TopicTest", // topic
            "TagA", // tag
            ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)// body
        );

        // 调用producer的send()方法发送消息
        // 这里调用的是同步的方式,所以会有返回结果
        SendResult sendResult = producer.send(msg);

        // 打印返回结果,可以看到消息发送的状态以及一些相关信息
        System.out.println(sendResult);
        } catch (Exception e) {
        e.printStackTrace();
        Thread.sleep(1000);
        }
    }
    }
    
}

消费者:

@RunWith(JUnitPlatform.class)
@ExtendWith(SpringExtension.class)
@ContextConfiguration({"classpath*:applicationContext-consumer.xml"})
public class JUnitConsumer {

    @Test
    public void runConsumer() {
    System.out.println("Consumer Started.");
        
        // 下面的代码把线程阻塞住,这样就可以先运行消费者再运行生产者.当然不要也可以,不要的化就得先运行生产者,
      //再运行消费者,生产者先把消息发送到MQ上,消费者启动后从MQ上拿消息
    synchronized (JUnitConsumer.class) {
        while (true) {
        try {
            JUnitConsumer.class.wait();
        } catch (Throwable e) {
            e.printStackTrace();
        }
        }
    }
    }
    
}

代码差不多了,通过JUnit先运行消费者,再运行生产者,在消费者的控制台中能看到生产者发送的消息已经打印出来

写在结束

本文只是简单的描述了如何在spring中配制RocketMQ
作者也是初次接触RocketMQ,正在学习中,欢迎大家一起讨论学习

相关标签: spring RocketMQ