RabbitMQ使用场景练习:消息确认机制(十一)
程序员文章站
2022-07-13 15:11:34
...
- 消息确认机制
1、从实验来看,消息的确认机制只是确认publisher发送消息到broker,由broker进行应答,不能确认消息是否有效消费。
2、而为了确认消息是否被发送给queue,应该在发送消息中启用参数mandatory=true,使用ReturnListener接收未被发送成功的消息。
3、接下来就需要确认消息是否被有效消费。publisher端目前并没有提供监听事件,但提供了应答机制来保证消息被成功消费,应答方式:
basicAck:成功消费,消息从队列中删除
basicNack:requeue=true,消息重新进入队列,false被删除
basicReject:等同于basicNack
basicRecover:消息重入队列,requeue=true,发送给新的consumer,false发送给相同的consumer
- 应答模式之transaction机制
package com.demo.mq.rabbitmq.example11; import java.io.IOException; import java.io.Serializable; import org.apache.commons.lang3.SerializationUtils; import com.demo.mq.rabbitmq.MqManager; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; /** * 应答模式之transaction机制 * @author sheungxin * */ public class TxDemo { private static String exchange_name=""; private static String queue_name="tx_queue"; /** * transaction机制发送消息,事务机制:手动提交和回滚 * 执行txCommit,消息才会转发给队列进入ready状态 * 执行txRollback,消息被取消 * @param mes * @throws Exception */ public static void txSend(Serializable mes) throws Exception{ Connection conn=MqManager.newConnection(); Channel channel=conn.createChannel(); //开启transaction机制 channel.txSelect(); channel.queueDeclare(queue_name,false,false,true,null); for(int i=0;i<10;i++){ try{ channel.basicPublish(exchange_name, queue_name, null, SerializationUtils.serialize(mes.toString()+i)); //do something // int n=5/0;//试验消息回滚 channel.txCommit();//提交消息 System.out.println("发布消息"+mes.toString()+i); }catch(Exception e){ channel.txRollback();//异常,取消消息 System.out.println("回滚消息"+mes.toString()+i); } } } /** * transaction机制接收消息,事务机制:手动提交和回滚 * 消费者需要执行basicAck,并txCommit(自动应答模式自动处理,本例中采用手动应答模式) * @throws Exception */ public static void txRecv() throws Exception{ Connection conn=MqManager.newConnection(); Channel channel=conn.createChannel(); //开启transaction机制 channel.txSelect(); channel.queueDeclare(queue_name,false,false,true,null); //关闭自动应答模式(自动应答模式不需要ack、txCommit),需要手动basicAck,并执行txCommit channel.basicConsume(queue_name, false, new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException{ String mes=SerializationUtils.deserialize(body); System.out.println("tx Received :'"+mes+"' done"); channel.basicAck(envelope.getDeliveryTag(), false); channel.txCommit(); } }); } public static void main(String[] args) throws Exception { txSend("hello world!"); txRecv(); } }
- 应答模式之confirm机制
2、confirmSelect,进入confirm消息确认模式,确认方式:1、异步ConfirmListener;2、同步waitForConfirms
3、ConfirmListener、waitForConfirms均需要配合confirm机制使用
4、暂时未弄明白confirm机制在consumer的应用,ConfirmListener在consumer中无效
5、basicNack、basicReject:参数requeue=true时,消息会重新进入队列
6、autoDelete队列在消费者关闭后不管是否还有未处理的消息都会关闭掉
package com.demo.mq.rabbitmq.example11; import java.io.IOException; import java.io.Serializable; import org.apache.commons.lang3.SerializationUtils; import com.demo.mq.rabbitmq.MqManager; import com.rabbitmq.client.Channel; import com.rabbitmq.client.ConfirmListener; import com.rabbitmq.client.Connection; /** * 应答模式之confirm机制:消息发送 * @author sheungxin * */ public class ConfirmSend { private static String exchange_name=""; private static String queue_name="tx_queue"; /** * confirm机制:确认publisher发送消息到broker,由broker进行应答(不能确认是否被有效消费) * confirmSelect,进入confirm消息确认模式,确认方式:1、异步ConfirmListener;2、同步waitForConfirms * ConfirmListener、waitForConfirms均需要配合confirm机制使用 * @param mes * @throws Exception */ public static void txSend(Serializable mes) throws Exception{ Connection conn=MqManager.newConnection(); Channel channel=conn.createChannel(); //开启transaction机制 channel.confirmSelect(); channel.queueDeclare(queue_name,false,false,true,null); //异步实现发送消息的确认(此部分的消息确认是指发送消息到队列,并非确认消息的有效消费) channel.addConfirmListener(new ConfirmListener() { @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { //multiple:测试发现multiple随机true或false,原因未知 System.out.println("Nack deliveryTag:"+deliveryTag+",multiple:"+multiple); } @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { System.out.println("Ack deliveryTag:"+deliveryTag+",multiple:"+multiple); } }); for(int i=0;i<10;i++){ channel.basicPublish(exchange_name, queue_name, null, SerializationUtils.serialize(mes.toString()+i)); } // channel.waitForConfirms();//同步实现发送消息的确认 System.out.println("-----------"); channel.close(); conn.close(); } public static void main(String[] args) throws Exception { txSend("hello world!"); } }
package com.demo.mq.rabbitmq.example11; import java.io.IOException; import org.apache.commons.lang3.SerializationUtils; import com.demo.mq.rabbitmq.MqManager; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; /** * 应答模式之confirm机制:消息接收 * @author sheungxin * */ public class ConfirmRecv { private static String queue_name="tx_queue"; /** * confirm机制:暂时未弄明白confirm机制在consumer的应用,ConfirmListener在consumer中无效 * basicNack、basicReject:参数requeue=true时,消息会重新进入队列 * autoDelete队列在消费者关闭后不管是否还有未处理的消息都会关闭掉 * @throws Exception */ public static void txRecv() throws Exception{ Connection conn=MqManager.newConnection(); Channel channel=conn.createChannel(); //开启transaction机制 // channel.confirmSelect(); //autoDelete,true只要被消息 channel.queueDeclare(queue_name,false,false,true,null); //关闭自动应答模式 channel.basicConsume(queue_name, false, new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException{ String mes=SerializationUtils.deserialize(body); //multiple批量提交,true提交小于参数中tag消息 long n=envelope.getDeliveryTag()%3; if(n==0){ channel.basicAck(envelope.getDeliveryTag(), false); }else if(n==1){ //requeue,true重新进入队列 channel.basicNack(envelope.getDeliveryTag(), false, true); }else{ //requeue,true重新进入队列,与basicNack差异缺少multiple参数 channel.basicReject(envelope.getDeliveryTag(), true); } try { Thread.sleep(2*1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println((n==0?"Ack":n==1?"Nack":"Reject")+" mes :'"+mes+"' done"); } }); } public static void main(String[] args) throws Exception { txRecv(); } }
上一篇: RabbitMQ源码分析 – 实体初始化