欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

rabbitMQ入门

程序员文章站 2024-03-18 11:04:34
...

序:安装rabbitMQ

参考:https://blog.csdn.net/Yuwen_forJava/article/details/81661929

1. 什么是rabbitMQ

MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。MQ是消费-生产者模型的一个典型的代表,一端往消息队列中不断写入消息,而另一端则可以读取队列中的消息。

RabbitMQ是MQ的一种。

 

2. rabbitMQ基础

2.1 rabbitMQ流程

rabbitMQ入门

 2.2 producr生产者

消息生产者,就是投递消息的程序。向rabbitmq发送消息+指定的routing key

2.3 Exchange

exchange翻译是交换的意思。exchange属于rabbitmq的模块,它和queue绑定在一起,通过一个binding key ,binding key 允许相同。exchange接收来自product的消息,根据routing key、binding key以及exchange的种类进行消息的分发,把消息发往相应的queue。

exchange常见的种类有3种:fanout、direct、topic

fanout:把所有发送到该Exchange的消息投递到所有与它绑定的队列中。广播

direct:把消息投递到那些binding key与routing key完全匹配的队列中。精准匹配

topic:将消息路由到binding key与routing key模式匹配的队列中。模糊匹配

2.4 queue

rabbitmq内部模块,保存消息。

2.5 custom

消息消费者,处理消息。

 

3. rabbitMQ使用场景

应用解耦

场景:双11是购物狂节,用户下单后,订单系统需要通知库存系统,传统的做法就是订单系统调用库存系统的接口.

这种做法有一个缺点:当库存系统出现故障时,订单就会失败。(这样马云将少赚好多好多钱^ ^)

订单系统和库存系统高耦合.引入消息队列

订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功。

库存系统:订阅下单的消息,获取下单消息,进行库操作。

就算库存系统出现故障,消息队列也能保证消息的可靠投递,不会导致消息丢失(马云这下高兴了).

流量削峰

流量削峰一般在秒杀活动中应用广泛

场景:秒杀活动,一般会因为流量过大,导致应用挂掉,为了解决这个问题,一般在应用前端加入消息队列。

作用:

1.可以控制活动人数,超过此一定阀值的订单直接丢弃(我为什么秒杀一次都没有成功过呢^^)

2.可以缓解短时间的高流量压垮应用(应用程序按自己的最大处理能力获取订单)

 

4.案例

和spring进行了整合

创建两个maven web项目,分别用eclipse和idea运行,互相调用。

以下是idea运用的代码

项目目录:

rabbitMQ入门

项目依赖

 <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.7</maven.compiler.source>
        <maven.compiler.target>1.7</maven.compiler.target>
        <!-- spring版本号 -->
        <spring.version>4.3.5.RELEASE</spring.version>
        <!-- mybatis版本号 -->
        <mybatis.version>3.4.1</mybatis.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>

        <!-- RabbitMQ -->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>3.5.1</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
            <version>1.4.5.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>
        <!-- java ee -->
        <dependency>
            <groupId>javax</groupId>
            <artifactId>javaee-api</artifactId>
            <version>7.0</version>
        </dependency>

        <!-- 单元测试 -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>

        <!-- 实现slf4j接口并整合 -->
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.2.2</version>
        </dependency>

        <!-- JSON -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.8.7</version>
        </dependency>


        <!-- 数据库 -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.41</version>
            <scope>runtime</scope>
        </dependency>

        <!-- 数据库连接池 -->
        <dependency>
            <groupId>com.mchange</groupId>
            <artifactId>c3p0</artifactId>
            <version>0.9.5.2</version>
        </dependency>

        <!-- MyBatis -->
        <dependency>
            <groupId>org.mybatis</groupId>
            <artifactId>mybatis</artifactId>
            <version>${mybatis.version}</version>
        </dependency>

        <!-- mybatis/spring整合包 -->
        <dependency>
            <groupId>org.mybatis</groupId>
            <artifactId>mybatis-spring</artifactId>
            <version>1.3.1</version>
        </dependency>

        <!-- Spring -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-core</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-beans</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jdbc</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-tx</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-web</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-webmvc</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>${spring.version}</version>
        </dependency>

        <!-- <dependency> <groupId>javax.servlet</groupId> <artifactId>jsp-api</artifactId>
            <version>2.0</version> <scope>provided</scope> </dependency> -->


        <dependency>
            <groupId>javax.servlet</groupId>
            <artifactId>jstl</artifactId>
            <version>1.2</version>
        </dependency>

        <dependency>
            <groupId>taglibs</groupId>
            <artifactId>standard</artifactId>
            <version>1.1.2</version>
        </dependency>

        <!--redis -->
        <dependency>
            <groupId>org.springframework.data</groupId>
            <artifactId>spring-data-redis</artifactId>
            <version>1.6.1.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.7.3</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.springframework/spring-aop -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-aop</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.aspectj</groupId>
            <artifactId>aspectjrt</artifactId>
            <version>1.8.13</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.46</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.aspectj/aspectjweaver -->
        <dependency>
            <groupId>org.aspectj</groupId>
            <artifactId>aspectjweaver</artifactId>
            <version>1.9.0</version>
        </dependency>

        <!-- Spring 整合Shiro需要的依赖 -->
        <dependency>
            <groupId>org.apache.shiro</groupId>
            <artifactId>shiro-core</artifactId>
            <version>1.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.shiro</groupId>
            <artifactId>shiro-web</artifactId>
            <version>1.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.shiro</groupId>
            <artifactId>shiro-ehcache</artifactId>
            <version>1.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.shiro</groupId>
            <artifactId>shiro-spring</artifactId>
            <version>1.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>

    </dependencies>

