How the ActiveMQ Broker Sends Messages to Consumers: A Detailed Walkthrough
Blog category: ActiveMQ
程序员文章站
2024-03-19 20:31:40
JMS (ActiveMQ) PTP and PUB/SUB mode examples: http://donald-draper.iteye.com/blog/2347445
ActiveMQ connection factory and connection in detail: http://donald-draper.iteye.com/blog/2348070
ActiveMQ session initialization: http://donald-draper.iteye.com/blog/2348341
ActiveMQ producer: http://donald-draper.iteye.com/blog/2348381
ActiveMQ consumer: http://donald-draper.iteye.com/blog/2348389
ActiveMQ startup process in detail: http://donald-draper.iteye.com/blog/2348399
How the ActiveMQ Broker sends messages to consumers, in detail: http://donald-draper.iteye.com/blog/2348440
Integrating Spring with ActiveMQ: http://donald-draper.iteye.com/blog/2347638
Integrating Spring with ActiveMQ in detail, part 1: http://donald-draper.iteye.com/blog/2348449
Integrating Spring with ActiveMQ in detail, part 2: http://donald-draper.iteye.com/blog/2348461
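Introduction:
The activemq script shows that starting ActiveMQ really means launching activemq.jar in the bin folder. The jar contains a class named Main, which is the ActiveMQ entry point. Main loads the lib directory and the classpath, initializes the class loader, and delegates to ShellCommand, which runs whatever the command line describes. For version and help it prints information. For a start command it creates a BrokerService through XBeanBrokerFactory, a step that relies mainly on Spring's bean container, and then starts the BrokerService. That startup brings up the persistence adapter, the JMX connection and the context manager, and finally all network connectors and the TransportConnector that wraps the TCP transport.
The default transport is openwire over tcp, so we look at TcpTransportServer. It is created by TcpTransportFactory, which also configures the OpenWire wire format. Starting TcpTransportServer means obtaining a ServerSocket from ServerSocketFactory, binding it to an IP address and port, listening for connections, and registering an org.apache.activemq.transport.nio.SelectorManager.Listener for the server socket's channel, which uses Java NIO.
The previous article covered the ActiveMQ startup process. This one looks at how TcpTransportServer and ActiveMQConnection interact and how a message reaches the consumer, starting from the startup of TransportConnector.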
//TransportConnector
//Start the TCP listener
public void start() throws Exception {
    broker = brokerService.getBroker();
    brokerInfo.setBrokerName(broker.getBrokerName());
    brokerInfo.setBrokerId(broker.getBrokerId());
    brokerInfo.setPeerBrokerInfos(broker.getPeerBrokerInfos());
    brokerInfo.setFaultTolerantConfiguration(broker.isFaultTolerantConfiguration());
    brokerInfo.setBrokerURL(broker.getBrokerService().getDefaultSocketURIString());
    //Register the accept listener on the TcpTransportServer
    getServer().setAcceptListener(new TransportAcceptListener() {
        public void onAccept(final Transport transport) {
            try {
                brokerService.getTaskRunnerFactory().execute(new Runnable() {
                    public void run() {
                        try {
                            if (!brokerService.isStopping()) {
                                //Create a TransportConnection for the accepted transport and start it
                                Connection connection = createConnection(transport);
                                connection.start();
                            }
                        }
                        //(catch clause omitted in the original excerpt)
                    }
                });
            }
            //(catch clause omitted in the original excerpt)
        }
    });
    //Start the TcpTransportServer
    getServer().setBrokerInfo(brokerInfo);
    getServer().start();
    DiscoveryAgent da = getDiscoveryAgent();
    if (da != null) {
        da.registerService(getPublishableConnectString());
        da.start();
    }
    if (enableStatusMonitor) {
        statusDector = new TransportStatusDetector(this);
        statusDector.start();
    }
}

//Create the TransportConnection for an accepted transport
protected Connection createConnection(Transport transport) throws IOException {
    TransportConnection answer = new TransportConnection(this, transport, broker,
            disableAsyncDispatch ? null : taskRunnerFactory, brokerService.getTaskRunnerFactory());
    boolean statEnabled = getStatistics().isEnabled();
    answer.getStatistics().setEnabled(statEnabled);
    answer.setMessageAuthorizationPolicy(messageAuthorizationPolicy);
    return answer;
}
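The accept listener in the listing above does not start the connection on the accepting thread; it hands that work to a task runner and skips it when the broker is stopping. A minimal sketch of that hand-off follows. AcceptHandoffSketch, its fields and the string "connections" are hypothetical stand-ins for illustration, not ActiveMQ classes, and a plain java.util.concurrent.Executor is assumed in place of the broker's task runner factory.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;

// Hypothetical stand-in for the accept listener; not an ActiveMQ class.
final class AcceptHandoffSketch {
    private final Executor taskRunner;          // assumed replacement for the task runner factory
    private volatile boolean stopping = false;  // mirrors the brokerService.isStopping() check
    private final List<String> started = new CopyOnWriteArrayList<>();

    AcceptHandoffSketch(Executor taskRunner) {
        this.taskRunner = taskRunner;
    }

    // Called for every accepted transport; the connection is created on the runner's thread.
    void onAccept(String transportName) {
        taskRunner.execute(() -> {
            if (!stopping) {
                started.add("connection:" + transportName);
            }
        });
    }

    void stop() {
        stopping = true;
    }

    List<String> startedConnections() {
        return started;
    }
}
```

Passing Runnable::run as the executor runs each task inline, which is handy for trying the sketch; a thread pool would keep the accepting thread free, which appears to be the point of the hand-off in the listing.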
Next, look at how TcpTransportServer starts:
getServer().start();
This start does two things: it starts the TcpTransportServer thread, and it starts the Service.
First, the thread:
//TcpTransportServer
public void run() {
    final ServerSocketChannel chan = serverSocket.getChannel();
    if (chan != null) {
        try {
            //A socket channel exists: make it non-blocking and register it with the selector
            chan.configureBlocking(false);
            selector = SelectorManager.getInstance().register(chan, new org.apache.activemq.transport.nio.SelectorManager.Listener() {
                public void onSelect(SelectorSelection sel) {
                    try {
                        SocketChannel sc = chan.accept();
                        if (sc != null) {
                            if (isStopped() || getAcceptListener() == null) {
                                sc.close();
                            } else if (useQueueForAccept) {
                                socketQueue.put(sc.socket());
                            } else {
                                handleSocket(sc.socket());
                            }
                        }
                    }
                    //(catch clause omitted in the original excerpt)
                }
            });
            selector.setInterestOps(16); //16 is SelectionKey.OP_ACCEPT
            selector.enable();
        }
        //(catch clause omitted in the original excerpt)
    } else {
        //No socket channel: accept on the blocking ServerSocket and handle each connection
        while (!isStopped()) {
            Socket socket = null;
            try {
                socket = serverSocket.accept();
                if (socket != null) {
                    if (isStopped() || getAcceptListener() == null) {
                        socket.close();
                    } else if (useQueueForAccept) {
                        socketQueue.put(socket);
                    } else {
                        handleSocket(socket);
                    }
                }
            }
            //(catch clause omitted in the original excerpt)
        }
    }
}

protected final void handleSocket(Socket socket) {
    boolean closeSocket = true;
    try {
        if (currentTransportCount.get() >= maximumConnections) {
            throw new ExceededMaximumConnectionsException("Exceeded the maximum number of allowed client connections. "
                    + "See the 'maximumConnections' property on the TCP transport configuration URI in the ActiveMQ configuration file (e.g., activemq.xml)");
        }
        HashMap options = new HashMap();
        options.put("maxInactivityDuration", Long.valueOf(maxInactivityDuration));
        options.put("maxInactivityDurationInitalDelay", Long.valueOf(maxInactivityDurationInitalDelay));
        options.put("minmumWireFormatVersion", Integer.valueOf(minmumWireFormatVersion));
        options.put("trace", Boolean.valueOf(trace));
        options.put("soTimeout", Integer.valueOf(soTimeout));
        options.put("socketBufferSize", Integer.valueOf(socketBufferSize));
        options.put("connectionTimeout", Integer.valueOf(connectionTimeout));
        options.put("logWriterName", logWriterName);
        options.put("dynamicManagement", Boolean.valueOf(dynamicManagement));
        options.put("startLogging", Boolean.valueOf(startLogging));
        options.putAll(transportOptions);
        WireFormat format = wireFormatFactory.createWireFormat();
        //Create the transport
        Transport transport = createTransport(socket, format);
        closeSocket = false;
        //Register this server as a ServiceListener on the transport
        if (transport instanceof ServiceSupport) {
            ((ServiceSupport) transport).addServiceListener(this);
        }
        Transport configuredTransport = transportFactory.serverConfigure(transport, format, options);
        //Hand the configured transport to the server's accept listener
        getAcceptListener().onAccept(configuredTransport);
        currentTransportCount.incrementAndGet();
    }
    //(catch clause omitted in the original excerpt)
}

protected Transport createTransport(Socket socket, WireFormat format) throws IOException {
    return new TcpTransport(format, socket);
}
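The decompiled listing passes the bare number 16 to setInterestOps. In java.nio that value is SelectionKey.OP_ACCEPT, so the selector is only asked to report incoming connections. The small helper below, a hypothetical class written for this note and not part of ActiveMQ, decodes an interest mask into the standard constant names.

```java
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: names the NIO interest bits that are set in a mask.
final class InterestOpsNames {
    static String describe(int ops) {
        List<String> names = new ArrayList<>();
        if ((ops & SelectionKey.OP_READ) != 0) names.add("OP_READ");
        if ((ops & SelectionKey.OP_WRITE) != 0) names.add("OP_WRITE");
        if ((ops & SelectionKey.OP_CONNECT) != 0) names.add("OP_CONNECT");
        if ((ops & SelectionKey.OP_ACCEPT) != 0) names.add("OP_ACCEPT");
        return String.join("|", names);
    }

    public static void main(String[] args) {
        // The mask used in the listing
        System.out.println(describe(16));
    }
}
```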
The TcpTransportServer class hierarchy:
public class TcpTransportServer extends TransportServerThreadSupport implements ServiceListener
public abstract class TransportServerThreadSupport extends TransportServerSupport implements Runnable
public abstract class TransportServerSupport extends ServiceSupport implements TransportServer
Now the Service side of the start:
public abstract class ServiceSupport implements Service {
    private AtomicBoolean started;
    private AtomicBoolean stopping;
    private AtomicBoolean stopped;
    private List serviceListeners; //service listeners

    public void start() throws Exception {
        if (started.compareAndSet(false, true)) {
            boolean success = false;
            stopped.set(false);
            try {
                preStart();
                //doStart is abstract; subclasses implement it
                doStart();
                success = true;
            } finally {
                started.set(success);
            }
            //Notify every registered ServiceListener. In handleSocket, TcpTransportServer
            //registered itself as a ServiceListener on each transport it created.
            for (Iterator i$ = serviceListeners.iterator(); i$.hasNext();) {
                ServiceListener l = (ServiceListener) i$.next();
                l.started(this);
            }
        }
    }
}
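The start method above is a start-once template: compareAndSet lets only the first caller through, the started flag is rolled back if doStart fails, and listeners are told afterwards. A minimal sketch of the same idea follows. SketchService and CountingService are hypothetical names, and checked exceptions are left out to keep the example short.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for a ServiceSupport-like base class.
abstract class SketchService {
    interface Listener {
        void started(SketchService service);
    }

    private final AtomicBoolean started = new AtomicBoolean(false);
    private final List<Listener> listeners = new CopyOnWriteArrayList<>();

    void addListener(Listener listener) {
        listeners.add(listener);
    }

    boolean isStarted() {
        return started.get();
    }

    final void start() {
        // Only the caller that flips false -> true performs the start.
        if (!started.compareAndSet(false, true)) {
            return;
        }
        boolean success = false;
        try {
            doStart();
            success = true;
        } finally {
            // Roll the flag back if doStart threw.
            started.set(success);
        }
        for (Listener listener : listeners) {
            listener.started(this);
        }
    }

    protected abstract void doStart();
}

// Counts how often the subclass hook actually runs.
final class CountingService extends SketchService {
    int doStartCalls = 0;

    @Override
    protected void doStart() {
        doStartCalls++;
    }
}
```

Calling start() twice on a CountingService runs doStart once and notifies each listener once.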
TcpTransport is also a Service, and its start likewise has two parts: starting the Service and starting the TcpTransport thread.
Start with the Service side, doStart:
//Socket initialization and the data input/output streams were covered earlier and are not repeated here
protected void doStart() throws Exception {
    //Connect
    connect();
    stoppedLatch.set(new CountDownLatch(1));
    super.doStart();
}

protected void connect() throws Exception {
    InetSocketAddress localAddress = null;
    InetSocketAddress remoteAddress = null;
    if (localLocation != null) {
        localAddress = new InetSocketAddress(InetAddress.getByName(localLocation.getHost()), localLocation.getPort());
    }
    if (remoteLocation != null) {
        String host = resolveHostName(remoteLocation.getHost());
        remoteAddress = new InetSocketAddress(host, remoteLocation.getPort());
    }
    trafficClassSet = setTrafficClass(socket);
    if (socket != null) {
        if (localAddress != null) {
            socket.bind(localAddress);
        }
        if (remoteAddress != null) {
            if (connectionTimeout >= 0) {
                socket.connect(remoteAddress, connectionTimeout);
            } else {
                socket.connect(remoteAddress);
            }
        }
    } else if (localAddress != null) {
        socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort(),
                localAddress.getAddress(), localAddress.getPort());
    } else {
        socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort());
    }
    initialiseSocket(socket);
    initializeStreams();
}
Next, the TcpTransport thread:
public class TcpTransport extends TransportThreadSupport implements Transport, Service, Runnable {
    public void run() {
        LOG.trace("TCP consumer thread for " + this + " starting");
        runnerThread = Thread.currentThread();
        //Keep calling doRun() until the TcpTransport is stopped
        while (!isStopped()) {
            doRun();
        }
        ((CountDownLatch) stoppedLatch.get()).countDown();
    }

    protected void doRun() throws IOException {
        try {
            //Read a command
            Object command = readCommand();
            //Process the command
            doConsume(command);
        }
        //(catch clause omitted in the original excerpt)
    }
}
Reading a command:
Object command = readCommand();
protected Object readCommand() throws IOException {
    //Decode the byte stream through the wireFormat
    return wireFormat.unmarshal(dataIn);
}
//OpenWireFormat
public final class OpenWireFormat implements WireFormat {
    private DataStreamMarshaller dataMarshallers[];
    private int version;
    private boolean stackTraceEnabled;
    private boolean tcpNoDelayEnabled;
    private boolean cacheEnabled;
    private boolean tightEncodingEnabled;
    private boolean sizePrefixDisabled;
    private long maxFrameSize;
    private short nextMarshallCacheIndex;
    private short nextMarshallCacheEvictionIndex;
    private Map marshallCacheMap;
    private DataStructure marshallCache[]; //cache used when marshalling commands
    private DataStructure unmarshallCache[]; //cache used when unmarshalling commands
    private DataByteArrayOutputStream bytesOut; //data output stream
    private DataByteArrayInputStream bytesIn; //data input stream
    private WireFormatInfo preferedWireFormatInfo; //wire format information

    //Note: readCommand passes a DataInput, while this listing shows the ByteSequence overload;
    //both read the size prefix and then call doUnmarshal.
    public synchronized Object unmarshal(ByteSequence sequence) throws IOException {
        //Point bytesIn at the byte sequence and record the read position
        bytesIn.restart(sequence);
        if (!sizePrefixDisabled) {
            int size = bytesIn.readInt();
            //(the check of sequence.getLength() - 4 against size has an empty body in the decompiled excerpt)
            if ((long) size > maxFrameSize) {
                throw new IOException("Frame size of " + (size / 1048576) + " MB larger than max allowed "
                        + (maxFrameSize / 1048576L) + " MB");
            }
        }
        //Parse the input stream into a command
        Object command = doUnmarshal(bytesIn);
        return command;
    }
}
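The unmarshal step above reads a 4-byte size prefix, rejects frames larger than maxFrameSize, and only then decodes the body. The sketch below shows that framing with plain java.io streams. FrameSketch and its layout (an int size followed by a type byte and the body) are assumptions made for illustration; it is not the OpenWire encoder.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical size-prefixed framing, loosely modelled on the listing above.
final class FrameSketch {
    // Writes: int size, then one type byte, then the body. size counts type byte + body.
    static byte[] encode(byte type, byte[] body) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(1 + body.length);
            out.writeByte(type);
            out.write(body);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads the size prefix, enforces the limit, and returns type byte + body.
    static byte[] decode(byte[] frame, long maxFrameSize) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(frame));
            int size = in.readInt();
            if (size < 0 || size > maxFrameSize) {
                throw new IllegalArgumentException(
                        "Frame size " + size + " is outside the allowed range 0.." + maxFrameSize);
            }
            byte[] payload = new byte[size];
            in.readFully(payload);
            return payload;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Checking the size before allocating the payload buffer is what keeps a corrupt or hostile prefix from triggering a huge allocation.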
//DataByteArrayInputStream
public final class DataByteArrayInputStream extends InputStream implements DataInput {
    private byte buf[];
    private int pos;
    private int offset;

    //Point the stream at the sequence's backing array and record the read position
    public void restart(ByteSequence sequence) {
        buf = sequence.getData();
        pos = sequence.getOffset();
    }
}
//OpenWireFormat
Parsing the input stream into a command:
public Object doUnmarshal(DataInput dis) throws IOException {
    //Read the command's data type
    byte dataType = dis.readByte();
    if (dataType != 0) {
        //Look up the marshaller registered for this data type;
        //for a WireFormatInfo command it is WireFormatInfoMarshaller
        DataStreamMarshaller dsm = dataMarshallers[dataType & 255];
        if (dsm == null) {
            throw new IOException("Unknown data type: " + dataType);
        }
        //Create the data structure for this command
        Object data = dsm.createObject();
        if (tightEncodingEnabled) {
            BooleanStream bs = new BooleanStream();
            bs.unmarshal(dis);
            dsm.tightUnmarshal(this, data, dis, bs);
        } else {
            //Parse the command's bytes
            dsm.looseUnmarshal(this, data, dis);
        }
        return data;
    } else {
        return null;
    }
}
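doUnmarshal picks a marshaller from an array indexed by dataType & 255. The mask matters because Java bytes are signed: a type byte of 200 arrives as -56 and would be an invalid array index without it. The sketch below reproduces that lookup. MarshallerTableSketch and the Supplier-based factories are hypothetical; the real table holds DataStreamMarshaller instances.

```java
import java.util.function.Supplier;

// Hypothetical type-indexed factory table, modelled on the dataMarshallers lookup.
final class MarshallerTableSketch {
    private final Supplier<?>[] factories = new Supplier<?>[256];

    void register(int type, Supplier<?> factory) {
        factories[type & 0xFF] = factory;
    }

    Object create(byte dataType) {
        if (dataType == 0) {
            return null; // the listing returns null for type 0
        }
        // Mask to 0..255 so negative byte values still index the table correctly.
        Supplier<?> factory = factories[dataType & 0xFF];
        if (factory == null) {
            throw new IllegalArgumentException("Unknown data type: " + dataType);
        }
        return factory.get();
    }
}
```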
//WireFormatInfoMarshaller
public class WireFormatInfoMarshaller extends BaseDataStreamMarshaller {
    //Create the data structure for this command
    public DataStructure createObject() {
        return new WireFormatInfo();
    }

    //Read the magic bytes, the version and the marshalled properties of a WireFormatInfo
    public void looseUnmarshal(OpenWireFormat wireFormat, Object o, DataInput dataIn) throws IOException {
        //Delegate the common part to BaseDataStreamMarshaller
        super.looseUnmarshal(wireFormat, o, dataIn);
        WireFormatInfo info = (WireFormatInfo) o;
        //Pre-unmarshal hook on the command
        info.beforeUnmarshall(wireFormat);
        info.setMagic(looseUnmarshalConstByteArray(dataIn, 8));
        info.setVersion(dataIn.readInt());
        //Set the command's marshalled properties
        info.setMarshalledProperties(looseUnmarshalByteSequence(dataIn));
        info.afterUnmarshall(wireFormat);
    }
}
//WireFormatInfo
public class WireFormatInfo implements Command, MarshallAware {
    public static final byte DATA_STRUCTURE_TYPE = 1;
    private static final int MAX_PROPERTY_SIZE = 4096;
    private static final byte MAGIC[] = { 65, 99, 116, 105, 118, 101, 77, 81 };
    protected byte magic[];
    protected int version;
    protected ByteSequence marshalledProperties;
    protected transient Map properties;
    private transient Endpoint from;
    private transient Endpoint to;

    //Serialize the properties map if it has not been marshalled yet
    public void beforeMarshall(WireFormat wireFormat) throws IOException {
        if (marshalledProperties == null && properties != null) {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            DataOutputStream os = new DataOutputStream(baos);
            MarshallingSupport.marshalPrimitiveMap(properties, os);
            os.close();
            marshalledProperties = baos.toByteSequence();
        }
    }

    //Set the command's marshalled properties
    public void setMarshalledProperties(ByteSequence marshalledProperties) {
        this.marshalledProperties = marshalledProperties;
    }
}
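The MAGIC array in WireFormatInfo looks opaque as decimal numbers, but the eight values are the ASCII codes of the string "ActiveMQ". The sketch below decodes them and shows a magic check of the kind a receiver could perform. MagicSketch and hasValidMagic are hypothetical names for illustration; how ActiveMQ itself validates the magic is not shown in the listing.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical helper around the magic bytes copied from the listing above.
final class MagicSketch {
    static final byte[] MAGIC = {65, 99, 116, 105, 118, 101, 77, 81};

    // Decodes the magic bytes as ASCII text.
    static String magicAsText() {
        return new String(MAGIC, StandardCharsets.US_ASCII);
    }

    // True when the candidate bytes match the expected magic exactly.
    static boolean hasValidMagic(byte[] candidate) {
        return Arrays.equals(MAGIC, candidate);
    }

    public static void main(String[] args) {
        System.out.println(magicAsText());
    }
}
```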
//BaseDataStreamMarshaller
public abstract class BaseDataStreamMarshaller implements DataStreamMarshaller {
    public static final Constructor STACK_TRACE_ELEMENT_CONSTRUCTOR;

    static {
        Constructor constructor = null;
        try {
            constructor = StackTraceElement.class.getConstructor(new Class[] {
                String.class, String.class, String.class, Integer.TYPE
            });
        } catch (Throwable throwable) {
        }
        STACK_TRACE_ELEMENT_CONSTRUCTOR = constructor;
    }

    //Empty in the base class; subclasses override it
    public void looseUnmarshal(OpenWireFormat openwireformat, Object obj, DataInput datainput) throws IOException {
    }
}
Back in TcpTransport, the command is processed:
doConsume(command);
Processing the command:
public abstract class TransportSupport extends ServiceSupport implements Transport {
    TransportListener transportListener; //transport listener

    public void doConsume(Object command) {
        if (command != null) {
            if (transportListener != null) {
                //A transport listener is set: let it process the command
                transportListener.onCommand(command);
            } else {
                LOG.error("No transportListener available to process inbound command: " + command);
            }
        }
    }
}
Now return to ActiveMQConnection, which implements TransportListener:
public class ActiveMQConnection implements Connection, TopicConnection, QueueConnection, StatsCapable, Closeable, TransportListener, EnhancedConnection {

    public void onCommand(Object o) {
        final Command command = (Command) o;
        if (!closed.get() && command != null) {
            try {
                command.visit(new CommandVisitorAdapter() {
                    //Dispatch a message
                    public Response processMessageDispatch(MessageDispatch md) throws Exception {
                        waitForTransportInterruptionProcessingToComplete();
                        //Look up the dispatcher by the consumer id carried in the dispatch, then let it consume the message
                        ActiveMQDispatcher dispatcher = (ActiveMQDispatcher) dispatchers.get(md.getConsumerId());
                        if (dispatcher != null) {
                            Message msg = md.getMessage();
                            if (msg != null) {
                                msg = msg.copy();
                                msg.setReadOnlyBody(true);
                                msg.setReadOnlyProperties(true);
                                msg.setRedeliveryCounter(md.getRedeliveryCounter());
                                msg.setConnection(ActiveMQConnection.this);
                                msg.setMemoryUsage(null);
                                md.setMessage(msg);
                            }
                            //Dispatch the message
                            dispatcher.dispatch(md);
                        }
                        return null;
                    }

                    //Handle a producer acknowledgement
                    public Response processProducerAck(ProducerAck pa) throws Exception {
                        if (pa != null && pa.getProducerId() != null) {
                            ActiveMQMessageProducer producer = (ActiveMQMessageProducer) producers.get(pa.getProducerId());
                            if (producer != null) {
                                producer.onProducerAck(pa);
                            }
                        }
                        return null;
                    }

                    //Handle broker information
                    public Response processBrokerInfo(BrokerInfo info) throws Exception {
                        brokerInfo = info;
                        brokerInfoReceived.countDown();
                        //Decompiled as "isFaultTolerantConfiguration() ? 0 : 1"
                        optimizeAcknowledge = !brokerInfo.isFaultTolerantConfiguration();
                        getBlobTransferPolicy().setBrokerUploadUrl(info.getBrokerUploadUrl());
                        return null;
                    }

                    //Handle a connection error
                    public Response processConnectionError(final ConnectionError error) throws Exception {
                        executor.execute(new Runnable() {
                            public void run() {
                                onAsyncException(error.getException());
                            }
                        });
                        return null;
                    }

                    //Handle a control command
                    public Response processControlCommand(ControlCommand command) throws Exception {
                        onControlCommand(command);
                        return null;
                    }

                    //Handle a connection control command
                    public Response processConnectionControl(ConnectionControl control) throws Exception {
                        onConnectionControl((ConnectionControl) command);
                        return null;
                    }

                    //Handle a consumer control command
                    public Response processConsumerControl(ConsumerControl control) throws Exception {
                        onConsumerControl((ConsumerControl) command);
                        return null;
                    }

                    public Response processWireFormat(WireFormatInfo info) throws Exception {
                        onWireFormatInfo((WireFormatInfo) command);
                        return null;
                    }
                });
            }
            //(catch clause omitted in the original excerpt)
        }
        //Forward the command to every TransportListener registered on the connection
        for (Iterator iter = transportListeners.iterator(); iter.hasNext();) {
            TransportListener listener = (TransportListener) iter.next();
            listener.onCommand(command);
        }
    }

    //Handle a consumer control command
    protected void onConsumerControl(ConsumerControl command) {
        if (command.isClose()) {
            for (Iterator i$ = sessions.iterator(); i$.hasNext();) {
                ActiveMQSession session = (ActiveMQSession) i$.next();
                session.close(command.getConsumerId());
            }
        } else {
            //Set the prefetch size on the matching consumer of each session
            for (Iterator i$ = sessions.iterator(); i$.hasNext();) {
                ActiveMQSession session = (ActiveMQSession) i$.next();
                session.setPrefetchSize(command.getConsumerId(), command.getPrefetch());
            }
            for (Iterator i$ = connectionConsumers.iterator(); i$.hasNext();) {
                ActiveMQConnectionConsumer connectionConsumer = (ActiveMQConnectionConsumer) i$.next();
                ConsumerInfo consumerInfo = connectionConsumer.getConsumerInfo();
                if (consumerInfo.getConsumerId().equals(command.getConsumerId())) {
                    //Set the prefetch size on the connection consumer
                    consumerInfo.setPrefetchSize(command.getPrefetch());
                }
            }
        }
    }
}
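onCommand above does not test the command's type with instanceof; it calls command.visit and overrides only the visitor methods it cares about, so each command class decides which callback fires. A minimal sketch of that double dispatch follows. Every type here (SketchCommand, SketchVisitor, SketchDispatch, SketchBrokerInfo, VisitorDemo) is a hypothetical stand-in, with String results instead of ActiveMQ's Response.

```java
// Hypothetical command hierarchy illustrating the visitor-based dispatch in onCommand.
interface SketchCommand {
    String visit(SketchVisitor visitor);
}

// Default methods play the role of an adapter class: every callback is a no-op unless overridden.
interface SketchVisitor {
    default String processMessageDispatch(SketchDispatch dispatch) {
        return null;
    }

    default String processBrokerInfo(SketchBrokerInfo info) {
        return null;
    }
}

final class SketchDispatch implements SketchCommand {
    final String consumerId;

    SketchDispatch(String consumerId) {
        this.consumerId = consumerId;
    }

    @Override
    public String visit(SketchVisitor visitor) {
        return visitor.processMessageDispatch(this);
    }
}

final class SketchBrokerInfo implements SketchCommand {
    final String brokerName;

    SketchBrokerInfo(String brokerName) {
        this.brokerName = brokerName;
    }

    @Override
    public String visit(SketchVisitor visitor) {
        return visitor.processBrokerInfo(this);
    }
}

final class VisitorDemo {
    // Handles only message dispatches; every other command falls through to the no-op default.
    static String handle(SketchCommand command) {
        return command.visit(new SketchVisitor() {
            @Override
            public String processMessageDispatch(SketchDispatch dispatch) {
                return "dispatch to " + dispatch.consumerId;
            }
        });
    }
}
```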
//Forward the command to every TransportListener registered on the connection
for (Iterator iter = transportListeners.iterator(); iter.hasNext();) {
    TransportListener listener = (TransportListener) iter.next();
    listener.onCommand(command);
}
The ActiveMQConnection constructor contains this fragment:
protected ActiveMQConnection(final Transport transport, IdGenerator clientIdGenerator, IdGenerator connectionIdGenerator, JMSStatsImpl factoryStats) throws Exception {
    //The connection registers itself as the transport's listener
    this.transport.setTransportListener(this);
    stats = new JMSConnectionStatsImpl(sessions, this instanceof XAConnection);
    this.factoryStats.addConnection(this);
    connectionAudit.setCheckForDuplicates(transport.isFaultTolerant());
}
So what ends up being called is ActiveMQConnection's onCommand, by way of ResponseCorrelator:
//ResponseCorrelator
public void onCommand(Object o) {
    Command command = null;
    if (o instanceof Command) {
        command = (Command) o;
    } else {
        throw new ClassCastException("Object cannot be converted to a Command, Object: " + o);
    }
    if (command.isResponse()) {
        //A response: complete the pending request with the same correlation id
        Response response = (Response) command;
        FutureResponse future = null;
        synchronized (requestMap) {
            future = (FutureResponse) requestMap.remove(Integer.valueOf(response.getCorrelationId()));
        }
        if (future != null) {
            future.set(response);
        } else if (debug) {
            LOG.debug("Received unexpected response: {" + command + "}for command id: " + response.getCorrelationId());
        }
    } else {
        //Not a response: pass the command on to the transport listener
        getTransportListener().onCommand(command);
    }
}
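ResponseCorrelator splits inbound traffic in two: responses complete the pending request that carries the same correlation id, and everything else goes on to the listener. The sketch below shows that split with a map of CompletableFuture objects. CorrelatorSketch and its method names are hypothetical, commands are reduced to strings, and CompletableFuture is swapped in for ActiveMQ's FutureResponse.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical request/response correlator, modelled on the listing above.
final class CorrelatorSketch {
    private final Map<Integer, CompletableFuture<String>> pending = new ConcurrentHashMap<>();
    private final Consumer<String> listener; // stands in for the downstream TransportListener

    CorrelatorSketch(Consumer<String> listener) {
        this.listener = listener;
    }

    // Registers interest in the response to the request with this id.
    CompletableFuture<String> expectResponse(int commandId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(commandId, future);
        return future;
    }

    // Routes an inbound command: responses complete a future, the rest reach the listener.
    void onCommand(boolean isResponse, int correlationId, String payload) {
        if (isResponse) {
            CompletableFuture<String> future = pending.remove(correlationId);
            if (future != null) {
                future.complete(payload);
            }
            // Unmatched responses are dropped here; the listing logs them at debug level.
        } else {
            listener.accept(payload);
        }
    }
}
```

This is why a MessageDispatch, which is not a response, reaches ActiveMQConnection.onCommand, while replies to synchronous requests never do.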
Back in ActiveMQConnection's onCommand:
//Dispatch a message
public Response processMessageDispatch(MessageDispatch md) throws Exception {
    waitForTransportInterruptionProcessingToComplete();
    //Look up the dispatcher by the consumer id carried in the dispatch, then let it consume the message
    ActiveMQDispatcher dispatcher = (ActiveMQDispatcher) dispatchers.get(md.getConsumerId());
    if (dispatcher != null) {
        Message msg = md.getMessage();
        if (msg != null) {
            msg = msg.copy();
            msg.setReadOnlyBody(true);
            msg.setReadOnlyProperties(true);
            msg.setRedeliveryCounter(md.getRedeliveryCounter());
            //Attach this connection to the message
            msg.setConnection(ActiveMQConnection.this);
            msg.setMemoryUsage(null);
            md.setMessage(msg);
        }
        //Dispatch the message
        dispatcher.dispatch(md);
    }
    return null;
}

public class ActiveMQConnectionConsumer implements ConnectionConsumer, ActiveMQDispatcher {
    private ActiveMQConnection connection; //connection
    private ServerSessionPool sessionPool; //session pool
    private ConsumerInfo consumerInfo; //consumer information
    private boolean closed;

    public void dispatch(MessageDispatch messageDispatch) {
        ServerSession serverSession;
        ActiveMQSession session;
        messageDispatch.setConsumer(this);
        //Get a session from the pool
        serverSession = sessionPool.getServerSession();
        Session s = serverSession.getSession();
        session = null;
        if (s instanceof ActiveMQSession) {
            session = (ActiveMQSession) s;
        } else if (s instanceof ActiveMQTopicSession) {
            ActiveMQTopicSession topicSession = (ActiveMQTopicSession) s;
            session = (ActiveMQSession) topicSession.getNext();
        } else if (s instanceof ActiveMQQueueSession) {
            ActiveMQQueueSession queueSession = (ActiveMQQueueSession) s;
            session = (ActiveMQSession) queueSession.getNext();
        } else {
            connection.onClientInternalException(new JMSException("Session pool provided an invalid session type: " + s.getClass()));
            return;
        }
        try {
            //The session dispatches the message (covered in earlier articles)
            session.dispatch(messageDispatch);
            serverSession.start();
        } catch (JMSException e) {
            connection.onAsyncException(e);
        }
    }
}
//ActiveMQSession
public void dispatch(MessageDispatch messageDispatch) {
    try {
        //The session executor performs the dispatch
        executor.execute(messageDispatch);
    }
    //(catch clause omitted in the original excerpt)
}
//ActiveMQSessionExecutor
public class ActiveMQSessionExecutor implements Task {
    private final ActiveMQSession session;
    private final MessageDispatchChannel messageQueue; //queue of messages not yet consumed
    private boolean dispatchedBySessionPool;
    private volatile TaskRunner taskRunner;
    private boolean startedOrWarnedThatNotStarted;

    void execute(MessageDispatch message) throws InterruptedException {
        if (!startedOrWarnedThatNotStarted) {
            ActiveMQConnection connection = session.connection;
            long aboutUnstartedConnectionTimeout = connection.getWarnAboutUnstartedConnectionTimeout();
            if (connection.isStarted() || aboutUnstartedConnectionTimeout < 0L) {
                startedOrWarnedThatNotStarted = true;
            } else {
                long elapsedTime = System.currentTimeMillis() - connection.getTimeCreated();
                if (elapsedTime > aboutUnstartedConnectionTimeout) {
                    LOG.warn("Received a message on a connection which is not yet started. Have you forgotten to call Connection.start()? Connection: "
                            + connection + " Received: " + message);
                    startedOrWarnedThatNotStarted = true;
                }
            }
        }
        if (!session.isSessionAsyncDispatch() && !dispatchedBySessionPool) {
            //Synchronous dispatch: deliver the message directly
            dispatch(message);
        } else {
            //Asynchronous dispatch: queue the message and wake up the task runner
            messageQueue.enqueue(message);
            wakeup();
        }
    }
}
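The last branch of execute is the key decision: with synchronous dispatch the message goes straight to the consumer on the calling thread, otherwise it is queued for later delivery. The sketch below isolates that choice. ExecutorSketch is a hypothetical, single-threaded stand-in: messages are strings, an ArrayDeque replaces MessageDispatchChannel, and an explicit drain() call stands in for the task runner that wakeup() would trigger.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical, single-threaded model of the sync/async choice in the session executor.
final class ExecutorSketch {
    private final boolean asyncDispatch;
    private final Queue<String> messageQueue = new ArrayDeque<>();
    private final List<String> delivered = new ArrayList<>();

    ExecutorSketch(boolean asyncDispatch) {
        this.asyncDispatch = asyncDispatch;
    }

    void execute(String message) {
        if (!asyncDispatch) {
            dispatch(message);         // synchronous: deliver on the caller's thread
        } else {
            messageQueue.add(message); // asynchronous: park it until the runner drains the queue
        }
    }

    // Stands in for the task runner iteration that the real code wakes up.
    void drain() {
        String message;
        while ((message = messageQueue.poll()) != null) {
            dispatch(message);
        }
    }

    int pending() {
        return messageQueue.size();
    }

    List<String> delivered() {
        return delivered;
    }

    private void dispatch(String message) {
        delivered.add(message);
    }
}
```

Queueing keeps the transport's reader thread from being blocked by a slow consumer, at the cost of one extra hand-off per message.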
Now the synchronous path: walk the session's consumers, find the one whose id matches, and let it consume the message.
void dispatch(MessageDispatch message) {
    for (Iterator i$ = session.consumers.iterator(); i$.hasNext();) {
        ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) i$.next();
        ConsumerId consumerId = message.getConsumerId();
        if (consumerId.equals(consumer.getConsumerId())) {
            consumer.dispatch(message);
            break;
        }
    }
}
The consumer-side dispatch above was covered in earlier articles, so it is not repeated here.
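Summary:
Starting TransportConnector mainly registers a TransportAcceptListener on the TcpTransportServer and then starts the TcpTransportServer. The listener's job is to take each accepted transport, wrap it in a TransportConnection, and start that connection. TcpTransportServer listens for connection requests; when one arrives it creates a TcpTransport for the socket, and the transport is started along with the connection. A started Transport reads commands in a loop and hands each one to its TransportListener, which on the client side is ActiveMQConnection. If the command is a message dispatch, the connection looks up the dispatcher by the consumer id carried in the dispatch. For an ActiveMQConnectionConsumer, the dispatcher obtains a session from its session pool, the session dispatches the message, and the work is finally handed to the session executor.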