Getting Started with RabbitMQ
Preface: Installing RabbitMQ
Reference: https://blog.csdn.net/Yuwen_forJava/article/details/81661929
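1. What is RabbitMQ
MQ stands for Message Queue, a method of application-to-application communication. MQ is a typical example of the producer-consumer model: one side keeps writing messages into the queue, while the other side reads messages from it.
RabbitMQ is one implementation of MQ.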
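To make the producer-consumer model concrete, here is a minimal sketch that uses an in-memory `BlockingQueue` from the JDK as a stand-in for the broker. It is not RabbitMQ code; the class name `ProducerConsumerSketch` and the message format are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Illustration only: an in-memory queue plays the role of the message broker.
class ProducerConsumerSketch {

    // Starts one producer and one consumer thread and returns what the consumer received.
    static List<String> run(final int count) {
        final BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
        final List<String> received = new ArrayList<String>();

        Thread producer = new Thread(new Runnable() {
            public void run() {
                for (int i = 1; i <= count; i++) {
                    queue.add("message-" + i); // write side of the queue
                }
            }
        });
        Thread consumer = new Thread(new Runnable() {
            public void run() {
                try {
                    for (int i = 0; i < count; i++) {
                        received.add(queue.take()); // blocks until a message is available
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });

        consumer.start();
        producer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run(3));
    }
}
```

The two threads never call each other; they only share the queue, which is the property a real broker provides across processes and machines.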
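2. RabbitMQ basics
2.1 RabbitMQ message flow
2.2 Producer
The producer is the program that publishes messages. It sends each message to RabbitMQ together with a routing key.
2.3 Exchange
An exchange is a RabbitMQ component that is bound to queues through binding keys; several bindings may use the same binding key. The exchange receives messages from producers and, based on the routing key, the binding keys, and the exchange type, forwards each message to the matching queues.
Three exchange types are common: fanout, direct, and topic.
fanout: delivers every message sent to the exchange to all queues bound to it (broadcast).
direct: delivers the message to the queues whose binding key is exactly equal to the routing key (exact match).
topic: routes the message to the queues whose binding key pattern matches the routing key (pattern match).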
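The three routing rules can be sketched in plain Java. This is only a simulation of the matching logic, not how RabbitMQ implements it; the class `ExchangeRoutingSketch`, the queue names, and the keys are hypothetical. For topic exchanges it assumes the usual AMQP wildcard rules: `*` matches exactly one dot-separated word and `#` matches zero or more words.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustration only: mimics how an exchange selects queues for a routing key.
class ExchangeRoutingSketch {

    enum Type { FANOUT, DIRECT, TOPIC }

    // bindings maps queue name -> binding key (one binding per queue in this sketch).
    static List<String> route(Type type, Map<String, String> bindings, String routingKey) {
        List<String> targets = new ArrayList<String>();
        for (Map.Entry<String, String> binding : bindings.entrySet()) {
            String bindingKey = binding.getValue();
            boolean match;
            if (type == Type.FANOUT) {
                match = true;                              // broadcast: keys are ignored
            } else if (type == Type.DIRECT) {
                match = bindingKey.equals(routingKey);     // exact match
            } else {
                match = topicMatches(bindingKey.split("\\."), 0, routingKey.split("\\."), 0);
            }
            if (match) {
                targets.add(binding.getKey());
            }
        }
        return targets;
    }

    // '*' consumes exactly one word, '#' consumes zero or more words.
    static boolean topicMatches(String[] pattern, int p, String[] key, int k) {
        if (p == pattern.length) {
            return k == key.length;
        }
        if (pattern[p].equals("#")) {
            for (int next = k; next <= key.length; next++) {
                if (topicMatches(pattern, p + 1, key, next)) {
                    return true;
                }
            }
            return false;
        }
        if (k == key.length) {
            return false;
        }
        if (pattern[p].equals("*") || pattern[p].equals(key[k])) {
            return topicMatches(pattern, p + 1, key, k + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        Map<String, String> bindings = new LinkedHashMap<String, String>();
        bindings.put("queue_a", "order.created");
        bindings.put("queue_b", "order.*");
        bindings.put("queue_c", "stock.#");
        System.out.println(route(Type.FANOUT, bindings, "order.created"));
        System.out.println(route(Type.DIRECT, bindings, "order.created"));
        System.out.println(route(Type.TOPIC, bindings, "order.created"));
    }
}
```

With these bindings, the fanout call returns all three queues, the direct call returns only `queue_a`, and the topic call returns `queue_a` and `queue_b`.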
2.4 Queue
A queue is the RabbitMQ component that stores messages.
2.5 Consumer
The consumer receives messages and processes them.
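3. RabbitMQ use cases
Application decoupling
Scenario: Double 11 is a shopping festival. After a user places an order, the order system has to notify the inventory system. The traditional approach is for the order system to call the inventory system's API directly.
This approach has a drawback: when the inventory system fails, the order fails too. (Jack Ma would earn a lot less money that way ^ ^)
The order system and the inventory system are tightly coupled. Introducing a message queue removes this coupling:
Order system: after the user places an order, the order system persists it, writes a message to the message queue, and tells the user that the order succeeded.
Inventory system: subscribes to the order messages, receives each one, and updates the stock.
Even if the inventory system goes down, the messages are not lost: they wait in the queue (assuming durable queues and persistent messages) and are delivered once the inventory system recovers (Jack Ma is happy now).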
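The decoupling idea can be sketched without a broker. In the example below an in-memory `ArrayDeque` stands in for the message queue, and `DecouplingSketch`, `placeOrder`, and `consumePending` are hypothetical names chosen for illustration. Orders are accepted while the inventory side is "down", and the backlog is processed when it comes back.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Illustration only: the in-memory queue stands in for the message broker.
class DecouplingSketch {
    private final Queue<String> orderQueue = new ArrayDeque<String>();
    private int stock;

    DecouplingSketch(int initialStock) {
        this.stock = initialStock;
    }

    // Order system: persist the order (omitted here), enqueue a message, return at once.
    // It never calls the inventory system, so it does not care whether that system is up.
    String placeOrder(String orderId) {
        orderQueue.add(orderId);
        return "order " + orderId + " accepted";
    }

    // Inventory system: whenever it is running, it drains the pending messages.
    int consumePending() {
        int handled = 0;
        while (orderQueue.poll() != null) {
            stock--;      // one unit per order in this sketch
            handled++;
        }
        return handled;
    }

    int pending() {
        return orderQueue.size();
    }

    int stock() {
        return stock;
    }

    public static void main(String[] args) {
        DecouplingSketch shop = new DecouplingSketch(10);
        // The inventory system is "down": orders still succeed and pile up in the queue.
        System.out.println(shop.placeOrder("A"));
        System.out.println(shop.placeOrder("B"));
        System.out.println("pending = " + shop.pending());
        // The inventory system recovers and works through the backlog.
        System.out.println("handled = " + shop.consumePending());
        System.out.println("stock = " + shop.stock());
    }
}
```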
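Peak shaving
Peak shaving is widely used in flash-sale events.
Scenario: a flash sale usually brings so much traffic that the application crashes. To solve this, a message queue is typically placed in front of the application.
Benefits:
1. It caps the number of participants: orders beyond a set threshold are simply discarded (so that is why I have never won a flash sale ^^)
2. It keeps a short traffic burst from crushing the application (the application pulls orders at its own maximum processing rate)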
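Both effects can be sketched with a bounded in-memory queue. This is an illustration with the JDK's `ArrayBlockingQueue`, not a RabbitMQ feature demo; the class `PeakShavingSketch`, the capacity, and the batch size are assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Illustration only: a bounded queue in front of the application.
class PeakShavingSketch {
    private final BlockingQueue<Integer> queue;

    PeakShavingSketch(int capacity) {
        this.queue = new ArrayBlockingQueue<Integer>(capacity);
    }

    // Front end: offer() returns false instead of blocking when the queue is full,
    // so orders beyond the threshold are dropped.
    boolean submit(int orderId) {
        return queue.offer(orderId);
    }

    // Back end: pulls at most batchSize orders per round, at its own pace.
    List<Integer> processBatch(int batchSize) {
        List<Integer> batch = new ArrayList<Integer>();
        queue.drainTo(batch, batchSize);
        return batch;
    }

    public static void main(String[] args) {
        PeakShavingSketch sale = new PeakShavingSketch(5);
        int accepted = 0;
        int rejected = 0;
        for (int orderId = 1; orderId <= 8; orderId++) {   // a burst of 8 orders
            if (sale.submit(orderId)) {
                accepted++;
            } else {
                rejected++;
            }
        }
        System.out.println("accepted=" + accepted + " rejected=" + rejected);
        System.out.println(sale.processBatch(2));
        System.out.println(sale.processBatch(2));
        System.out.println(sale.processBatch(2));
    }
}
```

With a capacity of 5 and a burst of 8, five orders are accepted and three are rejected; the back end then works through the accepted orders two at a time.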
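4. Example
This example is integrated with Spring.
Create two Maven web projects, run one in Eclipse and the other in IDEA, and have them send messages to each other.
The code below is from the IDEA project.
Project layout:
Project dependencies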
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.7</maven.compiler.source>
<maven.compiler.target>1.7</maven.compiler.target>
<!-- Spring version -->
<spring.version>4.3.5.RELEASE</spring.version>
<!-- MyBatis version -->
<mybatis.version>3.4.1</mybatis.version>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<!-- RabbitMQ -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.5.1</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.4.5.RELEASE</version>
</dependency>
<!-- java ee -->
<dependency>
<groupId>javax</groupId>
<artifactId>javaee-api</artifactId>
<version>7.0</version>
</dependency>
<!-- Unit tests -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<!-- Logback, an SLF4J implementation -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.2</version>
</dependency>
<!-- JSON -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.8.7</version>
</dependency>
<!-- Database driver -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.41</version>
<scope>runtime</scope>
</dependency>
<!-- Database connection pool -->
<dependency>
<groupId>com.mchange</groupId>
<artifactId>c3p0</artifactId>
<version>0.9.5.2</version>
</dependency>
<!-- MyBatis -->
<dependency>
<groupId>org.mybatis</groupId>
<artifactId>mybatis</artifactId>
<version>${mybatis.version}</version>
</dependency>
<!-- MyBatis/Spring integration -->
<dependency>
<groupId>org.mybatis</groupId>
<artifactId>mybatis-spring</artifactId>
<version>1.3.1</version>
</dependency>
<!-- Spring -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-beans</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jdbc</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-tx</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-web</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webmvc</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>${spring.version}</version>
</dependency>
<!-- <dependency> <groupId>javax.servlet</groupId> <artifactId>jsp-api</artifactId>
<version>2.0</version> <scope>provided</scope> </dependency> -->
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>jstl</artifactId>
<version>1.2</version>
</dependency>
<dependency>
<groupId>taglibs</groupId>
<artifactId>standard</artifactId>
<version>1.1.2</version>
</dependency>
<!--redis -->
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
<version>1.6.1.RELEASE</version>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.7.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework/spring-aop -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-aop</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjrt</artifactId>
<version>1.8.13</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.46</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.aspectj/aspectjweaver -->
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
<version>1.9.0</version>
</dependency>
<!-- Dependencies for integrating Shiro with Spring -->
<dependency>
<groupId>org.apache.shiro</groupId>
<artifactId>shiro-core</artifactId>
<version>1.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.shiro</groupId>
<artifactId>shiro-web</artifactId>
<version>1.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.shiro</groupId>
<artifactId>shiro-ehcache</artifactId>
<version>1.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.shiro</groupId>
<artifactId>shiro-spring</artifactId>
<version>1.2.1</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
</dependencies>
Create the product.xml configuration file
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:util="http://www.springframework.org/schema/util"
xsi:schemaLocation="http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
<!-- Configure the connection-factory with the parameters for connecting to the RabbitMQ server -->
<rabbit:connection-factory id="connectionFactory" username="root" password="root"
host="localhost"
port="5672"
virtual-host="/"/> <!-- virtual-host="/" is the default virtual host -->
<rabbit:admin connection-factory="connectionFactory"/>
<rabbit:queue id="queue_idea" durable="true" auto-delete="false"
exclusive="false" name="queue_idea">
</rabbit:queue>
<rabbit:direct-exchange name="exchange_insurance" durable="true" auto-delete="false" id="exchange_insurance">
<rabbit:bindings>
<rabbit:binding queue="queue_idea" key="testKeyIdea"/>
</rabbit:bindings>
</rabbit:direct-exchange>
<!-- The client publishes messages to this exchange -->
<rabbit:template id="amqpTemplate" exchange="exchange_insurance" connection-factory="connectionFactory"/>
</beans>
Create the custom.xml configuration file
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:util="http://www.springframework.org/schema/util"
xsi:schemaLocation="http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
<!-- Configure the connection-factory with the parameters for connecting to the RabbitMQ server -->
<rabbit:connection-factory id="connectionFactory2" username="root" password="root"
host="localhost"
port="5672"
virtual-host="/"/> <!-- virtual-host="/" is the default virtual host -->
<rabbit:admin connection-factory="connectionFactory2"/>
<!-- Queue declaration: the queue name is the only link between the producer side and the consumer side -->
<rabbit:queue id="queue_eclipse" name="queue_eclipse"></rabbit:queue>
<!-- Define the consumer listener -->
<!-- Create the bean whose class handles incoming messages -->
<bean id="consumerLitener2" class="cn.yu.service.RabbitMQConsumer"></bean>
<rabbit:listener-container connection-factory="connectionFactory2" acknowledge="auto" concurrency="8">
<!-- queues: the queue to receive messages from; ref: the bean that handles each message -->
<rabbit:listener queues="queue_eclipse" ref="consumerLitener2"/>
</rabbit:listener-container>
</beans>
Create applicationContext.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:p="http://www.springframework.org/schema/p" xmlns:context="http://www.springframework.org/schema/context"
xmlns:tx="http://www.springframework.org/schema/tx"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop-3.2.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.2.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx-3.2.xsd
">
<!-- Scan the cn.yu package for all annotated classes -->
<context:component-scan base-package="cn.yu"/>
<import resource="classpath*:rabbitmq/product.xml"/>
<import resource="classpath*:rabbitmq/custom.xml"/>
</beans>
spring-mvc.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:mvc="http://www.springframework.org/schema/mvc"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/mvc
http://www.springframework.org/schema/mvc/spring-mvc-3.0.xsd">
<!-- Scan for web-related beans -->
<context:component-scan base-package="cn.yu"/>
<!-- Enable annotation-driven Spring MVC -->
<mvc:annotation-driven/>
<!-- Let the default servlet serve static resources -->
<mvc:default-servlet-handler/>
<!-- Configure the ViewResolver for JSP views -->
<bean class="org.springframework.web.servlet.view.InternalResourceViewResolver">
<property name="viewClass" value="org.springframework.web.servlet.view.JstlView"/>
<property name="prefix" value="/WEB-INF/jsp/"/>
<property name="suffix" value=".jsp"/>
</bean>
</beans>
web.xml
<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://xmlns.jcp.org/xml/ns/javaee" xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/web-app_3_1.xsd" version="3.1">
<display-name>ChatRobot</display-name>
<filter>
<filter-name>encodingFilter</filter-name>
<filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class>
<init-param>
<param-name>encoding</param-name>
<param-value>UTF-8</param-value>
</init-param>
</filter>
<filter-mapping>
<filter-name>encodingFilter</filter-name>
<url-pattern>/*</url-pattern>
</filter-mapping>
<context-param>
<param-name>contextConfigLocation</param-name>
<param-value>classpath:applicationContext.xml</param-value>
</context-param>
<listener>
<listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
</listener>
<servlet>
<servlet-name>SpringMVC</servlet-name>
<servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
<init-param>
<param-name>contextConfigLocation</param-name>
<param-value>classpath:spring-*.xml</param-value>
</init-param>
<load-on-startup>1</load-on-startup>
<async-supported>true</async-supported>
</servlet>
<servlet-mapping>
<servlet-name>SpringMVC</servlet-name>
<url-pattern>/</url-pattern>
</servlet-mapping>
<welcome-file-list>
<welcome-file>index.jsp</welcome-file>
</welcome-file-list>
</web-app>
index.jsp (identical in both projects)
<%@ page contentType="text/html;charset=UTF-8" language="java" %>
<html>
<body>
<h2>Hello RabbitMq!</h2>
<a href="/sendMsg"><button>Click here to send a message to the Eclipse project</button></a>
</body>
</html>
RabbitmqController
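import javax.annotation.Resource;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
// Assumption: RabbitMQProducer lives in cn.yu.service, next to RabbitMQConsumer
import cn.yu.service.RabbitMQProducer;
@Controller
public class RabbitmqController {
    @Resource
    private RabbitMQProducer rabbitMQProducer;
    // Publishes a test message, then renders index.jsp again
    @RequestMapping("sendMsg")
    public String sendMsg() {
        String msg = "Start sending message no. 1 sfda";
        System.out.println("Sending message");
        rabbitMQProducer.sendMessage(msg);
        System.out.println("Message sent");
        return "index";
    }
}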
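RabbitMQConsumer implements MessageListener and overrides the onMessage method
package cn.yu.service;
import java.nio.charset.StandardCharsets;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.stereotype.Service;
@Service
public class RabbitMQConsumer implements MessageListener {
    @Override
    public void onMessage(Message message) {
        System.out.println("Message!");
        System.out.println(message);
        System.out.println("Message getMessageProperties!");
        System.out.println(message.getMessageProperties());
        System.out.println("Message getBody");
        // StandardCharsets.UTF_8 avoids the checked UnsupportedEncodingException
        String body = new String(message.getBody(), StandardCharsets.UTF_8);
        System.out.println(body);
        // Uncomment to simulate a failure inside the listener:
        // System.out.println(1/0);
    }
}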
RabbitMQProducer
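import javax.annotation.Resource;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.stereotype.Service;
@Service
public class RabbitMQProducer {
    @Resource
    private AmqpTemplate amqpTemplate;
    public void sendMessage(Object message) {
        // "testKeyIdea" is the binding key of queue_idea in product.xml; it selects the target queue
        amqpTemplate.convertAndSend("testKeyIdea", message);
    }
}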
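The Eclipse project is the same as the IDEA one; only a few settings in product.xml and custom.xml need to change, as shown below (the routing key passed to convertAndSend presumably has to change to the binding key declared here as well):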
product.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:util="http://www.springframework.org/schema/util"
xsi:schemaLocation="http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
<!-- Configure the connection-factory with the parameters for connecting to the RabbitMQ server -->
<rabbit:connection-factory id="connectionFactory" username="root" password="root"
host="localhost"
port="5672"
virtual-host="/"/> <!-- virtual-host="/" is the default virtual host -->
<rabbit:admin connection-factory="connectionFactory"/>
<rabbit:queue id="queue_eclipse" durable="true" auto-delete="false"
exclusive="false" name="queue_eclipse"><!-- Normal queue; the disabled arguments below would point it at a dead-letter exchange -->
<!--<rabbit:queue-arguments>-->
<!--<entry key="x-message-ttl"><!–message TTL–>-->
<!--<value type="java.lang.Long">30000</value>-->
<!--</entry>-->
<!--<entry key="x-dead-letter-exchange"><!–target exchange–>-->
<!--<value type="java.lang.String">alter</value>-->
<!--</entry>-->
<!--</rabbit:queue-arguments>-->
</rabbit:queue>
<!--<rabbit:queue id="alter_queue" durable="true" auto-delete="false" exclusive="false" name="alter_queue"/><!–dead-letter queue–>-->
<!--<rabbit:direct-exchange name="alter" durable="true" auto-delete="false" id="alter"><!–dead-letter exchange–>-->
<!--<rabbit:bindings>-->
<!--<rabbit:binding queue="alter_queue" key="testKey"/>-->
<!--</rabbit:bindings>-->
<!--</rabbit:direct-exchange>-->
<rabbit:direct-exchange name="exchange_insurance" durable="true" auto-delete="false" id="exchange_insurance"><!-- Normal exchange -->
<rabbit:bindings>
<rabbit:binding queue="queue_eclipse" key="testKeyeEclipse"/>
</rabbit:bindings>
</rabbit:direct-exchange>
<!-- The client publishes messages to this exchange -->
<rabbit:template id="amqpTemplate" exchange="exchange_insurance" connection-factory="connectionFactory"/>
</beans>
custom.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:util="http://www.springframework.org/schema/util"
xsi:schemaLocation="http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
<!-- Configure the connection-factory with the parameters for connecting to the RabbitMQ server -->
<rabbit:connection-factory id="connectionFactory2" username="root" password="root"
host="localhost"
port="5672"
virtual-host="/"/> <!-- virtual-host="/" is the default virtual host -->
<rabbit:admin connection-factory="connectionFactory2"/>
<!-- Queue declaration: the queue name is the only link between the producer side and the consumer side -->
<rabbit:queue id="queue_idea" name="queue_idea">
</rabbit:queue>
<!-- Define the consumer listener -->
<!-- Create the bean whose class handles incoming messages -->
<bean id="consumerLitener2" class="cn.yu.service.RabbitMQConsumer"></bean>
<rabbit:listener-container
connection-factory="connectionFactory2" acknowledge="auto" concurrency="8">
<!-- queues: the queue to receive messages from; ref: the bean that handles each message -->
<rabbit:listener queues="queue_idea" ref="consumerLitener2"/>
</rabbit:listener-container>
</beans>
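That's it for this introduction to RabbitMQ. It reflects my own understanding, pieced together mostly from Baidu searches; if you spot a mistake, corrections are welcome and I will update the post.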