创建配置product.xml文件

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xmlns:util="http://www.springframework.org/schema/util"
       xsi:schemaLocation="http://www.springframework.org/schema/rabbit
       http://www.springframework.org/schema/rabbit/spring-rabbit-1.5.xsd
       http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
       http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">

    <!--配置connection-factory,指定连接rabbit server参数-->
    <rabbit:connection-factory id="connectionFactory" username="root" password="root"
                               host="localhost"
                               port="5672"
                               virtual-host="/"/> <!-- virtual-host="/"是默认的虚拟机路径-->
    <rabbit:admin connection-factory="connectionFactory"/>

    <rabbit:queue id="queue_idea" durable="true" auto-delete="false"
                  exclusive="false" name="queue_idea">
    </rabbit:queue>
    
    <rabbit:direct-exchange name="exchange_insurance" durable="true" auto-delete="false" id="exchange_insurance">
        <rabbit:bindings>
            <rabbit:binding queue="queue_idea" key="testKeyIdea"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!-- 客户端投递消息到exchange -->
    <rabbit:template id="amqpTemplate" exchange="exchange_insurance" connection-factory="connectionFactory"/>
</beans>

创建配置custom.xml文件

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xmlns:util="http://www.springframework.org/schema/util"
       xsi:schemaLocation="http://www.springframework.org/schema/rabbit
       http://www.springframework.org/schema/rabbit/spring-rabbit-1.5.xsd
       http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
       http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">

    <!--配置connection-factory,指定连接rabbit server参数-->
    <rabbit:connection-factory id="connectionFactory2" username="root" password="root"
                               host="localhost"
                               port="5672"
                               virtual-host="/"/> <!-- virtual-host="/"是默认的虚拟机路径-->
    <rabbit:admin connection-factory="connectionFactory2"/>

    <!-- queue 队列声明 -->
    <!-- queue 队列声明  name 队里的额name 是关联生产表和消费表的为唯一线索  -->
    <rabbit:queue id="queue_eclipse" name="queue_eclipse"></rabbit:queue>

    <!-- 定义消费者监听器 -->
    <!-- 创建一个bean实例,bean实例中声明处理请求的类 -->
    <bean id="consumerLitener2" class="cn.yu.service.RabbitMQConsumer"></bean>

    <rabbit:listener-container  connection-factory="connectionFactory2" acknowledge="auto" concurrency="8">
        <!-- queues属性从那个队列中接收消息,ref属性是当存在消息是使用哪个类去处理 -->
        <rabbit:listener  queues="queue_eclipse" ref="consumerLitener2"/>
    </rabbit:listener-container>
</beans>

 

