欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

RabbitMQ快速入门

程序员文章站 2022-03-08 17:54:46
...

简介

RabbitMQ是什么?

RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。

RabbitMQ是AMQP的一种实现,消息队列的一种,类似的实现还有:ActiveMQ,kafka等。

RabbitMQ可以做什么?

使用RabbitMQ可以将一些无需即时返回且耗时的操作抽离出去,进行异步操作,从而减少服务器的响应时间,提高系统的吞吐量。

例如:业务系统进行增删改查时,需要进行数据统计,就可以将统计数据的操作抽离出来,使用消息队列来异步统计。

AMQP是什么?

AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现有RabbitMQ等。

AMQP和Http类似,是一种协议、规范,而RabbitMQ是根据AMQP协议实现的一款产品。

AMQP的一些概念:

  • Broker:接收和分发消息的应用,消息系统。
  • Virtual Host:虚拟主机,一个消息服务可以划分多个Virtual Host,不同Virtual Host之间数据是隔离的互不影响,类似于命名空间。
  • Connection:网络连接,如TCP/IP。
  • Channel:通道,多路复用连接中的一条独立的双向数据流通道。如果每一次与消息服务通信都建立一次TCP连接,这个开销将是巨大的,Channel是建立在Connection内部的逻辑连接,减少了建立TCP连接的开销。
  • Exchange:交换机,用来接收生产者发送的消息,并根据不同的规则将消息路由到不同的队列中。
  • Queue:队列,存放由Exchange路由过来的消息,并发送给消费者。
  • Binding:队列和交换机之间的关联。

服务安装部署

RabbitMQ分为服务端和客户端。
服务端由erlang语言编写,运行需要erlang环境,就如运行Java程序需要安装JDK一样。
所以安装RabbitMQ之前,需要先安装erlang环境。

erlang安装

下载erlang安装包,版本不能太久,具体参考官网版本号。

wget https://packages.erlang-solutions.com/erlang/rpm/centos/7/x86_64/esl-erlang_22.2.1-1~centos~7_amd64.rpm

安装erlang

rpm -ivh esl-erlang_22.2.1-1~centos~7_amd64.rpm

如果安装出错,可能是缺少依赖。

yum install epel-release
yum install unixODBC unixODBC-devel wxBase wxGTK SDL wxGTK-gl

执行erl命令可以查看erlang是否安装成功。

RabbitMQ安装与启动

可以直接使用官网提供的rpm安装包,也可以使用二进制文件来构建安装。

安装之前先安装依赖。

yum install -y socat

安装完成后,就可以启动服务了,几个命令:

service rabbitmq-server start #启动
service rabbitmq-server stop #停止
service rabbitmq-server restart #重启
service rabbitmq-server status #查看状态

RabbitMQ开启Web可视化插件

rabbitmq-plugins enable rabbitmq_management

RabbitMQ默认只有一个账号:guest,且只允许本地登录,如果服务不是部署在本地是无法登录的,可以使用命令来创建账号:

# 添加账号 root root
rabbitmqctl add_user root root
# 添加标签 administrator(不代表真正的权限)
rabbitmqctl set_user_tags root administrator

RabbitMQ的Web服务默认端口为15672,访问如下图所示:
RabbitMQ快速入门

使用刚刚创建的用户登录,即可看到管理页面。

Java操作RabbitMQ

RabbitMQ支持多种客户端,这里使用Java。

导入Maven依赖

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.8.0</version>
</dependency>

编写一个工具类

public class MQUtil {
	//队列名
	public static final String QUEUE_NAME = "queueTest";
	//交换机名
	public static final String EXCHANGE_NAME = "exchangeTest";
	//连接工厂
	private static final ConnectionFactory factory = new ConnectionFactory();

	static {
		factory.setHost("192.168.1.120");
		factory.setPort(5672);
		factory.setVirtualHost("test");
		factory.setUsername("root");
		factory.setPassword("root");
	}

	public static Connection getConnection(){
		try {
			return factory.newConnection();
		} catch (Exception e) {
			e.printStackTrace();
		}
		return null;
	}
}

创建生产者,发送消息

public class Producer {
	private Connection connection = MQUtil.getConnection();
	
