AMQ中池化连接工厂(PooledConnectionFactory)的用法和机制 ActiveMQconnectionPool
程序员文章站
2022-07-03 13:23:46
...
>>代码示例如下:
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import com.cnd.mq.AMQSession;
import com.cnd.mq.command.AMQQueue;
import com.cnd.mq.jms.pool.PooledSession;
import com.cnd.mq.pool.PooledConnection;
import com.cnd.mq.pool.PooledConnectionFactory;
public class PooledConnectionFactoryTest {
public PooledConnectionFactory getPooledConnectionFactory(String paramUrl) {
PooledConnectionFactory pcf = new PooledConnectionFactory(paramUrl);
pcf.setMaxConnections(3);
pcf.setMaximumActiveSessionPerConnection(5);
pcf.setIdleTimeout(30);
return pcf;
}
public static void main(String[] args) {
PooledConnectionFactoryTest test = new PooledConnectionFactoryTest();
String url = "tcp://cnd:61616";
String queueName = "queue_cnd";
// 此处以后可以通过单例的方法来调用
PooledConnectionFactory pcf = test.getPooledConnectionFactory(url);
for (int i = 0; i < 20; i++) {
try {
// 从连接池中获取PooledConnection和PooledSession的方式
// 1.直接调用池化连接工厂中的创建连接方法==>获取到的就是池化的连接
PooledConnection connection = (PooledConnection) pcf
.createConnection();
// 2.从池化连接中获取到才是池化的会话;
PooledSession session = (PooledSession) connection
.createSession(false, AMQSession.AUTO_ACKNOWLEDGE);
// 3.通过池化的会话创建目标和消费者
AMQQueue destination = (AMQQueue) session
.createQueue(queueName);
MessageConsumer consumer = session.createConsumer(destination);
boolean isListenMode = true;
if (isListenMode) {
consumer.receiveNoWait();
System.out.println("The " + (i + 1) + " is "
+ Thread.currentThread().getName());
// 诠释:通过sleep后(慢放效果)可以看到,从池化的连接工厂中获取连接时,
// 池化连接工厂提供连接的机制是,当来一个请求要创建会话时,先看是否连接已经最大,如果不是,则建一个连接并创建一个会话,
// 否则,开始复用已经开到最大值的连接,顺序是从前面的连接开始(即:先把连接开到最大值(建立tcp),然后再开始按顺序复用连接)
// 因为每个连接可以设置会话的大小,此测试设置Connection最大值为3,每个Connection可开
// 会话数的最大值为5,最终,可以同时支持3*5个线程来创建消费者,即创建消费者的数量是15。
// conn1,sess1=>conn2,sess2=>conn3,sess3=>conn1,sess4=>conn2,sess5=>conn3,sess6,......
try {
Thread.sleep(1500);
} catch (InterruptedException ie) {
ie.printStackTrace();
}
session.close();
} else {
// 如果是onMessage方式,会话是一直被占用着的,没有条件回收;
consumer.getMessageListener();
System.out.println("The " + (i + 1) + " is "
+ Thread.currentThread().getName());
try {
Thread.sleep(1500);
} catch (InterruptedException ie) {
ie.printStackTrace();
}
}
} catch (JMSException jmse) {
jmse.printStackTrace();
}
}
}
}
>>诠释如下:
>>连接池回收机制:
>>从连接池取连接时对连接上的引用数和最近使用时间的更新处理:
>>维护连接池的专属线程方法:
org.apache.commons.pool.impl.GenericKeyedObjectPool.evict()==>
以下方法:
1.activateObject()
2.validateObject()
3.destroyObject()
===>
在激活当前连接时,会判断当前连接是否为要删除的,通过方法validateObject()先判断当前是否无效,如果无效,则标记当前连接为要删除的;
如果当前连接为要删除的,则调用销毁方法destroyObject。
【温馨提示】
如果您觉得满意,可以选择支持下,您的支持是我最大的动力:
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import com.cnd.mq.AMQSession;
import com.cnd.mq.command.AMQQueue;
import com.cnd.mq.jms.pool.PooledSession;
import com.cnd.mq.pool.PooledConnection;
import com.cnd.mq.pool.PooledConnectionFactory;
public class PooledConnectionFactoryTest {
public PooledConnectionFactory getPooledConnectionFactory(String paramUrl) {
PooledConnectionFactory pcf = new PooledConnectionFactory(paramUrl);
pcf.setMaxConnections(3);
pcf.setMaximumActiveSessionPerConnection(5);
pcf.setIdleTimeout(30);
return pcf;
}
public static void main(String[] args) {
PooledConnectionFactoryTest test = new PooledConnectionFactoryTest();
String url = "tcp://cnd:61616";
String queueName = "queue_cnd";
// 此处以后可以通过单例的方法来调用
PooledConnectionFactory pcf = test.getPooledConnectionFactory(url);
for (int i = 0; i < 20; i++) {
try {
// 从连接池中获取PooledConnection和PooledSession的方式
// 1.直接调用池化连接工厂中的创建连接方法==>获取到的就是池化的连接
PooledConnection connection = (PooledConnection) pcf
.createConnection();
// 2.从池化连接中获取到才是池化的会话;
PooledSession session = (PooledSession) connection
.createSession(false, AMQSession.AUTO_ACKNOWLEDGE);
// 3.通过池化的会话创建目标和消费者
AMQQueue destination = (AMQQueue) session
.createQueue(queueName);
MessageConsumer consumer = session.createConsumer(destination);
boolean isListenMode = true;
if (isListenMode) {
consumer.receiveNoWait();
System.out.println("The " + (i + 1) + " is "
+ Thread.currentThread().getName());
// 诠释:通过sleep后(慢放效果)可以看到,从池化的连接工厂中获取连接时,
// 池化连接工厂提供连接的机制是,当来一个请求要创建会话时,先看是否连接已经最大,如果不是,则建一个连接并创建一个会话,
// 否则,开始复用已经开到最大值的连接,顺序是从前面的连接开始(即:先把连接开到最大值(建立tcp),然后再开始按顺序复用连接)
// 因为每个连接可以设置会话的大小,此测试设置Connection最大值为3,每个Connection可开
// 会话数的最大值为5,最终,可以同时支持3*5个线程来创建消费者,即创建消费者的数量是15。
// conn1,sess1=>conn2,sess2=>conn3,sess3=>conn1,sess4=>conn2,sess5=>conn3,sess6,......
try {
Thread.sleep(1500);
} catch (InterruptedException ie) {
ie.printStackTrace();
}
session.close();
} else {
// 如果是onMessage方式,会话是一直被占用着的,没有条件回收;
consumer.getMessageListener();
System.out.println("The " + (i + 1) + " is "
+ Thread.currentThread().getName());
try {
Thread.sleep(1500);
} catch (InterruptedException ie) {
ie.printStackTrace();
}
}
} catch (JMSException jmse) {
jmse.printStackTrace();
}
}
}
}
>>诠释如下:
>>连接池回收机制:
>>从连接池取连接时对连接上的引用数和最近使用时间的更新处理:
>>维护连接池的专属线程方法:
org.apache.commons.pool.impl.GenericKeyedObjectPool.evict()==>
以下方法:
1.activateObject()
2.validateObject()
3.destroyObject()
===>
在激活当前连接时,会判断当前连接是否为要删除的,通过方法validateObject()先判断当前是否无效,如果无效,则标记当前连接为要删除的;
如果当前连接为要删除的,则调用销毁方法destroyObject。
【温馨提示】
如果您觉得满意,可以选择支持下,您的支持是我最大的动力:
上一篇: Resque
下一篇: 《软件框架设计的艺术》读书笔记