欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

client与broker断开后重新连接

程序员文章站 2022-07-13 17:00:48
...
qpid的client端有可能会由于某种原因与broker断开连接,如网络连接断开,broker的节点删除等等。
公司的项目需要用java编写一个Service,用来监听broker的消息。要求如果与broker断开后,必须尝试重新连接。

那应该完成以下一个步骤:
(1)检测到client与服务器断开。
(2)如果断开,尝试重新连接。

对于(1),javax.jms.Connection对象可以设置一个ExceptionListener对象,用来监听是否发生连接异常。

以下为client端的类


/**
 * 该类负责监听MQ的消息。消息的消费者为{@link GCMSender}的一个实例。
 * 
 * @author xiaofei.xu
 */
public class BrokerClient {

	/** Logger */
	private static final Logger logger = LoggerFactory.getLogger(BrokerClient.class);

	/** broker连接 */
	private AMQConnection connection;

	......

	/** 连接异常处理 */
	private ExceptionListener exceptionListener;

	/**
	 * 进行初始化
	 * 
	 * @throws AMQException
	 * @throws URLSyntaxException
	 * @throws JMSException
	 */
	private void init() throws URLSyntaxException, AMQException, JMSException {

		logger.info("# init() start");

		this.connection = new AMQConnection(this.url);
		this.connection.setExceptionListener(this.exceptionListener);

		logger.info("# init() end");
	}

	/**
	 * 设置连接异常监听
	 * 
	 * @param exceptionListener
	 *            异常监听对象
	 */
	public void setExceptionListener(final ExceptionListener exceptionListener) {
		this.exceptionListener = exceptionListener;
	}

	......

	/**
	 * 判定客户端是否关闭
	 * 
	 * @return 关闭:true,否则:false
	 */
	public boolean isStop() {
		return this.connection.isClosed();
	}

	/**
	 * 关闭服务
	 * 
	 * @throws JMSException
	 * @throws NamingException
	 */
	public void stop() throws JMSException {

		logger.info("# stop() start");

		if (connection != null) {
			connection.close();
		}

		logger.info("# stop() end");
	}
}


如果检测到异常,就关闭客户端。启动Service时启动一个线程,监视连接是否正常,如果被关闭,则重新连接。

public class GCMService {

	/** broker客户端名 */
	private static final String CN_USER_MANAGER = "USER_MANAGER";

	/** 互斥量 */
	private Object lock = new Object();

	/** 客户端列表 */
	private List<BrokerClient> brokerClients;

	......

	/**
	 * 增加broker监听客户端
	 */
	private void addBrokerClient() {
		logger.info("# addBrokerClient() start");

		......

		// 连接异常处理
		userManagerBrokerClient.setExceptionListener(new ExceptionListener() {
			@Override
			public void onException(JMSException ex) {
				try {
					synchronized (lock) { // 同步锁
						for (BrokerClient client : brokerClients) {
							if (StringUtils.equals(client.getName(), CN_USER_MANAGER)) {
								// 关闭客户端
								client.stop();
							}
						}
					}
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}
		});
		brokerClients.add(userManagerBrokerClient);

		......

		logger.info("# addBrokerClient() end");
	}

	/**
	 * 启动GCM服务
	 */
	public void start() {

		......

		// 创建一个线程,监听连接是否正常
		new Thread(new Runnable() {

			@Override
			public void run() {

				while (true) {

					// 连接检查间隔时间
					try {
						Thread.sleep(config.getConnectionCheckInterval());
					} catch (InterruptedException e) {
						e.printStackTrace();
					}

					synchronized (lock) {
					// 同步锁

						for (BrokerClient client : brokerClients) {

							if (client.isStop()) {
							// 如果连接被关闭,则重新建立连接

							logger.info(MessageFormat.format("The {0} client was stoped.",
							    new Object[] { client.getName() }));

							boolean isSucceed = false;
							while (!isSucceed) {

								try {

									logger.info("Six seconds later, try to reconnect.");

									// 间隔6秒
									Thread.sleep(6000);

									// 开始
									client.start();
									isSucceed = true;

									logger.info("Succeed to reconnect.");

								} catch (Exception ex) {
									logger.error("Failed to reconnect.", ex);
								}
							}
						}
					}
				}
			}
		}}, "connection_check").start();
			logger.info("Gcm service starts successfully.");
		} catch (Exception e) {
			logger.error("Gcm service failed to start.", e);
		}

		logger.info("# start() end");
	}

	/**
	 * 服务主入口。
	 * 
	 * @param args
	 */
	public static void main(String[] args) {
		GCMService gcm = GCMService.getInstance();
		gcm.start();
	}
}
相关标签: amqp qpid broker