Spring Boot 整合 RocketMQ
程序员文章站
2022-05-24 15:45:37
...
引入依赖
<dependencies>
    <!-- RocketMQ client; declared once (the original listed this dependency twice) -->
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>4.3.2</version>
    </dependency>
    <!-- Spring Boot core starter -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <!-- Spring Boot test support, test scope only -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <!-- RocketMQ Spring Boot integration (RocketMQTemplate, listener annotations) -->
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.0.0</version>
    </dependency>
</dependencies>
<build>
<plugins>
<!-- Java compiler plugin: compiles with source/target level 1.8 and UTF-8 source encoding -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
</plugins>
</build>
编写application.properties文件
# Name of this Spring Boot application
spring.application.name = itcast-rocketmq
# Address (host:port) of the RocketMQ name server
spring.rocketmq.nameServer=192.168.142.128:9876
# Producer group used when sending messages
spring.rocketmq.producer.group=my-group
编写启动类
package cn.itcast.rocketmq.spring;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
 * Entry point of the Spring Boot application used by the RocketMQ examples.
 */
@SpringBootApplication
public class MyApplication {

    /**
     * Boots the Spring context with this class as the primary configuration source.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        final SpringApplication application = new SpringApplication(MyApplication.class);
        application.run(args);
    }
}
编写producer
package cn.itcast.rocketmq.spring;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
 * Sends plain (non-transactional) messages through {@link RocketMQTemplate}.
 */
@Component
public class SpringProducer {

    // Injection stays optional so the context still starts without a template;
    // sendMsg reports the missing bean explicitly instead of failing with a bare NPE.
    @Autowired(required = false)
    private RocketMQTemplate rocketMQTemplate;

    /**
     * Converts the payload and sends it to the given topic.
     *
     * @param topic destination topic
     * @param msg   message payload
     * @throws IllegalStateException if no {@link RocketMQTemplate} bean was injected
     */
    public void sendMsg(String topic, String msg) {
        if (this.rocketMQTemplate == null) {
            throw new IllegalStateException(
                    "RocketMQTemplate is not available; check the RocketMQ name server and producer group configuration");
        }
        this.rocketMQTemplate.convertAndSend(topic, msg);
    }
}
编写consumer
package cn.itcast.rocketmq.spring;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
/**
 * Listener subscribed to topic "spring-my-topic" in consumer group
 * "spring-consumer-group"; prints every received payload to standard output.
 */
@Component
@RocketMQMessageListener(
        consumerGroup = "spring-consumer-group",
        topic = "spring-my-topic",
        consumeMode = ConsumeMode.CONCURRENTLY,
        selectorExpression = "*"
)
public class SpringConsumer implements RocketMQListener<String> {

    /**
     * Invoked whenever a message arrives.
     *
     * @param msg the message payload
     */
    @Override
    public void onMessage(String msg) {
        final String line = "接收到消息 -->" + msg;
        System.out.println(line);
    }
}
测试类(第二个方法在后面的事务使用到)
package cn.itcast.rocketmq.spring;
import cn.itcast.rocketmq.spring.transaction.SpringTransactionProducer;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
/**
 * Integration tests that send messages through the example producers.
 * Both tests need a reachable RocketMQ name server and broker.
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class TestSpringRocketMQ {

    /**
     * How long {@link #testSendMsg2()} keeps the JVM alive so the broker can call back
     * to check the transaction state. Five minutes.
     * NOTE(review): assumed to comfortably exceed the broker's transaction check
     * interval; raise it if the broker is configured with a longer interval.
     */
    private static final long CHECK_BACK_WAIT_MILLIS = 5 * 60 * 1000L;

    @Autowired
    private SpringProducer springProducer;

    @Autowired
    private SpringTransactionProducer springTransactionProducer;

    /** Sends a plain message to "spring-my-topic". */
    @Test
    public void testSendMsg() {
        String msg = "大家好";
        this.springProducer.sendMsg("spring-my-topic", msg);
        System.out.println("发送成功");
    }