创建applicationContext.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:p="http://www.springframework.org/schema/p" xmlns:context="http://www.springframework.org/schema/context"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
	http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
	http://www.springframework.org/schema/aop
	http://www.springframework.org/schema/aop/spring-aop-3.2.xsd

	http://www.springframework.org/schema/context
	http://www.springframework.org/schema/context/spring-context-3.2.xsd
	http://www.springframework.org/schema/tx
	http://www.springframework.org/schema/tx/spring-tx-3.2.xsd
	">

    <!-- 扫描service包下所有使用注解的类型 -->
    <context:component-scan base-package="cn.yu"/>

    <import resource="classpath*:rabbitmq/product.xml"/>
    <import resource="classpath*:rabbitmq/custom.xml"/>

</beans>

spring-mvc.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:mvc="http://www.springframework.org/schema/mvc"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context
       http://www.springframework.org/schema/context/spring-context.xsd
       http://www.springframework.org/schema/mvc
       http://www.springframework.org/schema/mvc/spring-mvc-3.0.xsd">

    <!-- 扫描web相关的bean -->
    <context:component-scan base-package="cn.yu"/>

    <!-- 开启SpringMVC注解模式 -->
    <mvc:annotation-driven/>

    <!-- 静态资源默认servlet配置 -->
    <mvc:default-servlet-handler/>

    <!-- 配置jsp 显示ViewResolver -->
    <bean class="org.springframework.web.servlet.view.InternalResourceViewResolver">
        <property name="viewClass" value="org.springframework.web.servlet.view.JstlView"/>
        <property name="prefix" value="/WEB-INF/jsp/"/>
        <property name="suffix" value=".jsp"/>
    </bean>

</beans>

web.xml

