.NET Core 3.1 使用 ActiveMQ 消息队列的简易范例
程序员文章站
2024-03-13 11:57:57
...
新建两个项目
test 为生产者,是一个 MVC API 项目;ConsumerConsole 为消费者,是一个控制台项目
新建一个ProviderController.cs作为生产者控制器
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Apache.NMS;
using Apache.NMS.ActiveMQ;
using Apache.NMS.Util;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Logging;
namespace test.Controllers
{
    /// <summary>
    /// API controller acting as the message producer: each GET request
    /// publishes the supplied text to the ActiveMQ queue "testQueue".
    /// </summary>
    [Route("api/[controller]")]
    [ApiController]
    public class ProviderController : ControllerBase
    {
        // Name of the queue the message is published to.
        private const string QueueName = "testQueue";

        // Broker address used to build the NMS connection factory.
        private const string BrokerUri = "activemq:failover:(tcp://localhost:61616)";

        private readonly ILogger<ProviderController> _logger;

        /// <summary>Creates the controller with the logger supplied by DI.</summary>
        /// <param name="logger">Logger used to record send results and failures.</param>
        public ProviderController(ILogger<ProviderController> logger)
        {
            _logger = logger;
        }

        /// <summary>
        /// Sends <paramref name="msg"/> as a text message to the queue.
        /// </summary>
        /// <param name="msg">Message text, bound from the query string.</param>
        /// <returns>
        /// 400 when <paramref name="msg"/> is null or empty, 500 when the
        /// NMS client reports an error, otherwise a plain-text success reply.
        /// </returns>
        [HttpGet]
        public IActionResult Index(string msg)
        {
            if (string.IsNullOrEmpty(msg))
            {
                return BadRequest("消息内容不能为空");
            }

            try
            {
                // The send runs inside the request (not in a fire-and-forget
                // Task.Run) so that the response reflects the real outcome and
                // failures are observed instead of being lost.
                // NOTE(review): with the failover transport, calls may block while
                // the broker is unreachable — consider configuring reconnect
                // limits/timeouts on the URI; confirm against the NMS documentation.
                IConnectionFactory factory = new ConnectionFactory(new Uri(BrokerUri));
                using (IConnection connection = factory.CreateConnection())
                using (Apache.NMS.ISession session = connection.CreateSession())
                {
                    IDestination destination = SessionUtil.GetDestination(session, QueueName);
                    using (IMessageProducer producer = session.CreateProducer(destination))
                    {
                        ITextMessage request = session.CreateTextMessage(msg);
                        // NOTE(review): no sleep after Send — this presumes the send
                        // has completed when Send returns (default client settings);
                        // verify if AsyncSend is ever enabled.
                        producer.Send(request);
                    }
                }
            }
            catch (NMSException ex)
            {
                _logger.LogError(ex, "发送消息失败:{Message}", msg);
                return StatusCode(StatusCodes.Status500InternalServerError, "发送消息失败");
            }

            _logger.LogInformation("发送消息:{Message}", msg);
            return Content("发送消息成功");
        }
    }
}
消费者目录结构
Program.cs
using Apache.NMS;
using Apache.NMS.ActiveMQ;
using Apache.NMS.Util;
using System;
using System.Threading.Tasks;
namespace ConsumerConsole
{
    /// <summary>
    /// Console consumer: connects to ActiveMQ, subscribes to the queue
    /// "testQueue" and prints every text message it receives until the
    /// user presses Enter.
    /// </summary>
    class Program
    {
        // Name of the queue to consume from.
        private const string QueueName = "testQueue";

        // Broker address used to build the NMS connection factory.
        private const string BrokerUri = "activemq:failover:(tcp://localhost:61616)";

        /// <summary>
        /// Entry point. Runs the consumer on the main thread so that a single
        /// Console.ReadLine controls shutdown, resources are disposed before the
        /// process exits, and connection errors surface instead of being lost
        /// in an unobserved background task.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            IConnectionFactory factory = new ConnectionFactory(new Uri(BrokerUri));
            using (IConnection conn = factory.CreateConnection())
            using (ISession session = conn.CreateSession())
            {
                IDestination destination = SessionUtil.GetDestination(session, QueueName);
                using (IMessageConsumer consumer = session.CreateConsumer(destination))
                {
                    consumer.Listener += OnMessage;

                    // Start delivery only after the listener is attached.
                    conn.Start();

                    // Keep the connection open until the user presses Enter.
                    Console.ReadLine();
                }
            }
        }

        /// <summary>
        /// Prints the text of a received message; messages that are not
        /// text messages are reported and skipped rather than causing an
        /// invalid-cast exception inside the listener.
        /// </summary>
        /// <param name="message">The message delivered by the consumer.</param>
        private static void OnMessage(IMessage message)
        {
            ITextMessage textMessage = message as ITextMessage;
            if (textMessage == null)
            {
                string typeName = message == null ? "null" : message.GetType().Name;
                Console.WriteLine("忽略非文本消息:" + typeName);
                return;
            }

            Console.WriteLine("从MQ接收到消息:" + textMessage.Text);
        }
    }
}
用PostMan测试一下
ActiveMQ 控制台此时已经接收到生产者发送过来的消息
消费者控制台接收到消息