ActiveMQ发布订阅模式
程序员文章站
2022-05-21 13:54:12
...
ActiveMQ的另一种模式就SUB/HUB即发布订阅模式,是SUB/hub就是一拖N的USB分线器的意思。意思就是一个来源分到N个出口。还是上节的例子,当一个订单产生后,后台N个系统需要联动,但有一个前提是都需要收到订单信息,那么我们就需要将一个生产者的消息发布到N个消费者。
生产者:
try
{
//创建连接工厂
IConnectionFactory factory = new ConnectionFactory("tcp://localhost:61616/");
using (IConnection connection = factory.CreateConnection())
{
//创建会话
using (ISession session = connection.CreateSession())
{
//为主题/队列创建生产者
IMessageProducer prod = session.CreateProducer(
new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("testing"));
//发送消息
int i = 0;
while (!Console.KeyAvailable)
{
ITextMessage msg = prod.CreateTextMessage();
msg.Text = i.ToString();
Console.WriteLine("Sending: " + i.ToString());
prod.Send(msg, Apache.NMS.MsgDeliveryMode.NonPersistent, Apache.NMS.MsgPriority.Normal, TimeSpan.MinValue);
System.Threading.Thread.Sleep(5000);
i++;
}
}
}
Console.ReadLine();
}
catch (System.Exception e)
{
Console.WriteLine("{0}", e.Message);
Console.ReadLine();
}
假设生产者每5秒发送一次消息:
消费者:
static void Main(string[] args)
{
try
{
//创建连接工厂
IConnectionFactory factory = new ConnectionFactory("tcp://localhost:61616/");
//创建连接
using (IConnection connection = factory.CreateConnection())
{
connection.ClientId = "testing listener1";
connection.Start();
//创建会话
using (ISession session = connection.CreateSession())
{
//创建消费者
IMessageConsumer consumer = session.CreateDurableConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("testing"), "testing listener1", null, false);
consumer.Listener += new MessageListener(consumer_Listener);
Console.ReadLine();
}
connection.Stop();
connection.Close();
}
}
catch (System.Exception e)
{
Console.WriteLine(e.Message);
}
}
static void consumer_Listener(IMessage message)
{
try
{
ITextMessage msg = (ITextMessage)message;
Console.WriteLine("Receive: " + msg.Text);
}
catch (System.Exception e)
{
Console.WriteLine(e.Message);
}
}
启动一个消费者:
我们发现他是从15开始的,而不是像上节一样从头开始,再启动另一个消费者:
我们发现就是从启动时开始接受消息的,之前的消息就丢失了。
整体状态如下:
我们观察管理界面:
产生了一个testing的Topics,而订阅方有2个都订阅的是testing:
这样只需要在需要获取消息的地方订阅即可及时获得。