欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

AMQ中池化连接工厂(PooledConnectionFactory)的用法和机制 ActiveMQconnectionPool 

程序员文章站 2022-07-03 13:23:46
...
>>代码示例如下:

import javax.jms.JMSException;
import javax.jms.MessageConsumer;

import com.cnd.mq.AMQSession;
import com.cnd.mq.command.AMQQueue;
import com.cnd.mq.jms.pool.PooledSession;
import com.cnd.mq.pool.PooledConnection;
import com.cnd.mq.pool.PooledConnectionFactory;

public class PooledConnectionFactoryTest {

public PooledConnectionFactory getPooledConnectionFactory(String paramUrl) {
  PooledConnectionFactory pcf = new PooledConnectionFactory(paramUrl);
  pcf.setMaxConnections(3);
  pcf.setMaximumActiveSessionPerConnection(5);
  pcf.setIdleTimeout(30);
  return pcf;

}

public static void main(String[] args) {

  PooledConnectionFactoryTest test = new PooledConnectionFactoryTest();
  String url = "tcp://cnd:61616";
  String queueName = "queue_cnd";
  // 此处以后可以通过单例的方法来调用
  PooledConnectionFactory pcf = test.getPooledConnectionFactory(url);
  for (int i = 0; i < 20; i++) {
   try {
    // 从连接池中获取PooledConnection和PooledSession的方式
    // 1.直接调用池化连接工厂中的创建连接方法==>获取到的就是池化的连接
    PooledConnection connection = (PooledConnection) pcf
      .createConnection();
    // 2.从池化连接中获取到才是池化的会话;
    PooledSession session = (PooledSession) connection
      .createSession(false, AMQSession.AUTO_ACKNOWLEDGE);
    // 3.通过池化的会话创建目标和消费者
    AMQQueue destination = (AMQQueue) session
      .createQueue(queueName);
    MessageConsumer consumer = session.createConsumer(destination);
    boolean isListenMode = true;
    if (isListenMode) {
     consumer.receiveNoWait();
     System.out.println("The " + (i + 1) + " is "
       + Thread.currentThread().getName());
     // 诠释:通过sleep后(慢放效果)可以看到,从池化的连接工厂中获取连接时,
     // 池化连接工厂提供连接的机制是,当来一个请求要创建会话时,先看是否连接已经最大,如果不是,则建一个连接并创建一个会话,
     // 否则,开始复用已经开到最大值的连接,顺序是从前面的连接开始(即:先把连接开到最大值(建立tcp),然后再开始按顺序复用连接)
     // 因为每个连接可以设置会话的大小,此测试设置Connection最大值为3,每个Connection可开
     // 会话数的最大值为5,最终,可以同时支持3*5个线程来创建消费者,即创建消费者的数量是15。
     // conn1,sess1=>conn2,sess2=>conn3,sess3=>conn1,sess4=>conn2,sess5=>conn3,sess6,......
     try {
      Thread.sleep(1500);
     } catch (InterruptedException ie) {
      ie.printStackTrace();
     }
     session.close();
    } else {
     // 如果是onMessage方式,会话是一直被占用着的,没有条件回收;
     consumer.getMessageListener();
     System.out.println("The " + (i + 1) + " is "
       + Thread.currentThread().getName());
     try {
      Thread.sleep(1500);
     } catch (InterruptedException ie) {
      ie.printStackTrace();
     }
    }
   } catch (JMSException jmse) {
    jmse.printStackTrace();
   }
  }

}

}

>>诠释如下:

>>连接池回收机制:

>>从连接池取连接时对连接上的引用数和最近使用时间的更新处理:

>>维护连接池的专属线程方法:

org.apache.commons.pool.impl.GenericKeyedObjectPool.evict()==>
以下方法:
1.activateObject()
2.validateObject()
3.destroyObject()
===>
在激活当前连接时,会判断当前连接是否为要删除的,通过方法validateObject()先判断当前是否无效,如果无效,则标记当前连接为要删除的;
如果当前连接为要删除的,则调用销毁方法destroyObject。

【温馨提示】
如果您觉得满意,可以选择支持下,您的支持是我最大的动力:

AMQ中池化连接工厂(PooledConnectionFactory)的用法和机制
            
    
    
        ActiveMQconnectionPool