    /**
     * Sends a transactional message and then waits so that the transaction
     * check-back can be observed on the console.
     */
    @Test
    public void testSendMsg2() {
        this.springTransactionProducer.sendMsg("spring-tx-my-topic", "测试回查1111");
        try {
            // Bounded wait: the original slept for roughly three years, so the test never ended.
            Thread.sleep(CHECK_BACK_WAIT_MILLIS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the test runner can see the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }
}
---------------------------------------------------------------------------------
事务
编写producer
package cn.itcast.rocketmq.spring.transaction;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
/**
 * Sends transactional messages through {@link RocketMQTemplate}.
 */
@Component
public class SpringTransactionProducer {

    // Injection stays optional so the context still starts without a template;
    // sendMsg reports the missing bean explicitly instead of failing with a bare NPE.
    @Autowired(required = false)
    private RocketMQTemplate rocketMQTemplate;

    /**
     * Sends a message inside a RocketMQ transaction.
     *
     * @param topic destination topic
     * @param msg   message payload
     * @throws IllegalStateException if no {@link RocketMQTemplate} bean was injected
     */
    public void sendMsg(String topic, String msg) {
        if (this.rocketMQTemplate == null) {
            throw new IllegalStateException(
                    "RocketMQTemplate is not available; check the RocketMQ name server and producer group configuration");
        }
        // Parameterized type instead of the raw Message used before.
        Message<String> message = MessageBuilder.withPayload(msg).build();
        // "myTransactionGroup" must match the group declared on the listener:
        // @RocketMQTransactionListener(txProducerGroup = "myTransactionGroup")
        this.rocketMQTemplate.sendMessageInTransaction("myTransactionGroup", topic, message, null);
        System.out.println("发送消息成功");
    }
}
编写consumer
package cn.itcast.rocketmq.spring.transaction;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
/**
 * Listener subscribed to topic "spring-tx-my-topic" in consumer group
 * "haoke-consumer"; prints every received payload to standard output.
 */
@Component
@RocketMQMessageListener(
        consumerGroup = "haoke-consumer",
        // The topic here must be the same topic the sender publishes to.
        topic = "spring-tx-my-topic",
        selectorExpression = "*"
)
public class SpringTxConsumer implements RocketMQListener<String> {

    /**
     * Invoked whenever a message arrives.
     *
     * @param msg the message payload
     */
    @Override
    public void onMessage(String msg) {
        final String line = "接收到消息 -> " + msg;
        System.out.println(line);
    }
}
编写事务监听器(执行本地事务并处理事务回查)
package cn.itcast.rocketmq.spring.transaction;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.messaging.Message;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
// 这个注解中已经包含了@Component注解,因此 这里是不需要再次添加@Component注解了
/**
 * Local transaction listener for producer group "myTransactionGroup".
 *
 * <p>{@link #executeLocalTransaction} runs the (simulated) business logic and records the
 * outcome per transaction id; {@link #checkLocalTransaction} reads that outcome back when
 * the transaction state is checked.
 */
// Per the original author's note, @RocketMQTransactionListener already includes
// @Component, so no separate @Component annotation is needed here.
@RocketMQTransactionListener(txProducerGroup = "myTransactionGroup")
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {

    // Written by executeLocalTransaction and read by checkLocalTransaction, which may run
    // on different threads, so a concurrent map is used instead of a plain HashMap.
    // Entries are never removed; acceptable for a demo, unbounded in a long-running service.
    private static final Map<String, RocketMQLocalTransactionState> STATE_MAP = new ConcurrentHashMap<>();

    /**
     * Executes the business logic of the local transaction.
     *
     * @param message the message being sent in the transaction
     * @param o       the extra argument supplied by the sender
     * @return {@code UNKNOWN} after successful work (the final state is then supplied by
     *         {@link #checkLocalTransaction}); {@code ROLLBACK} if the work failed
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
        // The transaction id travels in the message headers.
        String transId = (String) message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
        try {
            System.out.println("执行操作1");
            Thread.sleep(500);
            System.out.println("执行操作2");
            Thread.sleep(800);
            // Work succeeded: remember COMMIT for the later check-back.
            recordState(transId, RocketMQLocalTransactionState.COMMIT);
            return RocketMQLocalTransactionState.UNKNOWN;
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of silently swallowing it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (RuntimeException e) {
            e.printStackTrace();
        }
        // Something above failed: remember and report ROLLBACK.
        recordState(transId, RocketMQLocalTransactionState.ROLLBACK);
        return RocketMQLocalTransactionState.ROLLBACK;
    }

    /**
     * Answers a transaction state check.
     *
     * @param message the message whose transaction state is being checked
     * @return the recorded state, or {@code UNKNOWN} when nothing is recorded for the
     *         transaction id (never {@code null})
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        // Look the state up by transaction id.
        String transId = (String) message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
        RocketMQLocalTransactionState state = transId == null ? null : STATE_MAP.get(transId);
        System.out.println("回查消息 -> transId = " + transId + ", state = " + state);
        return state != null ? state : RocketMQLocalTransactionState.UNKNOWN;
    }

    /** Stores the state for the id; a null id is skipped because the map rejects null keys. */
    private static void recordState(String transId, RocketMQLocalTransactionState state) {
        if (transId != null) {
            STATE_MAP.put(transId, state);
        }
    }
}
1. 启动 MyApplication 并保持运行。
2. 运行测试类中的第二个方法 testSendMsg2,并等待 Broker 发起事务回查。
3. 控制台打印出“回查消息 -> ...”,说明事务回查成功。