HornetQ HA功能分析
测试方式:
使用我们之前使用的脚本,在hornetq做failover的环境下,施加很大的压力(50个线程),看failover能否成功(看有没有丢数据,主-副机能不能正常的切换过来)
具体的操作方式是:
Hornetq自带的example有HA这块的测试脚本
/hornetq-2.1.2.Final/examples/jms/non-transaction-failover
/hornetq-2.1.2.Final/examples/jms/transaction-failover
执行[bes@test157 transaction-failover]$./build.sh
带broker起来后启动压力测试脚本
需要修改的配置
hornetq-jms.xml
增加对应的destination(为我们压力脚本设置的目的地)
<topic name="topic1"> <entry name="/my/Topic1"/> </topic>
hornetq-configuration.xml
在<configuration>节点下增加
<security-enabled>false</security-enabled>
hornetq-beans.xml和client-jndi.properties中对应的localhost替换为本机的ip
增加两个对应的topic的测试类,在原有的用例中没有topic的测试用例
日志数据的比较:
Hornetq日志数据的比较调用堆栈:
ReplicationCompareDataMessage
-decode:PacketDecoder
--bufferReceived:RemotingConnectionImpl
----bufferReceived:DelegatingBufferHandler
--------bufferReceived:DelegatingBufferHandler
日志比较的关键代码org.hornetq.core.replication.impl.ReplicationManagerImpl
public void compareJournalInformation(
final JournalLoadInformation[] journalInformation) throws
HornetQException
{
if (journalLoadInformation == null ||
journalLoadInformation.length != journalInformation.length)
{
throw new HornetQException(
HornetQException.INTERNAL_ERROR,
"Live Node contains more journals than the backup node. Probably a version match error");
}
for (int i = 0; i < journalInformation.length; i++)
{
if (!journalInformation[i].equals
(journalLoadInformation[i])) {
ReplicationEndpointImpl.log
.warn("Journal comparission mismatch:\n" +
journalParametersToString(journalInformation));
}
}
}
这里面重写了org.hornetq.core.journal.JournalLoadInformation的equals方法,在看看它的equals方法
JournalLoadInformation other = (JournalLoadInformation) obj; if(maxID != other.maxID){ return false; } if(numberOfRecords != other.numberOfRecords){ return false; } return true;
从上面我们可以看出,它比较的是maxID以及numberOfRecords这两个值。我们在看看其中的一个赋值的地方:
public void decodeRest(final HornetQBuffer buffer) { int numberOfJournals = buffer.readInt(); journalInformation = new JournalLoadInformation[numberOfJournals]; for (int i = 0; i < numberOfJournals; i++) { journalInformation[i] = new JournalLoadInformation(); journalInformation[i].setNumberOfRecords(buffer.readInt()); journalInformation[i].setMaxID(buffer.readLong()); } }
就目前的调查来看,在启动HornetQ和创建session会话的时候会调用到日志比较。下图是比较日志调用的路线
compareJournals(ReplicationManagerImpl)
compareJournals(HornetQServerImpl)
initialisePart2(HornetQServerImpl)
checkActivate(HornetQServerImpl)
handleCreateSession(HornetQPacketHandler)
handleReattachSession(HornetQPacketHandler)
start(HornetQServerImpl)
日志复制通道
RepliactionManagerImpl#start方法用于获取一个与备原机器的连接,创建用于日志复制的会话 :
start(ReplicationManagerImpl)
activated(JMSServerMangerImpl)
createJournal(JMSServerMangerImpl)
initJournal(JMSServerMangerImpl)
activated(JMSServerMangerImpl)
callActivatedCallbacks(HornetQServerImpl)
initialisePart2(HornetQServerImpl)
checkActivate(HornetQServerImpl)
start(HornetQServerImpl)
关键代码org.hornetq.core.replication.impl.ReplicationManagerImpl#start
public synchronized void start() throws Exception { // 获取和备原机器的连接 replicatingConnection = failoverManager.getConnection(); long channelID = replicatingConnection.generateChannelID(); Channel mainChannel = replicatingConnection.getChannel(1, -1); replicatingChannel = replicatingConnection.getChannel(channelID, -1); replicatingChannel.setHandler(responseHandler); CreateReplicationSessionMessage replicationStartPackage = new CreateReplicationSessionMessage( channelID); // 发送一个创建拷贝会话的命令(PacketImpl.CREATE_REPLICATION) mainChannel.sendBlocking(replicationStartPackage); }
上面发出的消息HorentQPacketHanler#handlePacket(final Packet packet)会处理
case CREATE_REPLICATION: { //Create queue can also be fielded here in the case of a replicated store and forward queue creation CreateRelicationSessionMessage request = (CreateRelicationSessionMessage)packet; handleCreateReplication(request); break; }
日志的同步
调用ReplicationManagerImpl#sendReplicatePacket来复制日志;这个方法的调用者很多,如消息的发送,结束发送,创建连接工厂,物理目的地等操作
sendReplicatePacket(ReplicationManagerImpl)
appendAddRecord(ReplicatedJournal)
appendAddRecord(ReplicatedJournal)
storeMessage(JournalStroageManager)
processRoute(PostOfficeImpl)
redistribute(PostOfficeImpl)
route(PostOfficeImpl)
routeQueueInfo(PostOfficeImpl)
关键代码org.hornetq.core.replication.impl.ReplicationManagerImpl#sendReplicationPacket
private void sendReplicatePacket(final Pakcet packet)
{
boolean runItNow = false;
OperationContext repliToken = OperationContextImpl
.getContext(executorFactory);
repliToken.replicationLineUp();
synchronized (replicationLock)
{
if (!enabled)
{
runItNow = true;
}
else
{
pendingTokens.add(repliToken);
replicatingChannel.send(packet);
}
}
if (runItNow)
{
repliToken.relicationDone();
}
}
收到一条消息就发到replicatingChannel,做到了日志同步。
replicatingChannel的建立
org.hornetq.core.replication.impl.ReplicationManagerImpl#start
public synchronized void start() throws Exception { replicatingConnection = failoverManager.getConnection(); long channelID = replicatingConnection.generateChannelID(); //在这里为replicatingChannel赋值 replicatingChannel = replicatingConnection.getChannel(channelID, -1); replicatingChanne.setHandler(responseHandler); }
主备机器切换
HornetQ的失效备原是在客户端层面来做的 ,之前会注册一个用于失效备原的监听器,当监听到异常时,就会尝试进行失效备原;下面是在发生失效备原时的调用堆栈:
FailoverManagerImpl#failoverOrReconnect
FailoverManagerImpl#handleConnectionFailure
FailoverManagerImpl$DelegatingFailureListener#connectionFailed
RemotingConnectionImpl#callFailureListeners
RemotingConnectionImpl#fail
FailoverManagerImpl$ChannelOHandler
Failover关键代码org.hornetq.core.client.impl.FailoverManagerImpl#failoverOrReconnect
上一篇: forge alpha4
下一篇: resteasy实现文件上传