ActiveMQ安装与使用总结
1. 安装准备
下载ActiveMQ:http://activemq.apache.org/
安装(后台服务启动):
bin\win64\InstallService.bat 双击安装即可,如果有杀毒软件的话先关闭杀毒软件
默认端口61616,安装过程中如果有端口占用的情况,先找到占用端口的进程然后关闭掉。
查找进程的操作:cmd->输入指令 netstat -ano | findstr"61616"
关闭进程的指令:taskkill /PID 6400,PID后面跟着的就是你要关闭的进程PID。
安装成功后到任务管理器看进程是否存在并开启进程
开启服务成功后访问:http://localhost:8161/admin ,不手动进行配置的话默认账号跟密码都是admin
2. netTOActiveMQ的简单使用
①消息队列的简单示例
使用NuGet引入ActiveMQ的类库Apache.NMS.ActiveMQ,然后就可以直接写测试代码了。如下所示:
using Apache.NMS;
using Apache.NMS.ActiveMQ;
using Apache.NMS.Util;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace ActiveMQ
{
class CombineTest
{
static void Main(string[] args)
{
//
Task.Run(() =>
{
string queuesName = "myQueue2";
Uri _uri = new Uri(String.Concat("activemq:failover:(tcp://localhost:61616)"));
IConnectionFactory factory = new ConnectionFactory(_uri);
using (IConnection conn = factory.CreateConnection())
{
using (ISession session = conn.CreateSession())
{
conn.Start();
IDestination destination = SessionUtil.GetDestination(session, queuesName);
using (IMessageConsumer consumer = session.CreateConsumer(destination))
{
consumer.Listener += (IMessage message) =>
{
ITextMessage msg = (ITextMessage)message;
Console.WriteLine(" + msg.Text);
};
Console.ReadLine();
}
}
}
});
//
Task.Run(() =>
{
string _queuesName = "myQueue2";
var __uri = new Uri(string.Concat("activemq:failover:(tcp://localhost:61616)"));
IConnectionFactory _factory = new ConnectionFactory(__uri);
using (IConnection _conn = _factory.CreateConnection())
{
using (ISession _session = _conn.CreateSession())
{
IDestination _destination = SessionUtil.GetDestination(_session, _queuesName);
using (IMessageProducer producer = _session.CreateProducer(_destination))
{
//
for (int i = 0; i < 10; i++)
{
ITextMessage request = _session.CreateTextMessage(" + i);
producer.Send(request);
Console.WriteLine(" + i);
Thread.Sleep(200);
}
}
}
}
});
Console.ReadLine();
}
}
}
如我们代码所示,采用两个进程,一个作为消息生产者发送消息一个作为消息消费者接收消息。我们先不急着解析代码,执行一下代码看看效果。
但是在我们项目中编译后的文件是一个DLL类库文件不是可执行的文件,处理很简单在这个测试项目右键-》属性-》改为应用程序-》启动对象改为这个测试类。在项目代码路径下找到这个项目的BIN\DEDUG文件夹就能找到对应的可执行文件xxx.exe。双击这个文件就能看到结果了(如果没有反应,则查看一下你的启动对象是否有设置对),执行结果:
当然我们也可以把这些生成的消息都写到日志文件里。
接下来我们就一起来分析一下代码:
首先Task.Run(() =>{yourcode})这是一个用lambda表达式实现的一个委托,没什么好讲的啦,都是基础,不清楚的可以再复习一下委托和lambda。
主要是对ActiveMQ的一些操作代码进行分析和扩展。
如图先定义服务器的地址,端口默认61616,failover是失效转移协议,这里涉及到集群的概念。
集群包括consumer集群(消息消费者集群)和broker集群(消费服务器集群)
关于消费者集群
对于消费者集群,对于队列消费者,主要是:1.保证如果某一个消费者死亡了,任何它没有确认完的消息会被重传别的正常的消费者来消费;2.如果一个消费者消费消息过快,就可以比别的消费者得到更多的消息;3.如果一个消费者消费消息过慢,它就会被少得到消息。第1点几乎是所有JMS提供者都有的功能——消息重传机制(可以参考我的其他ActiveMQ博文)。第2点和第3点也是很正常的,因为大多消费者和线程是一一对应的关系(独立线程),你消费速率快,当然可以自己去服务器拉取更多的消息。当然ActiveMQ在队列上给消费者提供了高性能的负载均衡策略。对于主题订阅者,由于每个订阅者接受到被推送的消息都和其他订阅者无关
关于服务器集群
对于消息服务器集群,主要是指:1.如果集群中的某一台消息服务器宕机,与该台消息服务器相连接的生产者和消费者能否自动连接到其他正常工作的消息服务器(主从复制)。2.如果集群中的某一台消息服务器宕机,该台服务器上未消费的消息能否在该台服务器恢复正常之前由其他服务器转发。3.集群环境中会不会导致某台消息服务器上只有消费者或者某台消息服务器上只有生产者。对于1,ActiveMQ提供了一种叫做失效转移(也叫故障转移,FailOver)的策略。失效转移提供了在传输层上重新连接到其他任何传输器的功能。使用它很简单,只需要在uri中配置就行了,语法如下:
failover:(uri1,...,uriN)?transportOptions 或者 failover:uri1,...,uriN
例子:
failover:(tcp://primary:61616,tcp://secondary:61616)?randomize=false
failover:(tcp://localhost:61616,tcp://remotehost:61616)?initialReconnectDelay=100 (
如果这样使用报错你可以试试这个:failover://(tcp://localhost:61616,tcp://remotehost:61616)?initialReconnectDelay=100 (this way works in ActiveMQ 4.1.1 the one above does not))
如果某个ActiveMQ客户端发现uri1地址失效了,它会立即转向uri地址列表中其他可以连接的消息服务器进行重连,以保证继续正常工作,请注意,并不是uri1失效了就会选则uri2重连,这种选择其他地址的方式默认是随机的,以保证负载均衡,如果你想关闭随机,可以transportOptions中加入randomize=false。
3. transportOptions参数分析
transportOptions有多种参数可以选择,如下(参考大神的分析:https://blog.csdn.net/u014431479/article/details/84626916):
initialReconnectDelay:默认为10,单位毫秒,表示第一次尝试重连之前等待的时间。
maxReconnectDelay:默认30000,单位毫秒,表示两次重连之间的最大时间间隔。
useExponentialBackOff:默认为true,表示重连时是否加入避让指数来避免高并发。
reconnectDelayExponent:默认为2.0,重连时使用的避让指数。
maxReconnectAttempts:5.6版本之前默认为-1,5.6版本及其以后,默认为0,0表示重连的次数无限,配置大于0可以指定最大重连次数。
startupMaxReconnectAttempts:默认为0,如果该值不为0,表示客户端接收到消息服务器发送来的错误消息之前尝试连接服务器的最大次数,一旦成功连接后,maxReconnectAttempts值开始生效,如果该值为0,则默认采用maxReconnectAttempts。
randomize:默认为true,表示在URI列表中选择URI连接时是否采用随机策略,记住,这种随机策略在第一次选择URI列表中的地址时就开始生效,所以,如果为true的话,一个生产者和一个消费者的Failover连接地址都是两个URI的话,有可能生产者连接的是第一个,而消费者连接的是第二个,造成一个服务器上只有生产者,一个服务器上只有消费者的尴尬境地。
backup:默认为false,表示是否在连接初始化时将URI列表中的所有地址都初始化连接,以便快速的失效转移,默认是不开启。
timeout:默认为-1,单位毫秒,是否允许在重连过程中设置超时时间来中断的正在阻塞的发送操作。-1表示不允许,其他表示超时时间。
nested.*:默认为null,5.9及其以后版本可用,表示给嵌套的URL添加额外的选项。 以前,如果你想检测让死连接速度更快,你必须在wireFormat.maxInactivityDuration= 1000选项添加到失效转移列表中的所有嵌套的URL。例如:
failover:(tcp://host01:61616?wireFormat.maxInactivityDuration=1000,tcp://host02:61616?wireFormat.maxInactivityDuration=1000,tcp://host03:61616?wireFormat.maxInactivityDuration=1000)
而现在,你只需要这样:
failover:(tcp://host01:61616,tcp://host02:61616,tcp://host03:61616)?nested.wireFormat.maxInactivityDuration=1000
warnAfterReconnectAttempts.*:默认为10,5.10及其以后的版本可用,表示每次重连该次数后会打印日志告警,设置<=0的值表示禁用
econnectSupported:默认为true,表示客户端是否应响应经纪人 ConnectionControl事件与重新连接(参见:rebalanceClusterClients)updateClusterClients:默认为false,如果为true,则会将broker集群的拓扑结构的改变信息传递给连接的客户端。
rebalanceClusterClients:默认为false,如果为true,则如果有新的消息服务器加入到消息服务器集群中,则连接的客户端将被要求重新平衡(asked to rebalance)。注意, priorityBackup=true能覆盖。
updateClusterClientsOnRemove:默认为false,如果为true,则当一个集群从网络中移除的时候将更新客户端。有了这个选项,可以在消息服务器移除时更新客户端,而不是仅仅只是新增消息服务器时更新。(难道官方文档有问题:if true, will update clients when a cluster is removed from the network. Having this as separate option enables clients to be updated when new brokers join, but not when brokers leave.)
updateClusterFilter:默认为null,如果有值,将会是逗号分隔的正则表达式列表,用来过滤掉失效转移时的消息服务器集群中的服务器名称。
ActiveMQ提供了优先级备份(priority backup )的特性,所以你可以让客户端自动重连到所谓的“优先级”URI,你可以在客户端如下配置URI地址:
failover:(tcp://local:61616,tcp://remote:61616)?randomize=false&priorityBackup=true
在某些情况下,你希望不止一个URI地址优先,则你可以使用priorityURIs参数:
failover:(tcp://local1:61616,tcp://local2:61616,tcp://remote:61616)?randomize=false&priorityBackup=true&priorityURIs=tcp://local1:61616,tcp://local2:61616