rocketmq 示例 顺序消息
程序员文章站
2022-07-14 23:42:23
...
rocketmq 顺序消息
RocketMQ 支持顺序消息,具体实现方式为:发送端通过队列选择器(MessageQueueSelector)将同一类型的消息发送到同一个消息队列,消费端使用顺序监听接口(MessageListenerOrderly)消费消息,即可实现顺序消费。注意:顺序性只在同一个消息队列内得到保证。
*********************************************************
使用示例
****************************************
producer 端
/**
 * Spring service that owns a RocketMQ {@code DefaultMQProducer} and sends a
 * batch of demo messages that are all routed to the same message queue.
 *
 * <p>The producer is created and started in {@link #initDefaultMQProducer()}
 * and shut down in {@link #destroy()}.
 */
@Service
public class ProducerService {

    /** Topic the demo messages are published to. */
    private static final String TOPIC = "topic-2";

    /** Tag attached to every demo message. */
    private static final String TAG = "order";

    /** Number of messages sent by one call to {@link #sendInOrder()}. */
    private static final int MESSAGE_COUNT = 100;

    /** Selector argument; every message uses the same key so all land in one queue. */
    private static final int QUEUE_KEY = 1;

    @Value("${rocketmq.producerGroup}")
    private String producerGroup;

    @Value("${rocketmq.namesrv}")
    private String namesrv;

    private DefaultMQProducer producer;

    /**
     * Creates the producer from the injected group and name-server address,
     * sets the send-retry count to 2 and starts it.
     *
     * <p>A start failure is printed and not rethrown, so the bean is still
     * created; later sends will then report their own failure.
     */
    @PostConstruct
    public void initDefaultMQProducer() {
        producer = new DefaultMQProducer(producerGroup);
        producer.setNamesrvAddr(namesrv);
        producer.setRetryTimesWhenSendFailed(2);
        try {
            producer.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Sends {@code MESSAGE_COUNT} messages one after another, selecting the
     * target queue from {@code QUEUE_KEY} so that all of them go to the same
     * queue. Each send result is printed.
     *
     * <p>The first failing send stops the loop; the exception is printed and
     * not rethrown. If the thread is interrupted while sending, the interrupt
     * flag is restored before returning.
     */
    public void sendInOrder() {
        try {
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                // Encode with an explicit charset: the payload contains non-ASCII text and
                // the platform default charset may differ between producer and consumer hosts.
                byte[] body = ("瓜田李下 顺序消息 " + i)
                        .getBytes(java.nio.charset.StandardCharsets.UTF_8);
                Message message = new Message(TOPIC, TAG, body);
                SendResult result = producer.send(message, (queues, msg, arg) -> {
                    int key = Integer.parseInt(arg.toString());
                    // Wrap the key into the available range so a topic with fewer
                    // queues than key + 1 does not fail with IndexOutOfBoundsException.
                    return queues.get(Math.floorMod(key, queues.size()));
                }, QUEUE_KEY);
                System.out.println(result);
            }
        } catch (InterruptedException e) {
            // Preserve the interruption for whoever owns this thread.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /** Shuts the producer down when the bean is destroyed, if it was created. */
    @PreDestroy
    public void destroy() {
        if (producer != null) {
            producer.shutdown();
        }
    }
}
************************************
consumer 端
/**
 * Spring service that starts a RocketMQ push consumer with an orderly
 * message listener on topic {@code "topic-2"}, tag {@code "order"}, and
 * prints every message body it receives.
 */
@Service
public class ConsumerService {

    // NOTE(review): this injected value is never read; the consumer below is created
    // with the literal group name "consumerGroup2". Confirm which one is intended.
    @Value("${rocketmq.consumerGroup}")
    private String consumerGroup;

    @Value("${rocketmq.namesrv}")
    private String namesrv;

    /**
     * Builds the consumer, subscribes it, registers the orderly listener and
     * starts it.
     *
     * <p>The consumer is started only if the preceding setup steps succeeded.
     * Any failure is printed and not rethrown. If the thread is interrupted
     * during the initial pause, the interrupt flag is restored and the
     * consumer is not started.
     *
     * <p>NOTE(review): the consumer is a local variable and is never shut
     * down by this class — consider holding it in a field and stopping it on
     * bean destruction.
     */
    @PostConstruct
    public void consumeOrder() {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup" + 2);
        consumer.setInstanceName("consumerGroup2");
        consumer.setNamesrvAddr(namesrv);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        try {
            // NOTE(review): purpose of this fixed 2 s pause is not visible here — confirm it is needed.
            Thread.sleep(2000);
            consumer.subscribe("topic-2", "order");
            consumer.registerMessageListener((MessageListenerOrderly) (messages, context) -> {
                try {
                    for (MessageExt messageExt : messages) {
                        // Decode with an explicit charset instead of the platform default,
                        // which may differ from the host that produced the message.
                        String body = new String(messageExt.getBody(),
                                java.nio.charset.StandardCharsets.UTF_8);
                        System.out.println(body);
                        System.out.println("消费成功");
                    }
                    return ConsumeOrderlyStatus.SUCCESS;
                } catch (Exception e) {
                    e.printStackTrace();
                    System.out.println("消费失败");
                    // Ask the client to pause this queue briefly and redeliver.
                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                }
            });
            // Start inside the same try so a failed setup never leads to starting
            // a consumer that has no subscription or listener.
            consumer.start();
        } catch (InterruptedException e) {
            // Preserve the interruption for whoever owns this thread.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
*************************************************
控制台输出
上一篇: RocketMQ 顺序消息