client与broker断开后重新连接
程序员文章站
2022-07-13 17:00:48
...
qpid的client端有可能会由于某种原因与broker断开连接,如网络连接断开,broker的节点删除等等。
公司的项目需要用java编写一个Service,用来监听broker的消息。要求如果与broker断开后,必须尝试重新连接。
那应该完成以下一个步骤:
(1)检测到client与服务器断开。
(2)如果断开,尝试重新连接。
对于(1),javax.jms.Connection对象可以设置一个ExceptionListener对象,用来监听是否发生连接异常。
以下为client端的类
如果检测到异常,就关闭客户端。启动Service时启动一个线程,监视连接是否正常,如果被关闭,则重新连接。
公司的项目需要用java编写一个Service,用来监听broker的消息。要求如果与broker断开后,必须尝试重新连接。
那应该完成以下一个步骤:
(1)检测到client与服务器断开。
(2)如果断开,尝试重新连接。
对于(1),javax.jms.Connection对象可以设置一个ExceptionListener对象,用来监听是否发生连接异常。
以下为client端的类
/** * 该类负责监听MQ的消息。消息的消费者为{@link GCMSender}的一个实例。 * * @author xiaofei.xu */ public class BrokerClient { /** Logger */ private static final Logger logger = LoggerFactory.getLogger(BrokerClient.class); /** broker连接 */ private AMQConnection connection; ...... /** 连接异常处理 */ private ExceptionListener exceptionListener; /** * 进行初始化 * * @throws AMQException * @throws URLSyntaxException * @throws JMSException */ private void init() throws URLSyntaxException, AMQException, JMSException { logger.info("# init() start"); this.connection = new AMQConnection(this.url); this.connection.setExceptionListener(this.exceptionListener); logger.info("# init() end"); } /** * 设置连接异常监听 * * @param exceptionListener * 异常监听对象 */ public void setExceptionListener(final ExceptionListener exceptionListener) { this.exceptionListener = exceptionListener; } ...... /** * 判定客户端是否关闭 * * @return 关闭:true,否则:false */ public boolean isStop() { return this.connection.isClosed(); } /** * 关闭服务 * * @throws JMSException * @throws NamingException */ public void stop() throws JMSException { logger.info("# stop() start"); if (connection != null) { connection.close(); } logger.info("# stop() end"); } }
如果检测到异常,就关闭客户端。启动Service时启动一个线程,监视连接是否正常,如果被关闭,则重新连接。
public class GCMService { /** broker客户端名 */ private static final String CN_USER_MANAGER = "USER_MANAGER"; /** 互斥量 */ private Object lock = new Object(); /** 客户端列表 */ private List<BrokerClient> brokerClients; ...... /** * 增加broker监听客户端 */ private void addBrokerClient() { logger.info("# addBrokerClient() start"); ...... // 连接异常处理 userManagerBrokerClient.setExceptionListener(new ExceptionListener() { @Override public void onException(JMSException ex) { try { synchronized (lock) { // 同步锁 for (BrokerClient client : brokerClients) { if (StringUtils.equals(client.getName(), CN_USER_MANAGER)) { // 关闭客户端 client.stop(); } } } } catch (JMSException e) { e.printStackTrace(); } } }); brokerClients.add(userManagerBrokerClient); ...... logger.info("# addBrokerClient() end"); } /** * 启动GCM服务 */ public void start() { ...... // 创建一个线程,监听连接是否正常 new Thread(new Runnable() { @Override public void run() { while (true) { // 连接检查间隔时间 try { Thread.sleep(config.getConnectionCheckInterval()); } catch (InterruptedException e) { e.printStackTrace(); } synchronized (lock) { // 同步锁 for (BrokerClient client : brokerClients) { if (client.isStop()) { // 如果连接被关闭,则重新建立连接 logger.info(MessageFormat.format("The {0} client was stoped.", new Object[] { client.getName() })); boolean isSucceed = false; while (!isSucceed) { try { logger.info("Six seconds later, try to reconnect."); // 间隔6秒 Thread.sleep(6000); // 开始 client.start(); isSucceed = true; logger.info("Succeed to reconnect."); } catch (Exception ex) { logger.error("Failed to reconnect.", ex); } } } } } } }}, "connection_check").start(); logger.info("Gcm service starts successfully."); } catch (Exception e) { logger.error("Gcm service failed to start.", e); } logger.info("# start() end"); } /** * 服务主入口。 * * @param args */ public static void main(String[] args) { GCMService gcm = GCMService.getInstance(); gcm.start(); } }