RabbitMQ
程序员文章站
2022-07-12 12:49:15
...
RabbitMQ介绍
MQ介绍:
MQ全称为Message Queue,即消息队列, RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message
Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开
发中应用非常广泛。RabbitMQ官方地址:http://www.rabbitmq.com/
消息队列的作用:
异步调用,应用解耦,削弱峰值,消息通讯
RabbitMQ的优势:
1、使得简单,功能强大。
2、基于AMQP协议。
3、社区活跃,文档完善。
4、高并发性能好,这主要得益于Erlang语言。
5、Spring Boot默认已集成RabbitMQ
登录RabbitMQ:
http://localhost:15672
默认账号密码:
guest/guest
导入依赖:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.0.3</version><!--此版本与spring boot 1.5.9版本匹配-->
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</dependency>
Producer
private static final String QUEUE = "helloworld";
//1.创建连接工厂对象,并设置参数
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setUsername("guest");
factory.setPassword("guest");
factory.setPort(5672);
//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务器
factory.setVirtualHost("/");
//2.建立一次Socket连接
Connection connection = factory.newConnection();
//3.创建与Exchange的通道,每个连接可以创建多个通道,每个通道代表一个会话任务
Channel channel = connection.createChannel();
//4.声明队列,参数1:队列名称,参数2:是否持久化,参数3:队列是否独占此连接,参数4:队列不再使用时是否自动删除此队列,参数5:队列参数
channel.queueDeclare(QUEUE,true,false,false,null);
//5.发布消息,参数1:Exchange的名称,如果没有指定,则使用Default Exchange,参数2:routingKey,消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列,参数3:消息包含的属性,参数4:消息体
String message = "Hello World..."
channel.basicPublish("",QUEUE,null,message.getBytes());
//6.释放资源
channel.close();
connection.close();
Consumer
private static final String QUEUE = "helloworld";
//1.创建连接工厂对象,并设置参数
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setUsername("guest");
factory.setPassword("guest");
factory.setPort(5672);
//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务器
factory.setVirtualHost("/");
//2.建立一次Socket连接
Connection connection = factory.newConnection();
//3.创建与Exchange的通道,每个连接可以创建多个通道,每个通道代表一个会话任务
Channel channel = connection.createChannel();
//4.开始监听消息 参数1:队列名称,参数2:是否自动回复,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动回复,参数3:消费消息的方法,消费者接收到消息后调用此方法
channel.basicConsumer(QUEUE,false,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
String exchange = envelope.getExchange(); //交换机名称
long deliveryTag = envelope.getDeliveryTag(); //消息的ID
String routingKey = envelope.getRoutingKey(); //路由的名称
String message = new String(body,"UTF-8"); //监听到的消息
}
});
RabbitMQ工作模式
1、Work queues
流程:生产者-->队列-->默认交换机-->队列-->消费者
Work queues模式下,多个消费者监听生产者,rabbitMQ采用轮询的方式将消息是平均发送给消费者(activeMQ是抢占式,谁抢到归谁)
应用场景:对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度
生产者:
//1.声明队列
channel.queueDeclare(QUEUE,true,false,false,null);
//2.发送消息
channel.basicPublish("",QUEUE,null,message.getBytes());
消费者:
//1.声明队列
channel.queueDeclare(QUEUE,true,false,false,null);
//2.监听消息队列
channel.basicConsume(QUEUE,new DefaultConsumer(){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body,"UTF-8"));
}
});
2、Publish/Subscribe
流程:生产者-->队列-->自定义交换机-->队列-->消费者
发布订阅模式下,每个消费者监听自己的队列。生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到各自的消息
应用场景:用户通知,当用户充值成功或转账完成系统通知用户,通知方式有短信、邮件多种方法。
生产者:
//1.声明队列
channel.queueDeclare(SMS,true,false,false,null);
channel.queueDeclare(EMAIL,true,false,false,null);
//2.声明交换机
channel.exchangeDeclare(EXCHANGE,BuiltinExchangeType.DIRECT);
//3.绑定交换机和队列,参数1:队列名称,参数2:交换机名称,参数3:路由key
channel.queueBind(SMS,EXCHANGE,"");
channel.queueBind(EMAIL,EXCHANGE,"");
//4.发送消息
channel.basicPublish(EXCHANGE,"",null,message.getBytes());
消费者:
//1.声明队列
channel.queueDeclare(SMS,true,false,false,null);
channel.queueDeclare(EMAIL,true,false,false,null);
//2.声明交换机
channel.exchangeDeclare(EXCHANGE,BuiltinExchangeType.DIRECT);
//3.绑定交换机和队列,参数1:队列名称,参数2:交换机名称,参数3:路由key
channel.queueBind(SMS,EXCHANGE,"");
channel.queueBind(EMAIL,EXCHANGE,"");
//4.监听消息队列
//监听SMS消息队列
channel.basicConsume(SMS,new DefaultConsumer(){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body,"UTF-8"));
}
});
//监听EMAIL消息队列
channel.basicConsume(EMAIL,...);
3、Routing
流程:生产者-->队列-->路由-->自定义交换机-->路由-->队列-->消费者
Routing路由模式下,每个消费者监听自己的队列,并且设置routingkey。生产者将消息发给交换机,由交换机根据routingkey来转发消息到指定的队列。
...
channel.queueBind(EMAIL,EXCHANGE,ROUTING_KEY);
channel.basicPublish(EXCHANGE,ROUTING_KEY,null,message.getBytes());
...
4、Topics
Topics路由模式下,每个消费者监听自己的队列,并且设置带统配符的routingkey。生产者将消息发给broker,由交换机根据routingkey来转发消息到指定的队列。
应用场景:根据用户的通知设置去通知用户,设置接收Email的用户只接收Email,设置接收sms的用户只接收sms,设置两种通知类型都接收的则两种通知都有效。
队列绑定交换机通配符规则:中间以“.”分隔。符号#可以匹配任意个词,符号*可以匹配一个词语
生产者:
channel.exchangeDeclare(EXCHANGE,BuiltinExchangeType.TOPIC);
channel.queueBind(SMS,EXCHANGE,"routing.sms");
channel.queueBind(EMAIL,EXCHANGE,"routing.email");
channel.basicPublish(EXCHANGE,"routing.sms",null,message.getBytes());
channel.basicPublish(EXCHANGE,"routing.email",null,message.getBytes());
channel.basicPublish(EXCHANGE,"routing.sms.email",null,message.getBytes());
消费者:
channel.exchangeDeclare(EXCHANGE,BuiltinExchangeType.TOPIC);
channel.queueBind(SMS,EXCHANGE,"routing.#.sms");
channel.queueBind(EMAIL,EXCHANGE,"routing.#.email");
channel.basicConsume(SMS,new DefaultConsume(){ ... });
channel.basicConsume(EMAIL,new DefaultConsume(){ ... });
5、Header
header模式与routing不同的地方在于,header模式取消routingkey,使用header中的 key/value(键值对)匹配队列
生产者:
channel.exchangeDeclare(EXCHANGE_HEADERS_INFORM, BuiltinExchangeType.HEADERS);
Map<String,Object> map1 = new HashMap();
map1.put("sms","header.sms");
channel.queueBind(SMS,EXCHANGE,"",map1); //queueBind方法的重载
Map<String,Object> map2 = new HashMap();
map2.put("email","header.email");
channel.queueBind(EMAIL,EXCHANGE,"",map2);
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties().builder();
builder.headers(map1);
channel.basicPublich(EXCHANGE,"",builder.build(),messgae.getBytes());
6、RPC
tip:所有大写字符串,都被声明为private static final 成员变量
注意:
1.Work Queue模式,底层实际上也是自动将队列绑定默认的交换机;因此Work Queue模式跟Publish/Subscribe模式其实效果是一样的,多个消费端监听同一个队列不会重复消费消息。
2.Routing模式要求队列在绑定交换机时要指定routingkey,消息会转发到符合routingkey的队列。
SpringBoot整合RabbitMQ=====
RabbitConfig配置类
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfig {
private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
private static final String QUEUE_INFORM_SMS = "queue_inform_sms";
private static final String EXCHANGE_TOPICS_INFORM = "exchange_topics_inform";
//创建持久化的交换机
@Bean("EXCHANGE_TOPICS_INFORM")
public Exchange EXCHANGE_TOPICS_INFORM() {
return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build();
}
//声明队列
@Bean("QUEUE_INFORM_EMAIL")
public Queue QUEUE_INFORM_EMAIL() {
return new Queue(QUEUE_INFORM_EMAIL);
}
//声明队列
@Bean("QUEUE_INFORM_SMS")
public Queue QUEUE_INFORM_SMS() {
return new Queue(QUEUE_INFORM_SMS);
}
@Bean
public Binding BINDING_QUEUE_INFORM_EMAIL(@Qualifier("QUEUE_INFORM_EMAIL") Queue queue,
@Qualifier("EXCHANGE_TOPICS_INFORM") Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("inform.#.email.#").noargs();
}
@Bean
public Binding BINDING_QUEUE_INFORM_SMS(@Qualifier("QUEUE_INFORM_SMS") Queue queue,
@Qualifier("EXCHANGE_TOPICS_INFORM") Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("inform.#.sms.#").noargs();
}
}
生产者
@RunWith(SpringRunner.class)
@SpringBootTest(classes = ManageApplication.class)
public class Producer {
@Autowired
private RabbitTemplate rabbitTemplate;
private static final String EXCHANGE_TOPICS_INFORM = "exchange_topics_inform";
@Test
public void testProducer() {
rabbitTemplate.convertAndSend(EXCHANGE_TOPICS_INFORM,"inform.sms.email",
"hello world..");
}
}
消费者
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class ReceiveHandler {
private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
private static final String QUEUE_INFORM_SMS = "queue_inform_sms";
@RabbitListener(queues = {QUEUE_INFORM_EMAIL})
public void receive_email(String message, Channel channel) {
System.out.println(message);
}
@RabbitListener(queues = {QUEUE_INFORM_SMS})
public void receive_sms(String message, Channel channel) {
System.out.println(message);
}
}
下一篇: RabbitMQ
推荐阅读