<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="locations">
<value>file:${activemq.conf}/credentials.properties</value>
</property>
</bean>
<!-- Allows log searching in hawtio console -->
<bean id="logQuery" class="org.fusesource.insight.log.log4j.Log4jLogQuery"
lazy-init="false" scope="singleton"
init-method="start" destroy-method="stop">
</bean>
<broker xmlns="http://activemq.apache.org/schema/core"
brokerName="127.0.0.1"
dataDirectory="${activemq.data}">
<destinationPolicy>
<policyMap>
<policyEntries>
<!-- 订阅/发布-->
<policyEntry topic=">" producerFlowControl="true" optimizedDispatch="true" memoryLimit="16mb">
<!--
消息限制策略,面向Slow Consumer的
此策略只对Topic有效,只对nondurable订阅者有效,当通道中有大量的消息积压时,broker可以保留的消息量。
为了防止Topic中有慢速消费者,导致整个通道消息积压。(对于Topic而言,一条消息只有所有的订阅者都消费才
会被删除)
-->
<pendingMessageLimitStrategy>
<!--
ConstantPendingMessageLimitStrategy: 保留固定条数的消息,如果消息量超过limit,将使用
“MessageEvictionStrategy”移除消息
PrefetchRatePendingMessageLimitStrategy: 保留prefetchSize倍数条消息。
-->
<!-- 如果prefetchSize为100,则保留10 * 100条消息 -->
<prefetchRatePendingMessageLimitStrategy multiplier="10"/>
</pendingMessageLimitStrategy>
<!--
消息剔除策略 面向Slow Consumer的
配合PendingMessageLimitStrategy,只对Topic有效,只对nondurable订阅者有效。当PendingMessage的数量超过
限制时,broker该如何剔除多余的消息。当Topic接收到信息消息后,会将消息“Copy”给每个订阅者,在保存这
个消息时(保存策略"PendingSubscriberMessageStoragePolicy"),将会检测pendingMessages的数量是否超过限
制(由"PendingMessageLimitStrategy"来检测),如果超过限制,将会在pendingMessages中使用
MessageEvicationStrategy移除多余的消息,此后将新消息保存在PendingMessages中。
-->
<messageEvictionStrategy>
<!--
OldestMessageEvictionStrategy: 移除旧消息,默认策略。
OldestMessageWithLowestPriorityEvictionStrategy: 旧数据中权重较低的消息,将会被移除。
UniquePropertyMessageEvictionStrategy: 移除具有指定property的旧消息。开发者可以指定property的名称
,从此属性值相同的消息列表中移除较旧的(根据消息的创建时间)。
-->
<OldestMessageWithLowestPriorityEvictionStrategy />
</messageEvictionStrategy>
<!--
慢速消费者策略
Broker将如何处理慢消费者。Broker将会启动一个后台线程用来检测所有的慢速消费者,并定期关闭关闭它们。
-->
<slowConsumerStrategy>
<!--
AbortSlowConsumerStrategy: 中断慢速消费者,慢速消费将会被关闭。abortConnection是否关闭连接
AbortSlowConsumerStrategy: 如果慢速消费者最后一个ACK距离现在的时间间隔超过阀maxTimeSinceLastAck,
则中断慢速消费者。
-->
<abortSlowConsumerStrategy abortConnection="false"/><!-- 不关闭底层链接 -->
</slowConsumerStrategy>
<!--转发策略 将消息转发给消费者的方式-->
<dispatchPolicy>
<!--
RoundRobinDispatchPolicy: “轮询”,消息将依次发送给每个“订阅者”。“订阅者”列表默认按照订阅的先后
顺序排列,在转发消息时,对于匹配消息的第一个订阅者,将会被移动到“订阅者
”列表的尾部,这也意味着“下一条”消息,将会较晚的转发给它。
StrictOrderDispatchPolicy: 严格有序,消息依次发送给每个订阅者,按照“订阅者”订阅的时间先后。它和
RoundRobin最大的区别是,没有移动“订阅者”顺序的操作。
PriorityDispatchPolicy: 基于“property”权重对“订阅者”排序。它要求开发者首先需要对每个订阅者指定
priority,默认每个consumer的权重都一样。
SimpleDispatchPolicy: 默认值,按照当前“订阅者”列表的顺序。其中PriorityDispatchPolicy是其子类。
-->
<strictOrderDispatchPolicy/>
</dispatchPolicy>
<!--恢复策略 ActiveMQ重启如何恢复数据-->
<subscriptionRecoveryPolicy>
<!--
FixedSizedSubscriptionRecoveryPolicy: 保存一定size的消息,broker将为此Topic开辟定额的RAM用来保存
最新的消息。使用maximumSize属性指定保存的size数量
FixedCountSubscriptionRecoveryPolicy: 保存一定条数的消息。 使用maximumSize属性指定保存的size数量
LastImageSubscriptionRecoveryPolicy: 只保留最新的一条数据
QueryBasedSubscriptionRecoveryPolicy: 符合置顶selector的消息都将被保存,具体能够“恢复”多少消息
,由底层存储机制决定;比如对于非持久化消息,只要内存中还
存在,则都可以恢复。
TimedSubscriptionRecoveryPolicy: 保留最近一段时间的消息。使用recoverDuration属性指定保存时间 单位
毫秒
NoSubscriptionRecoveryPolicy: 关闭“恢复机制”。默认值。
-->
<!--恢复最近30分钟内的信息-->
<timedSubscriptionRecoveryPolicy recoverDuration="1800000"/>
</subscriptionRecoveryPolicy>
<!--"死信"策略 如何处理过去消息
缺省死信队列(Dead Letter Queue)叫做ActiveMQ.DLQ;所有的未送达消息都会被发送到这个队列,以致会非常
难于管理。 默认情况下,无论是Topic还是Queue,broker将使用Queue来保存DeadLeader,即死信通道通常为
Queue;不过开发者也可以指定为Topic。
-->
<deadLetterStrategy>
<!--
IndividualDeadLetterStrategy: 把DeadLetter放入各自的死信通道中,queuePrefix自定义死信前缀
,useQueueForQueueMessages使用队列保存死信,还有一个属性为“useQueueForTopicMessages”,此值表示是否
将Topic的DeadLetter保存在Queue中,默认为true。
<individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true"/>
SharedDeadLetterStrategy: 将所有的DeadLetter保存在一个共享的队列中,这是ActiveMQ broker端默认的策略
。共享队列默认为“ActiveMQ.DLQ”,可以通过“deadLetterQueue”属性来设定。还有2个很重要的可选参数
,“processExpired”表示是否将过期消息放入死信队列,默认为true;“processNonPersistent”表示是否将“
非持久化”消息放入死信队列,默认为false。
<sharedDeadLetterStrategy deadLetterQueue="DLQ-QUEUE"/>
DiscardingDeadLetterStrategy: broker将直接抛弃DeadLeatter。如果开发者不需要关心DeadLetter,可以使用
此策略。AcitveMQ提供了一个便捷的插件:DiscardingDLQBrokerPlugin,来抛弃DeadLetter。
下面这个必须配置plugins节点中才对,
丢弃所有死信
<discardingDLQBrokerPlugin dropAll="true" dropTemporaryTopics="true" dropTemporaryQueues="true" />
丢弃指定死信
<discardingDLQBrokerPlugin dropOnly="MY.EXAMPLE.TOPIC.29 MY.EXAMPLE.QUEUE.87" reportInterval="1000"
/>
使用丢弃正则匹配到死信
<discardingDLQBrokerPlugin dropOnly="MY.EXAMPLE.TOPIC.[0-9]{3} MY.EXAMPLE.QUEUE.[0-9]{3}"
reportInterval="3000" />
-->
<individualDeadLetterStrategy queuePrefix="DLQ.TOPIC." useQueueForQueueMessages="true"/>
</deadLetterStrategy>
<!--非耐久待处理消息处理策略 类似于:pendingQueuePolicy(在下面自己找找)-->
<pendingSubscriberPolicy>
<!--支持三种策略:storeCursor, vmCursor和fileCursor。-->
<fileCursor/>
</pendingSubscriberPolicy>
<!--耐久待处理消息处理策略 类似于:pendingQueuePolicy(在下面自己找找)-->
<pendingDurableSubscriberPolicy>
<!--支持三种策略:storeDurableSubscriberCursor, vmDurableCursor和 fileDurableSubscriberCursor。-->
<storeDurableSubscriberCursor/>
</pendingDurableSubscriberPolicy>
</policyEntry>
<!--消息队列-->
<policyEntry queue=">" producerFlowControl="true" optimizedDispatch="true" memoryLimit="16mb">
<pendingMessageLimitStrategy>
<prefetchRatePendingMessageLimitStrategy multiplier="10"/>
</pendingMessageLimitStrategy>
<messageEvictionStrategy>
<OldestMessageWithLowestPriorityEvictionStrategy />
</messageEvictionStrategy>
<slowConsumerStrategy>
<abortSlowConsumerStrategy abortConnection="false"/>
</slowConsumerStrategy>
<dispatchPolicy>
<strictOrderDispatchPolicy/>
</dispatchPolicy>
<subscriptionRecoveryPolicy>
<timedSubscriptionRecoveryPolicy recoverDuration="1800000"/>
</subscriptionRecoveryPolicy>
<deadLetterStrategy>
<individualDeadLetterStrategy queuePrefix="DLQ.QUEUE." useQueueForQueueMessages="true"/>
</deadLetterStrategy>
<!--
pendingQueuePolicy 待消费消息策略
通道中有大量Slow Consumer时,Broker该如何优化消息的转发,以及在此情况下,“非持久化”消息达到内存
限制时该如何处理。
当Broker接受到消息后,通常将最新的消息写入内存以提高消息转发的效率,提高消息ACK的效率,减少对对底
层Store的操作;如果Consumer非常快速,那么消息将会立即转发给Consumer,不需要额外的操作;但当遇到
Slow Consumer时,情况似乎并没有那么美好。
持久化消息,通常为:写入Store->线程轮询,从Store中pageIn数据到PendingStorage->转发给Consumer->从
PendingStorage中移除->消息ACK后从Store中移除。
对于非持久化数据,通常为:写入内存->如果内存足够,则PendingStorage直接以内存中的消息转发->如果内
存不足,则将内存中的消息swap到临时文件中->从临时文件中pageIn到内存,转发给Consumer。
AcitveMQ提供了几个的Cursor机制,它就是用来保存Pending Messages。
1) vmQueueCursor: 将待转发消息保存在额外的内存(JVM linkeList)的存储结构中。是“非持久化消息”的默
认设置,如果Broker不支持Persistent,它是任何类型消息的默认设置。有OOM风险。
2) fileQueueCursor: 将消息保存到临时文件中。文件存储方式有broker的tempDataStore属性决定。是“持久
化消息”的默认设置。
3) storeCursor: “综合”设置,对于非持久化消息,将采用vmQueueCursor存储,对于持久化消息采用
fileQueueCursor。这是强烈推荐的策略,也是效率最好的策略。
-->
<pendingQueuePolicy>
<storeCursor>
<nonPersistent>
<fileQueueCursor/>
</nonPersistent>
</storeCursor>
</pendingQueuePolicy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
<!--
ActiveMQ的特性之一是很好的支持JMX。通过JMX MBeans可以很方便的监听和控制ActiveMQ的broker。
官方网站提供的JMX特性说明对于远程访问的配置流程坑爹,如果想使用jconsole对ActiveMQ进行监控, 无密码访问> 需要在borker节点设置useJmx属性为true,且managementContext节点的createConnector属性为true。 通过jconsole访问地址service:jmx:rmi:///jndi/rmi://ip:1099/jmxrmi进行连接, 默认端口为1099,可以通过connectorPort属性修改连接端口,远程访问需要设置connectorHost属性 为本机ip以供远程访问 有密码访问> 需要在borker节点设置useJmx属性为true,且managementContext节点的createConnector属性为false。 然后在${actviemq.base}/conf目录下的jmx.access和jmx.password中添加用户权限和密码, 最后修改${activemq.base}/bin/activemq文件,找到下面的内容然后去掉注释,保存退出,重启activemq即可 # ACTIVEMQ SUNJMXSTART="-Dcom.sun.management.jmxremote.port=11099" # ACTIVEMQ SUNJMX START="$ACTIVEMQ SUNJMX START-Dcom.sun.management.jmxremote.password.file=${ACTIVEMQ CONF}/ jmx.password" # ACTIVEMQ SUNJMX START="$ACTIVEMQ SUNJMX START-Dcom.sun.management.jmxremote.access.file=${ACTIVEMQ CONF}/ jmx.access" # ACTIVEMQ SUNJMX START="$ACTIVEMQ SUNJMX START -Dcom.sun.management.jmxremote.ssl=false"
-->
<managementContext>
<managementContext createConnector="false"/>
</managementContext>
<!--持久化存储-->
<persistenceAdapter>
<!--
官方默认的持久化方案
AMQ Message Store 是 ActiveMQ5.0 缺省的持久化存储。Message commands 被保存到 transactional journal(由
rolling data logs 组成)。Messages 被保存到 data logs 中,同时被 reference store 进行索引以提高存取速度。
Date logs由一些单独的 data log 文件组成,缺省的文件大小是 32M,如果某个消息的大小超过了 data log 文件的大
小,那么可以修改配置以增加 data log 文件的大小。
如果某个 data log 文件中所有的消息都被成功消费了,那么这个 data log 文件将会被标记,以便在下一轮的清理中
被删除或者归档。
-->
<amqPersistenceAdapter directory="${activemq.base}/data" maxFileLength="32mb"/>
<!--Kaha Persistence 是一个专门针对消息持久化的解决方案。它对典型的消息使用模式进行了优化。在 Kaha 中,数据被
追加到 data logs 中。当不再需要 log文件中的数据的时候,log 文件会被丢弃。-->
<!-- <kahaDB directory="${activemq.data}/kahadb"/>-->
<!--
支持的数据库有Apache Derby,Axion,DB2,HSQL,Informix,MaxDB,MySQL,Oracle,Postgresql,SQLServer,Sybase。
如果你使用的数据库不被支持,那么可以调整 StatementProvider 来保证使
用正确的 SQL 方言(flavour of SQL)。通常绝大多数数据库支持以下 adaptor: