欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

rocketmq 示例 顺序消息

程序员文章站 2022-07-14 23:42:23
...

rocketmq 顺序消息

 

rocketmq支持顺序消息,具体实现方式为:发送端将同一类型的消息发送到同一个消息队列,消费端使用顺序监听接口消费消息即可实现顺序消费

 

*********************************************************

使用示例

 

****************************************

producer 端

 

@Service
public class ProducerService {

    @Value("${rocketmq.producerGroup}")
    private String producerGroup;

    @Value("${rocketmq.namesrv}")
    private String namesrv;

    private DefaultMQProducer producer;

    @PostConstruct
    public void initDefaultMQProducer(){
        producer=new DefaultMQProducer(producerGroup);
        producer.setNamesrvAddr(namesrv);
        producer.setRetryTimesWhenSendFailed(2);

        try{
            producer.start();
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    public void sendInOrder(){
        try{
            for(int i=0;i<100;i++){
                Message message=new Message("topic-2","order",("瓜田李下 顺序消息 "+i).getBytes());
                SendResult result=producer.send(message, (list, message1, o) -> {
                    int index=Integer.parseInt(o.toString());

                    return list.get(index);
                },1);
                System.out.println(result);
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    @PreDestroy
    public void destroy(){
        if(producer!=null){
            producer.shutdown();
        }
    }
}

 

************************************

consumer 端

 

@Service
public class ConsumerService {

    @Value("${rocketmq.consumerGroup}")
    private String consumerGroup;

    @Value("${rocketmq.namesrv}")
    private String namesrv;

    @PostConstruct
    public void consumeOrder(){
        DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("consumerGroup"+2);
        consumer.setInstanceName("consumerGroup2");
        consumer.setNamesrvAddr(namesrv);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        try{
            Thread.sleep(2000);
            consumer.subscribe("topic-2","order");
            consumer.registerMessageListener((MessageListenerOrderly) (list, consumeOrderlyContext) -> {
                try{
                   for(MessageExt messageExt:list){
                       String body=new String(messageExt.getBody());
                       System.out.println(body);
                       System.out.println("消费成功");
                   }

                   return ConsumeOrderlyStatus.SUCCESS;
                }catch (Exception e){
                    e.printStackTrace();
                    System.out.println("消费失败");
                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                }
            });
        }catch (Exception e){
            e.printStackTrace();
        }

        try{
            consumer.start();
        }catch (Exception e){
            e.printStackTrace();
        }
    }

}

 

*************************************************

控制台输出

 

         rocketmq 示例 顺序消息