RabbitMQ工作队列
程序员文章站
2022-05-17 08:48:25
...
如果一个生产者对应多个消费者就代表一个工作队列,工作队列最大的特点在于,一个生产者对应多个消费者
队列会自动进行负载均衡处理,每个消费者消费的信息均衡
模型实例
此时候需要建立3个消费者和1个生产者进行测试
(1)生产者
public class MessageProducer {
//RabbitMQ服务所在地址
public final static String HOST="192.168.74.142";
//RabbitMQ端口
public final static int PORT=5672;
//RabbitMQ登陆用户名
public final static String USERNAME="sjw";
//RabbitMQ登陆密码
public final static String PASSWORD="123";
//队列名称
public final static String QUEUE_NAME="sjw.queue";
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory factory=new ConnectionFactory();
factory.setHost(HOST);
factory.setPort(PORT);
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
//获取连接
Connection connection=factory.newConnection();
//获取信道,可以有多个信道
Channel channel=connection.createChannel();
//信道指定队列已经队列设置
//queueDeclare(名字,是否持久化,独占的queue, 不使用时是否自动删除,其他参数)
channel.queueDeclare(QUEUE_NAME,true,false,true,null);
//在开始前获取一下当前时间,方便统计消息全部进入队列所需的时间
long start=System.currentTimeMillis();
for (int i=0;i<1000;i++){
String message="sjw"+i;
//basicPublish(exchange,队列名称,属性,参数.getbyte())
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
}
//结束时间
long end=System.currentTimeMillis();
//输出所需时间
System.out.println("进入队列总共耗时:"+(end-start));
//关闭信道
channel.close();
//关闭连接
connection.close();
}
}
(2)消费者
public class MessageConsumerA {
//RabbitMQ服务所在地址
public final static String HOST="192.168.74.142";
//RabbitMQ端口
public final static int PORT=5672;
//RabbitMQ登陆用户名
public final static String USERNAME="sjw";
//RabbitMQ登陆密码
public final static String PASSWORD="123";
//队列名称
public final static String QUEUE_NAME="sjw.queue";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(HOST);
factory.setPort(PORT);
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
//获取连接
Connection connection = factory.newConnection();
//获取信道,可以有多个信道
Channel channel = connection.createChannel();
//信道设置,必须与要对应接收的队列设置一模一样,有差别则无法接收你想要的信道
channel.queueDeclare(QUEUE_NAME, true, false, true, null);
QueueingConsumer consumer = new QueueingConsumer(channel);
//信道交给consumer进行内容接收处理
channel.basicConsume(QUEUE_NAME,consumer);
while (true) { //消费者程序运行开着 如果生产者新增了数据会自动获取
// nextDelivery是一个阻塞方法(内部实现其实是阻塞队列的take方法)
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("[消费者A:]"+"[消息]" + message);
}
}
}
启动ABC三个消费者程序,最后启动生产者,然后查看消费者程序控制台
(1)消费者A
(2)消费者B
(3)消费者C
发现工作队列可以进行负载均衡消费
下一篇: rabbitMQ安装