<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://xmlns.jcp.org/xml/ns/javaee" xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/web-app_3_1.xsd" version="3.1">
  <display-name>ChatRobot</display-name>
  <filter>
    <filter-name>encodingFilter</filter-name>
    <filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class>
    <init-param>
      <param-name>encoding</param-name>
      <param-value>UTF-8</param-value>
    </init-param>
  </filter>
  <filter-mapping>
    <filter-name>encodingFilter</filter-name>
    <url-pattern>/*</url-pattern>
  </filter-mapping>
  <context-param>
    <param-name>contextConfigLocation</param-name>
    <param-value>classpath:applicationContext.xml</param-value>
  </context-param>
  <listener>
    <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
  </listener>
  <servlet>
    <servlet-name>SpringMVC</servlet-name>
    <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
    <init-param>
      <param-name>contextConfigLocation</param-name>
      <param-value>classpath:spring-*.xml</param-value>
    </init-param>
    <load-on-startup>1</load-on-startup>
    <async-supported>true</async-supported>
  </servlet>
  <servlet-mapping>
    <servlet-name>SpringMVC</servlet-name>
    <url-pattern>/</url-pattern>
  </servlet-mapping>
  <welcome-file-list>
    <welcome-file>index.jsp</welcome-file>
  </welcome-file-list>
</web-app>

index.jsp,两个index是一样的

<%@ page contentType="text/html;charset=UTF-8" language="java" %>
<html>
<body>
<h2>Hello RabbitMq!</h2>
<a href="/sendMsg"><button>点击这个,发送消息到eclipse</button></a>
</body>
</html>
RabbitmqController
  @Resource
    private RabbitMQProducer rabbitMQProducer;

    @RequestMapping("sendMsg")
    public String sendMsg(){
        String msg = new String("开始发消息1号sfda");
        System.out.println("开始发消息");
        rabbitMQProducer.sendMessage(msg);
        System.out.println("发出了消息");
        return "index";
    }
RabbitMQConsumer继承MessageListener,实现onMessage方法
@Service
public class RabbitMQConsumer implements MessageListener {
    public void onMessage(Message message) {
        System.out.println("消息!");
        System.out.println(message);
        System.out.println("消息getMessageProperties!");
        System.out.println(message.getMessageProperties());
        System.out.println("消息getBody");
        try {
            String ss = new String(message.getBody(), "UTF-8");
            System.out.println(ss);
        } catch (UnsupportedEncodingException e) {
        }
//        System.out.println(1/0);
    }
}

RabbitMQProducer 

@Service
public class RabbitMQProducer {

    @Resource
    private AmqpTemplate amqpTemplate;

    public void sendMessage(Object message) {
        amqpTemplate.convertAndSend("testKeyIdea",message);//testKey为配置文件中queue对应的key,指明发送给哪个queue。
    }
}

eclipse和idea一样,只需要修改produc.xml和custom.xml中的一点配置即可,以下:

product.xml

​

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xmlns:util="http://www.springframework.org/schema/util"
       xsi:schemaLocation="http://www.springframework.org/schema/rabbit
       http://www.springframework.org/schema/rabbit/spring-rabbit-1.5.xsd
       http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
       http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">

    <!--配置connection-factory,指定连接rabbit server参数-->
    <rabbit:connection-factory id="connectionFactory" username="root" password="root"
                               host="localhost"
                               port="5672"
                               virtual-host="/"/> <!-- virtual-host="/"是默认的虚拟机路径-->
    <rabbit:admin connection-factory="connectionFactory"/>

    <rabbit:queue id="queue_eclipse" durable="true" auto-delete="false"
                  exclusive="false" name="queue_eclipse"><!--正常队列当中指向死信-->
        <!--<rabbit:queue-arguments>-->
            <!--<entry key="x-message-ttl">&lt;!&ndash;设置超时&ndash;&gt;-->
                <!--<value type="java.lang.Long">30000</value>-->
            <!--</entry>-->
            <!--<entry key="x-dead-letter-exchange">&lt;!&ndash;指定交换机&ndash;&gt;-->
                <!--<value type="java.lang.String">alter</value>-->
            <!--</entry>-->
        <!--</rabbit:queue-arguments>-->
    </rabbit:queue>
    <!--<rabbit:queue id="alter_queue" durable="true" auto-delete="false" exclusive="false" name="alter_queue"/>&lt;!&ndash;死信队列&ndash;&gt;-->
    <!--<rabbit:direct-exchange name="alter" durable="true" auto-delete="false" id="alter">&lt;!&ndash;死信交换机&ndash;&gt;-->
        <!--<rabbit:bindings>-->
            <!--<rabbit:binding queue="alter_queue" key="testKey"/>-->
        <!--</rabbit:bindings>-->
    <!--</rabbit:direct-exchange>-->
    <rabbit:direct-exchange name="exchange_insurance" durable="true" auto-delete="false" id="exchange_insurance"><!--正常交换机-->
        <rabbit:bindings>
            <rabbit:binding queue="queue_eclipse" key="testKeyeEclipse"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!-- 客户端投递消息到exchange -->
    <rabbit:template id="amqpTemplate" exchange="exchange_insurance" connection-factory="connectionFactory"/>
</beans>


​

coustom.xml

 

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xmlns:util="http://www.springframework.org/schema/util"
       xsi:schemaLocation="http://www.springframework.org/schema/rabbit
       http://www.springframework.org/schema/rabbit/spring-rabbit-1.5.xsd
       http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
       http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">

    <!--配置connection-factory,指定连接rabbit server参数-->
    <rabbit:connection-factory id="connectionFactory2" username="root" password="root"
                               host="localhost"
                               port="5672"
                               virtual-host="/"/> <!-- virtual-host="/"是默认的虚拟机路径-->
    <rabbit:admin connection-factory="connectionFactory2"/>

    <!-- queue 队列声明 -->
    <!-- queue 队列声明  name 队里的额name 是关联生产表和消费表的为唯一线索  -->
    <rabbit:queue id="queue_idea" name="queue_idea">
    </rabbit:queue>

    <!-- 定义消费者监听器 -->
    <!-- 创建一个bean实例,bean实例中声明处理请求的类 -->
    <bean id="consumerLitener2" class="cn.yu.service.RabbitMQConsumer"></bean>

    <rabbit:listener-container
            connection-factory="connectionFactory2" acknowledge="auto" concurrency="8">
        <!-- queues属性从那个队列中接收消息,ref属性是当存在消息是使用哪个类去处理 -->
        <rabbit:listener queues="queue_idea" ref="consumerLitener2"/>
    </rabbit:listener-container>
</beans>

 

 


 

 

以上,为rabbitmq入门。纯属个人自己理解,学习路径来源百度,若有错误,欢迎指正,随时修改。