	//发送消息
	public void sendMessage(String message) throws Exception {
		Channel channel = connection.createChannel();
		/**
		 * 声明队列,各参数含义:
		 * 队列名称
		 * 是否持久化(队列持久化,不是消息持久化)
		 * 是否排他(只有当前连接可以操作队列)
		 * 是否自动删除
		 * 队列其他参数Map
		 */
		channel.queueDeclare(MQUtil.QUEUE_NAME, true, false, false, null);
		//声明交换机 交换机名称、类型
		channel.exchangeDeclare(MQUtil.EXCHANGE_NAME, BuiltinExchangeType.FANOUT);

		/**
		 * 交换机和队列绑定
		 * 第三个参数:routingKey路由键,FANOUT类型的交换机无需路由键
		 */
		channel.queueBind(MQUtil.QUEUE_NAME, MQUtil.EXCHANGE_NAME, "");
		channel.basicPublish(MQUtil.EXCHANGE_NAME, "", null, message.getBytes());
		
		channel.close();
	}
}

创建消费者,消费消息

public class Consumer {
	Connection connection = MQUtil.getConnection();

	public void consume() throws Exception {
		Channel channel = connection.createChannel();

		//消费消息
		channel.basicConsume(MQUtil.QUEUE_NAME, true, new DefaultConsumer(channel) {
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
				String message = new String(body);
				System.out.println(message);
			}
		});
	}
}

启动生产者,发送一个消息,Web界面如下图:
RabbitMQ快速入门

  • Ready:等待被消费的消息。
  • Unacked:消息被取走,但还未确认删除。
  • Total:消息总量。

Exchange的类型

fanout

只要队列与交换机绑定了,交换机就会把消息路由到队列中。
与路由键routingKey没有关系。

RabbitMQ快速入门

direct

交换机与队列通过一个路由键routingKey绑定,发送消息时指定routingKey,只有routingKey匹配时,交换机才会将消息路由到队列。

RabbitMQ快速入门

topic

主题模式,将消息进行分类,队列只关心某一类的消息。

在direct的基础上进行了模糊匹配,单词通过.隔开,*表示匹配1个单词,#表示匹配任意个单词。

例如:日志采集系统,将infoerror的日志单独采集,就可以使用topic模式。

RabbitMQ快速入门

headers

应用场景比较少,不关心routingKey是否匹配,而是根据headers中的Key-Value来匹配。
只有headers中的所有键值对全部匹配时才会路由消息。

与SpringBoot集成

生产者

模拟商品新增时,利用消息队列来异步统计信息。

引入依赖

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

通过Java代码的方式来配置

@Configuration
public class RabbitMQConfig {

	@Bean
	public ConnectionFactory connectionFactory(){
		CachingConnectionFactory factory = new CachingConnectionFactory();
		factory.setHost("192.168.1.120");
		factory.setPort(5672);
		factory.setUsername("root");
		factory.setPassword("root");
		factory.setVirtualHost("test");
		return factory;
	}

	//声明交换机
	@Bean
	public FanoutExchange fanoutExchange(){
		return new FanoutExchange("goodsExchange");
	}

	//声明队列
	@Bean
	public Queue queue(){
		return new Queue("goodsQueue", true);
	}

	//声明binding
	@Bean
	public Binding binding(Queue queue,FanoutExchange exchange){
		//将队列绑定到指定交换机上
		return BindingBuilder.bind(queue).to(exchange);
	}

	@Bean
	public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
		RabbitTemplate template = new RabbitTemplate(connectionFactory);
		return template;
	}
}

商品新增成功后,发送消息,异步统计商品信息

@RestController
@RequestMapping("goods")
public class GoodsController {
	@Autowired
	private RabbitTemplate rabbitTemplate;

	@PostMapping("save")
	public Object save(String name) {
		//保存商品
		String id = UUID.randomUUID().toString(true);
		//写具体的业务逻辑....
		System.out.println("商品新增成功.");

		//发送消息 商品id
		rabbitTemplate.convertAndSend("goodsExchange", "", id);
		return id;
	}
}

消费者

消费者服务与生产者服务最好是分开,两者不应该耦合在一起,所以创建一个消费项目。

和生产者一样,引入必须的依赖,编写配置类。

消费类

@Component
public class GoodsConsumer {

	//监听的队列名称
	@RabbitListener(queues = "goodsQueue")
	public void get(String message) throws Exception{
		System.out.println("消息消费,新增商品统计id:"+message);
		//编写统计的业务逻辑...
	}
}

启动两个服务,发出商品新增请求,效果如图:

RabbitMQ快速入门
RabbitMQ快速入门

总结

介绍了一下RabbitMq的服务搭建以及简单使用,实际应用还需要考虑到很多问题,例如:消息丢失,消息重复消费等等,进阶使用笔者将另起篇幅,敬请期待!

相关标签: 后端