高级特性和大厂常考面试题
1.引入消息队列之后该如何保证其高可用性
答:事物,签收,持久化和zookeeper+replicated-leveldb-store的主从集群都是高可用性的体现
2.异步投递Async Sends
对于一个Slow (慢)Consumer(消费者),使用同步发送消息可能出现Producer堵塞等情况,慢消费者适合使用异步发送。
异步发送如何确认发送成功?
达:正确的异步发送方法是需要接收回调函数的。
package com.wsy.activemq;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageProducer;
import org.apache.activemq.AsyncCallback;
import javax.jms.*;
import java.util.UUID;
public class JmsProducer_Async {
public static final String ACTIVEMQ_URL = "tcp://192.168.0.123:61616";
public static final String QUEUE_NAME = "queue_asyncSend";
public static void main(String[] args) throws JMSException {
// 创建连接工厂,按照给定的url地址采用默认的用户名和密码
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
// 设置消息发送模式是AsyncSend模式
activeMQConnectionFactory.setUseAsyncSend(true);
// 通过连接工厂,获取Connection并启动
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
// 创建Session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建目的地(目的地有两个子接口,分别是Queue和Topic)
Queue queue = session.createQueue(QUEUE_NAME);
// 创建消息生产者,生产的消息放到queue中
ActiveMQMessageProducer activeMQMessageProducer = (ActiveMQMessageProducer) session.createProducer(queue);
for (int i = 0; i < 3; i++) {
TextMessage textMessage = session.createTextMessage("message-" + i);
textMessage.setJMSMessageID(UUID.randomUUID().toString());
String messageID = textMessage.getJMSMessageID();
// 设置回调函数,通过回调函数判断是否发送成功,成功走onSuccess()方法,失败走onException()方法
activeMQMessageProducer.send(textMessage, new AsyncCallback() {
@Override
public void onSuccess() {
System.out.println(messageID + " success!");
}
@Override
public void onException(JMSException e) {
System.out.println(messageID + " fail!");
}
});
}
// 按照资源打开的相反顺序关闭资源
activeMQMessageProducer.close();
session.close();
connection.close();
}
}
同步和异步的区别就在这里,同步发送send不阻塞就一定发送成功,异步发送需要接收回调函数并由客户端再判断一次是否发送成功
3.延迟投递和定时投递
要在activemq.xml中配置schedulerSupport属性为true
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" schedulerSupport="true">
java代码里面封装的辅助消息类型:ScheduledMessage
package queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ScheduledMessage;
import javax.jms.*;
public class JmsProduce_DelayAndSchedule {
public static final String ACTIVEMQ_URL="tcp://192.168.56.10:61616";
//public static final String ACTIVEMQ_URL="tcp://localhost:61616";
public static final String QUEUE_NAME="delay-01";
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory factory=new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(QUEUE_NAME);
MessageProducer messageProducer = session.createProducer(queue);
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);//开启化持久,队列弄人持久化
long delay=3 * 1000;
long period=4 * 1000;
int repeat=5;
for (int i=1;i<=3;i++){
TextMessage textMessage = session.createTextMessage("msg" + i);
textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY,delay);
textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD,period);
textMessage.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT,repeat);
messageProducer.send(textMessage);
}
//7、关闭
messageProducer.close();
session.close();
connection.close();
System.out.println("**********************消息生产到MQ********************");
}
}
5.ActiveMQ消费重试机制
具体哪些情况会引起消息重发?
Client用了事务且在session中调用了rollback()
Client用了事务且在调用commit()前关闭或者没有commit()
Client在CLIENT_ACKNOWLEDGE的传递模式下,在session中调用了recover()
消息重发的间隔和重发次数是多少?
消息重发的间隔默认1秒,6次
对有毒消息Poison ACK的理解:
一个消息被redelivedred超过默认的最大重发次数(默认6次)时,消费端会给MQ发送一个poison ack表示这条消息有毒,告诉broker不要再发了。这时候broker会把这条消息放到死信队列中。
6.死信队列
ActiveMQ中引入了死信队列的概念,一条消息被重复发了多次(默认重发6次redeliveryCounter=6),这条消息将会被ActiveMQ加入死信队列。开发人员可以在死信队列中查看出错的消息,进行人工干预。
通常生产环境中,使用MQ会设计两个队列,一个是业务队列,一个是死信队列。
SharedDealLetterStrategy:共享死信队列策略
所有的DeadLetter保存在一个共享的队列中,这是ActiveMQ broker端的默认策略。默认为ActiveMQ.DLQ,可以通过deadLetterQueue属性来设置。
<dealLetterStrategy>
<sharedDeadLetterStrategy deadLetterQueue="MyDeadLetterQueue">
</dealLetterStrategy>
IndividualDeadLetterStrategy:独特死信队列策略
对于Queue而言,死信通道前缀默认为“ActiveMQ.DLQ.Queue”。
对于Topic而言,死信通道的前缀默认为“ActiveMQ.DLQ.Topic”。
我们使用默认前缀+队列或者主题的名称来指定死信队列。比如:ActiveMQ.DLQ.Queue.MyQueue和ActiveMQ.DLQ.Topic.MyTopic。
默认情况下,无论是Topic还是Queue,broker默认都会使用Queue来保存DeadLetter,即死信队列通常为Queue,开发者也可以指定为Topic。
<policyEntry queue="order">
<deadLetterStrategy>
<individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true"/>
</deadLetterStrategy>
</policyEntry>
如果需要直接删除过期的消息,而不需要发送到死信队列,那么设置processExpired参数为false,这个参数的默认值是true的。
默认情况下,ActiveMQ不会把非持久的死信存放到死信队列,如果希望存放非持久的死信,那么需要修改processNonPersistent参数为true,这个参数默认值是fasle。
7.如何保证消息不被重复消费呢?幂等性问题请你谈谈?
防止重复调用。
网络延迟传输中,会进行MQ的重试,可能会出现重复消费。
可以使用第三方服务来做消费记录,以redis为例,给消息分配一个全局id,只要消息消费过,将<id,message>写入redis,当消费者开始消费之前,先去redis里查看有没有消费记录。
如果有消费记录,那么这条消息已经被处理过,否则,执行消费逻辑,并将<id,message>存到redis里。
上一篇: Spring常问的面试题
下一篇: 【面试题笔记-Java】计算机网络
推荐阅读