欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

消息中间件——RabbitMQ(九)RabbitMQ整合Spring AMQP实战!(全)

程序员文章站 2022-04-15 09:07:06
前言 1. AMQP 核心组件 RabbitAdmin SpringAMQP声明 RabbitTemplate SimpleMessageListenerContainer MessageListenerAdapter MessageConverter 2. RabbitAdmin RabbitAd ......

消息中间件——RabbitMQ(九)RabbitMQ整合Spring AMQP实战!(全)

消息中间件——RabbitMQ(九)RabbitMQ整合Spring AMQP实战!(全)

前言

1. amqp 核心组件

  • rabbitadmin
  • springamqp声明
  • rabbittemplate
  • simplemessagelistenercontainer
  • messagelisteneradapter
  • messageconverter

2. rabbitadmin

rabbitadmin类可以很好的才注意rabbitmq,在spring中直接进行诸如即可。

消息中间件——RabbitMQ(九)RabbitMQ整合Spring AMQP实战!(全)

注意:

  • autostartup必须要设置为true,否则spring容器不会加载rabbitadmin类
  • rabbitadmin底层实现就是从spring容器中获取exchange、bingding、routingkey以及queue的@bean声明
  • 使用rabbittemplate的execute方法执行对应的什么、修改、删除等一系列rabbitmq基础功能操作
  • 例如:添加一个交换机、删除一个绑定、清空一个队列里的消息等等

2.1 代码演示

2.1.1 引入pom文件

<?xml version="1.0" encoding="utf-8"?>
<project xmlns="http://maven.apache.org/pom/4.0.0" xmlns:xsi="http://www.w3.org/2001/xmlschema-instance"
    xsi:schemalocation="http://maven.apache.org/pom/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelversion>4.0.0</modelversion>

    <groupid>com.cp</groupid>
    <artifactid>rabbitmq-spring</artifactid>
    <version>0.0.1-snapshot</version>
    <packaging>jar</packaging>

    <name>rabbitmq-spring</name>
    <description>rabbitmq-spring</description>

    <parent>
        <groupid>org.springframework.boot</groupid>
        <artifactid>spring-boot-starter-parent</artifactid>
        <version>1.5.14.release</version>
        <relativepath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceencoding>utf-8</project.build.sourceencoding>
        <project.reporting.outputencoding>utf-8</project.reporting.outputencoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupid>org.springframework.boot</groupid>
            <artifactid>spring-boot-starter</artifactid>
        </dependency>

        <dependency>
            <groupid>org.springframework.boot</groupid>
            <artifactid>spring-boot-starter-test</artifactid>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupid>com.rabbitmq</groupid>
            <artifactid>amqp-client</artifactid>
            <version>3.6.5</version>
        </dependency>
        <dependency>
            <groupid>org.springframework.boot</groupid>
            <artifactid>spring-boot-starter-amqp</artifactid>
        </dependency>       
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupid>org.springframework.boot</groupid>
                <artifactid>spring-boot-maven-plugin</artifactid>
            </plugin>
        </plugins>
    </build>


</project>

2.1.2 配置bean

@configuration
@componentscan({"com.cp.spring.*"})
public class rabbitmqconfig {

    //相当于<bean id="connectionfactory"></bean>
    @bean
    public connectionfactory connectionfactory(){
        cachingconnectionfactory connectionfactory = new cachingconnectionfactory();
        connectionfactory.setaddresses("127.0.0.1:5672");
        connectionfactory.setusername("user_cp");
        connectionfactory.setpassword("123456");
        connectionfactory.setvirtualhost("/vhost_cp");
        return connectionfactory;
    }
    
    //形参名称要与bean的方法名保持一致
    @bean
    public rabbitadmin rabbitadmin(connectionfactory connectionfactory) {
        rabbitadmin rabbitadmin = new rabbitadmin(connectionfactory);
        rabbitadmin.setautostartup(true);
        return rabbitadmin;
    }
}
    

2.1.3 测试类

@runwith(springrunner.class)
@springboottest
public class applicationtests {

    @test
    public void contextloads() {
    }
    
    @autowired
    private rabbitadmin rabbitadmin;
    
    @test
    public void testadmin() throws exception {
        //直连监听
        rabbitadmin.declareexchange(new directexchange("test.direct", false, false));
        
        rabbitadmin.declareexchange(new topicexchange("test.topic", false, false));
        
        rabbitadmin.declareexchange(new fanoutexchange("test.fanout", false, false));
        
        rabbitadmin.declarequeue(new queue("test.direct.queue", false));
        
        rabbitadmin.declarequeue(new queue("test.topic.queue", false));
        
        rabbitadmin.declarequeue(new queue("test.fanout.queue", false));

        //第一个参数:具体的队列 第二个参数:绑定的类型 第三个参数:交换机 第四个参数:路由key 第五个参数:arguments 参数
        rabbitadmin.declarebinding(new binding("test.direct.queue",
                binding.destinationtype.queue,
                "test.direct", "direct", new hashmap<>()));

        //bindingbuilder 链式编程
        rabbitadmin.declarebinding(
                bindingbuilder
                .bind(new queue("test.topic.queue", false))     //直接创建队列
                .to(new topicexchange("test.topic", false, false))  //直接创建交换机 建立关联关系
                .with("user.#"));   //指定路由key
        
        
        rabbitadmin.declarebinding(
                bindingbuilder
                .bind(new queue("test.fanout.queue", false))        
                .to(new fanoutexchange("test.fanout", false, false)));
        
        //清空队列数据
        rabbitadmin.purgequeue("test.topic.queue", false);
    }
}

通过以上代码,可以自行测试一下结果。

rabbitadmin源码

消息中间件——RabbitMQ(九)RabbitMQ整合Spring AMQP实战!(全)

实现了initializingbean接口,表明在bean配置加载完后再加载rabbitadmin配置。找到afterpropertiesset()方法中最要的initialize()初始化方法。

消息中间件——RabbitMQ(九)RabbitMQ整合Spring AMQP实战!(全)

消息中间件——RabbitMQ(九)RabbitMQ整合Spring AMQP实战!(全)

消息中间件——RabbitMQ(九)RabbitMQ整合Spring AMQP实战!(全)

this.applicationcontext.getbeansoftype(collection.class, false, false).values()

可以看到exchange、queue、binding都是从spring容器中获取三种类型,加载到上方定义的contextexchanges、contextqueues、contextbindings三种容器中。
后续的源码中,也可以看出通过筛选spring容器中rabbitmq的信息之后,再去建立rabbitmq服务器的连接。主要通过spring以@bean的方式,将配置加载到spring容器之后,再从容器中获取相关信息,再去建立连接。

3. springamqp声明

  • 在rabbit基础api里面声明一个exchange、声明一个绑定、一个队列

消息中间件——RabbitMQ(九)RabbitMQ整合Spring AMQP实战!(全)

-使用springamqp去声明,就需要使用springamqp的如下模式,即声明@bean方式

消息中间件——RabbitMQ(九)RabbitMQ整合Spring AMQP实战!(全)

3.1 代码演示

@configuration
@componentscan({"com.cp.spring.*"})
public class rabbitmqconfig {

    //相当于<bean id="connectionfactory"></bean>
    @bean
    public connectionfactory connectionfactory(){
        cachingconnectionfactory connectionfactory = new cachingconnectionfactory();
        connectionfactory.setaddresses("127.0.0.1:5672");
        connectionfactory.setusername("user_cp");
        connectionfactory.setpassword("123456");
        connectionfactory.setvirtualhost("/vhost_cp");
        return connectionfactory;
    }
    
    //形参名称要与bean的方法名保持一致
    @bean
    public rabbitadmin rabbitadmin(connectionfactory connectionfactory) {
        rabbitadmin rabbitadmin = new rabbitadmin(connectionfactory);
        rabbitadmin.setautostartup(true);
        return rabbitadmin;
    }
    
    /**  
     * 针对消费者配置  
     * 1. 设置交换机类型  
     * 2. 将队列绑定到交换机  
        fanoutexchange: 将消息分发到所有的绑定队列,无routingkey的概念  
        headersexchange :通过添加属性key-value匹配  
        directexchange:按照routingkey分发到指定队列  
        topicexchange:多关键字匹配  
     */  
    @bean  
    public topicexchange exchange001() {  
        return new topicexchange("topic001", true, false);  
    }  

    @bean  
    public queue queue001() {  
        return new queue("queue001", true); //队列持久  
    }  
    
    @bean  
    public binding binding001() {  
        return bindingbuilder.bind(queue001()).to(exchange001()).with("spring.*");  
    }  
    
    @bean  
    public topicexchange exchange002() {  
        return new topicexchange("topic002", true, false);  
    }  
    
    @bean  
    public queue queue002() {  
        return new queue("queue002", true); //队列持久  
    }
    
    @bean  
    public binding binding002() {  
        return bindingbuilder.bind(queue002()).to(exchange002()).with("rabbit.*");  
    } 
    
    @bean  
    public queue queue003() {  
        return new queue("queue003", true); //队列持久  
    }
    
    @bean  
    public binding binding003() {  
        //同一个exchange绑定了2个队列
        return bindingbuilder.bind(queue003()).to(exchange001()).with("mq.*");  
    } 
}

再次运行applicationtests类中testadmin()方法,可以在控制台中,查看到一个exchange绑定两个queue。

消息中间件——RabbitMQ(九)RabbitMQ整合Spring AMQP实战!(全)

4. rabbittemplate

rabbittemplate,即消息模板

  • 我们在与springamqp整合的时候进行发送消息的关键词
  • 该类提供了丰富的发送消息方法,包括可靠性投递消息方法、回调监听消息接口confirmcallback、返回值确认接口returncallback等等。同样我们需要进行注入到spring容器中,然后直接使用

  • 在与spring整合时需要实例化,但是在与springboot整合时,在配置文件里添加配置即可

4.1 代码演示

4.1.1 rabbitmqconfig类

在rabbitmqconfig类中写rabbittemplate配置

@bean
public rabbittemplate rabbittemplate(connectionfactory connectionfactory) {
    rabbittemplate rabbittemplate = new rabbittemplate(connectionfactory);
    return rabbittemplate;
}

4.1.2 applicationtests类

在applicationtests测试类中添加测试方法,进行测试。

@autowired
    private rabbittemplate rabbittemplate;


    @test
    public void testsendmessage() throws exception {
        //1 创建消息
        messageproperties messageproperties = new messageproperties();
        messageproperties.getheaders().put("desc", "信息描述..");
        messageproperties.getheaders().put("type", "自定义消息类型..");
        //消息体,与参数
        message message = new message("hello rabbitmq".getbytes(), messageproperties);
        //转换并发送
        //messagepostprocessor 在消息发送完毕后再做一次转换进行再加工,匿名接口,需要重写方法
        rabbittemplate.convertandsend("topic001", "spring.amqp", message, new messagepostprocessor() {
            @override
            public message postprocessmessage(message message) throws amqpexception {
                system.err.println("------添加额外的设置---------");
                message.getmessageproperties().getheaders().put("desc", "额外修改的信息描述");
                message.getmessageproperties().getheaders().put("attr", "额外新加的属性");
                return message;
            }
        });
    }

4.1.3 查看管控台

运行前,可以看到queue001中是没有消息的。

消息中间件——RabbitMQ(九)RabbitMQ整合Spring AMQP实战!(全)

运行testsendmessage()方法。并获取消息。

消息中间件——RabbitMQ(九)RabbitMQ整合Spring AMQP实战!(全)

消息中间件——RabbitMQ(九)RabbitMQ整合Spring AMQP实战!(全)

4.1.4 简单写法

@test
public void testsendmessage2() throws exception {
    //1 创建消息
    messageproperties messageproperties = new messageproperties();
    messageproperties.setcontenttype("text/plain");
    message message = new message("mq 消息1234".getbytes(), messageproperties);

    rabbittemplate.send("topic001", "spring.abc", message);

    rabbittemplate.convertandsend("topic001", "spring.amqp", "hello object message send!");
    rabbittemplate.convertandsend("topic002", "rabbit.abc", "hello object message send!");
}

我们往topic001中发送了两条消息,topic002中发送了一条消息。运行testsendmessage2() 接下来再查看下管控台

消息中间件——RabbitMQ(九)RabbitMQ整合Spring AMQP实战!(全)

消息中间件——RabbitMQ(九)RabbitMQ整合Spring AMQP实战!(全)

消息中间件——RabbitMQ(九)RabbitMQ整合Spring AMQP实战!(全)

消息中间件——RabbitMQ(九)RabbitMQ整合Spring AMQP实战!(全)

可以看到topic001中已经有了三条消息,刚才发送的消息也还在。getmessage并不是消费消息,而只是获取消息。

5. simplemessagelistenercontainer

简单消息监听容器

  • 这个类非常的强大,我们可以对它进行很多设置,对于消费者的配置项,这个类都可以满足
  • 监听队列(多个队列)、自动启动、自动声明功能
  • 设置事务特性、事务管理器、事务属性、事务容器(并发)、是否开启事务、回滚消息等
  • 设置消费者数量、最小最大数量、批量消费
  • 设置消息确认和自动确认模式、是否重回队列、异常捕捉handler函数
  • 设置消费者标签生成策略、是否独占模式、消费者属性等
  • 设置具体的监听器、消息转换器等等。

注意:

  • simplemessagelistenercontainer可以进行动态设置,比如在运行中的应用可以动态的修改其消费者数量的大小、接收消息的模式等
  • 很多机遇rabbitmq的自制定话后端管控台在进行动态设置的时候,也是根据这一特性去实现的。所以可以看出springamqp非常的强大

思考

simplemessagelistenercontainer为什么可以动态感知配置变更?

5.1 代码演示

5.1.1 rabbitmqconfig类

配置中添加如下代码:

@bean
public simplemessagelistenercontainer messagecontainer(connectionfactory connectionfactory) {
    
    simplemessagelistenercontainer container = new simplemessagelistenercontainer(connectionfactory);
    //添加多个队列进行监听
    container.setqueues(queue001(), queue002(), queue003(), queue_image(), queue_pdf());
    //当前消费者数量
    container.setconcurrentconsumers(1);
    //最大消费者数量
    container.setmaxconcurrentconsumers(5);
    //设置重回队列,一般设置false
    container.setdefaultrequeuerejected(false);
    //设置自动签收机制
    container.setacknowledgemode(acknowledgemode.auto);
    //设置listener外露
    container.setexposelistenerchannel(true);
    //消费端标签生成策略
    container.setconsumertagstrategy(new consumertagstrategy() {
        @override
        public string createconsumertag(string queue) {
            //每个消费端都有自己独立的标签
            return queue + "_" + uuid.randomuuid().tostring();
        }
    });

    //消息监听
    container.setmessagelistener(new channelawaremessagelistener() {
        @override
        public void onmessage(message message, channel channel) throws exception {
            string msg = new string(message.getbody());
            system.err.println("----------消费者: " + msg);
        }
    });
    return container;
}

运行之前写的testsendmessage2()方法,查看管控台中的相关信息以及控制台打印信息

消息中间件——RabbitMQ(九)RabbitMQ整合Spring AMQP实战!(全)

消息中间件——RabbitMQ(九)RabbitMQ整合Spring AMQP实战!(全)

消息中间件——RabbitMQ(九)RabbitMQ整合Spring AMQP实战!(全)

6. messagelisteneradapter

messagelisteneradapter 即消息监听适配器

6.1 代码演示

6.1.1 适配器使用方式1

我们把之前的消息监听代码注释,可以不用直接加消息监听,而是采用messagelisteneradapter的方式,通过适配器方式1,我们来学习下如何使用默认的handlemessage,自定义方法名,自定义转换器。

使用默认handlemessage

//消息监听
/*container.setmessagelistener(new channelawaremessagelistener() {
    @override
    public void onmessage(message message, channel channel) throws exception {
        string msg = new string(message.getbody());
        system.err.println("----------消费者: " + msg);
    }
});*/


messagelisteneradapter adapter = new messagelisteneradapter(new messagedelegate());
container.setmessagelistener(adapter);

messagelisteneradapter 适配器类,熟悉适配器模式的朋友肯定了解适配器模式的话,可以通过适配器,适配自己的实现,这里我们适配自定义的messagedelegate类。我们就可以不采用监听的方式,采用适配的方式。

自定义messagedelegate

public class messagedelegate {

    public void handlemessage(byte[] messagebody) {
        system.err.println("默认方法, 消息内容:" + new string(messagebody));
    }
}

messagedelegate类中,方法名与参数handlemessage(byte[] messagebody)是固定的。为什么呢?

messagelisteneradapter源码分析

我们来看下messagelisteneradapter底层代码

消息中间件——RabbitMQ(九)RabbitMQ整合Spring AMQP实战!(全)

messagelisteneradapter类中

public static final string original_default_listener_method = "handlemessage";

默认方法名就是叫handlemessage。当然也可以自己去指定设置。通过messagelisteneradapter的代码我们可以看出如下核心属性

  • defaultlistenermethod默认监听方法名称:用于设置监听方法名称
  • delegate 委托对象:实际真实的委托对象,用于处理消息
  • queueortagtomethodname 队列标识与方法名称组成集合
  • 可以一一进行队列与方法名称的匹配
  • 队列和方法名称绑定,即指定队列里的消息会被绑定的方法所接受处理

测试一下默认使用的handlemessage方法。启动applicationtests类,运行testsendmessage()测试方法。

消息中间件——RabbitMQ(九)RabbitMQ整合Spring AMQP实战!(全)

自定义方法名

messagelisteneradapter adapter = new messagelisteneradapter(new messagedelegate());
adapter.setdefaultlistenermethod("consumemessage");
container.setmessagelistener(adapter);

修改messagedelegate()类

public class messagedelegate {
    
    public void consumemessage(byte[] messagebody) {
        system.err.println("字节数组方法, 消息内容:" + new string(messagebody));
    }
}

自定义textmessageconverter转换器

public class textmessageconverter implements messageconverter {

    @override
    public message tomessage(object object, messageproperties messageproperties) throws messageconversionexception {
        return new message(object.tostring().getbytes(), messageproperties);
    }

    @override
    public object frommessage(message message) throws messageconversionexception {
        string contenttype = message.getmessageproperties().getcontenttype();
        if(null != contenttype && contenttype.contains("text")) {
            return new string(message.getbody());
        }
        return message.getbody();
    }

}

修改rabbitmqconfig类

messagelisteneradapter adapter = new messagelisteneradapter(new messagedelegate());
adapter.setdefaultlistenermethod("consumemessage");
adapter.setmessageconverter(new textmessageconverter());
container.setmessagelistener(adapter);

修改messagedelegate类

public class messagedelegate {

    public void consumemessage(string messagebody) {
        system.err.println("字符串方法, 消息内容:" + messagebody);
    }
}

运行testsendmessage4text()测试方法

@test
public void testsendmessage2() throws exception {
    //1 创建消息
    messageproperties messageproperties = new messageproperties();
    messageproperties.setcontenttype("text/plain");
    message message = new message("mq 消息1234".getbytes(), messageproperties);
    rabbittemplate.send("topic001", "spring.abc", message);
    rabbittemplate.convertandsend("topic001", "spring.amqp", "hello object message send!");
    rabbittemplate.convertandsend("topic002", "rabbit.abc", "hello object message send!");
}

注意:在发消息的时候,必须符合自己的转换器。

打印结果
消息中间件——RabbitMQ(九)RabbitMQ整合Spring AMQP实战!(全)

6.1.2 适配器使用方式2

自定义队列名称和方法名称。

/**
* 2 适配器方式: 我们的队列名称 和 方法名称 也可以进行一一的匹配
* /
messagelisteneradapter adapter = new messagelisteneradapter(new messagedelegate());
adapter.setmessageconverter(new textmessageconverter());
map<string, string> queueortagtomethodname = new hashmap<>();
queueortagtomethodname.put("queue001", "method1");
queueortagtomethodname.put("queue002", "method2");
adapter.setqueueortagtomethodname(queueortagtomethodname);
container.setmessagelistener(adapter);    
public class messagedelegate {
    public void method1(string messagebody) {
        system.err.println("method1 收到消息内容:" + new string(messagebody));
    }
    
    public void method2(string messagebody) {
        system.err.println("method2 收到消息内容:" + new string(messagebody));
    }
    
}

运行 测试方法

@test
public void testsendmessage4text() throws exception {
    //1 创建消息
    messageproperties messageproperties = new messageproperties();
    messageproperties.setcontenttype("text/plain");
    message message = new message("mq 消息1234".getbytes(), messageproperties);
    rabbittemplate.send("topic001", "spring.abc", message);
    rabbittemplate.send("topic002", "rabbit.abc", message);
}

运行结果:

消息中间件——RabbitMQ(九)RabbitMQ整合Spring AMQP实战!(全)

7. messageconverter消息转换器

我们在进行发送消息的时候,正常情况下消息体为二进制的数据方式进行传输,如果希望内部帮我们进行转换,或者指定自定义的转换器,就需要用到messageconverter

  • 自定义常用转换器:messageconverter,一般来讲都需要实现这个接口
  • 重写下面两个方法:
    tomessage:java对象转换为message
    frommessage:message对象转换为java对象
  • json转换器:jackson2jsonmessageconverter:可以进行java对象的转换功能
  • defaultjackson2javatypemapper映射器:可以进行java对象的映射关系
  • 自定义二进制转换器:比如图片类型、pdf、ppt、流媒体

7.1 代码演示

其实我们在介绍messagelisteneradapter的时候,中间就介绍到了textmessageconverter转换器,将二进制数据转换成字符串数据。

7.1.1 添加json格式的转换器

修改rabbitmqconfig类

// 1.1 支持json格式的转换器
messagelisteneradapter adapter = new messagelisteneradapter(new messagedelegate());
adapter.setdefaultlistenermethod("consumemessage");
//重点,加入json格式的转换器 json对应map对象
jackson2jsonmessageconverter jackson2jsonmessageconverter = new jackson2jsonmessageconverter();
adapter.setmessageconverter(jackson2jsonmessageconverter);

container.setmessagelistener(adapter);

修改messagedelegate

public class messagedelegate {
    //json对应map对象
    public void consumemessage(map messagebody) {
        system.err.println("map方法, 消息内容:" + messagebody);
    }
}

定义一个order对象

public class order {
    private string id;
    private string name;
    private string content;
    ...省略get/set等方法
}

定义测试方法

@test
public void testsendjsonmessage() throws exception {

    order order = new order();
    order.setid("001");
    order.setname("消息订单");
    order.setcontent("描述信息");
    objectmapper mapper = new objectmapper();
    string json = mapper.writevalueasstring(order);
    system.err.println("order 4 json: " + json);

    messageproperties messageproperties = new messageproperties();
    //这里注意一定要修改contenttype为 application/json
    messageproperties.setcontenttype("application/json");
    message message = new message(json.getbytes(), messageproperties);

    rabbittemplate.send("topic001", "spring.order", message);
}
  

打印结果:
消息中间件——RabbitMQ(九)RabbitMQ整合Spring AMQP实战!(全)

7.1.2 添加支持java对象转换

修改rabbitmqconfig类

messagelisteneradapter adapter = new messagelisteneradapter(new messagedelegate());
adapter.setdefaultlistenermethod("consumemessage");

jackson2jsonmessageconverter jackson2jsonmessageconverter = new jackson2jsonmessageconverter();

//需要将javatypemapper放入到jackson2jsonmessageconverter对象中
defaultjackson2javatypemapper javatypemapper = new defaultjackson2javatypemapper();
jackson2jsonmessageconverter.setjavatypemapper(javatypemapper);

adapter.setmessageconverter(jackson2jsonmessageconverter);
container.setmessagelistener(adapter);

修改messagedelegate

public class messagedelegate {
    public void consumemessage(order order) {
        system.err.println("order对象, 消息内容, id: " + order.getid() + 
                ", name: " + order.getname() + 
                ", content: "+ order.getcontent());
    }
}

定义测试方法

@test
public void testsendjavamessage() throws exception {

    order order = new order();
    order.setid("001");
    order.setname("订单消息");
    order.setcontent("订单描述信息");
    objectmapper mapper = new objectmapper();
    string json = mapper.writevalueasstring(order);
    system.err.println("order 4 json: " + json);

    messageproperties messageproperties = new messageproperties();
    //这里注意一定要修改contenttype为 application/json
    messageproperties.setcontenttype("application/json");
    //添加typeid 与类的全路径
    messageproperties.getheaders().put("__typeid__", "com.cp.spring.entity.order");
    message message = new message(json.getbytes(), messageproperties);

    rabbittemplate.send("topic001", "spring.order", message);
}

打印结果:

消息中间件——RabbitMQ(九)RabbitMQ整合Spring AMQP实战!(全)

7.1.3 添加支持java对象多映射转换

修改rabbitmqconfig类

//1.3 defaultjackson2javatypemapper & jackson2jsonmessageconverter 支持java对象多映射转换
messagelisteneradapter adapter = new messagelisteneradapter(new messagedelegate());
adapter.setdefaultlistenermethod("consumemessage");
jackson2jsonmessageconverter jackson2jsonmessageconverter = new jackson2jsonmessageconverter();
defaultjackson2javatypemapper javatypemapper = new defaultjackson2javatypemapper();

//key表示标签 对应一个类的具体全路径。类和标签绑定之后,标签是order,意思就是转换成order类
map<string, class<?>> idclassmapping = new hashmap<string, class<?>>();
idclassmapping.put("order", com.cp.spring.entity.order.class);
idclassmapping.put("packaged", com.cp.spring.entity.packaged.class);

javatypemapper.setidclassmapping(idclassmapping);
//一层套一层
jackson2jsonmessageconverter.setjavatypemapper(javatypemapper);
adapter.setmessageconverter(jackson2jsonmessageconverter);
container.setmessagelistener(adapter);

修改messagedelegate

public class messagedelegate {
    //json对应map对象
    public void consumemessage(order order) {
        system.err.println("order对象, 消息内容, id: " + order.getid() + 
                ", name: " + order.getname() + 
                ", content: "+ order.getcontent());
    }
     
    public void consumemessage(packaged pack) {
        system.err.println("package对象, 消息内容, id: " + pack.getid() + 
                ", name: " + pack.getname() + 
                ", content: "+ pack.getdescription());
    }
}

定义一个packaged对象

public class packaged {
    private string id;
    private string name;
    private string description;
    ...省略get/set等方法
}

定义测试方法

@test
public void testsendmappingmessage() throws exception {

    objectmapper mapper = new objectmapper();

    order order = new order();
    order.setid("001");
    order.setname("订单消息");
    order.setcontent("订单描述信息");

    string json1 = mapper.writevalueasstring(order);
    system.err.println("order 4 json: " + json1);

    messageproperties messageproperties1 = new messageproperties();
    //这里注意一定要修改contenttype为 application/json
    messageproperties1.setcontenttype("application/json");
    //设置的是标签,而不是全路径
    messageproperties1.getheaders().put("__typeid__", "order");
    message message1 = new message(json1.getbytes(), messageproperties1);
    rabbittemplate.send("topic001", "spring.order", message1);

    packaged pack = new packaged();
    pack.setid("002");
    pack.setname("包裹消息");
    pack.setdescription("包裹描述信息");

    string json2 = mapper.writevalueasstring(pack);
    system.err.println("pack 4 json: " + json2);

    messageproperties messageproperties2 = new messageproperties();
    //这里注意一定要修改contenttype为 application/json
    messageproperties2.setcontenttype("application/json");
    //设置的是标签,而不是全路径
    messageproperties2.getheaders().put("__typeid__", "packaged");
    message message2 = new message(json2.getbytes(), messageproperties2);
    rabbittemplate.send("topic001", "spring.pack", message2);
}

打印结果:

消息中间件——RabbitMQ(九)RabbitMQ整合Spring AMQP实战!(全)

在通过单元测试运行testsendmappingmessage()方法时会存在一个问题:委派对象messagedelegate可能会收不到对象。
因为单元测试spring容器在运行完毕之后就停止,不会等到消费者消费完消息之后再停止,所以需要通过正常启动springboot项目,可以看到正常消费消息。

7.1.4 添加全局转换器

修改rabbitmqconfig类

@bean  
public queue queue_image() {  
    return new queue("image_queue", true); //队列持久  
}

@bean  
public queue queue_pdf() {  
    return new queue("pdf_queue", true); //队列持久  
}

//1.4 ext convert
messagelisteneradapter adapter = new messagelisteneradapter(new messagedelegate());
adapter.setdefaultlistenermethod("consumemessage");

//全局的转换器:所有小的converter都可以放到这个大的converter中
contenttypedelegatingmessageconverter convert = new contenttypedelegatingmessageconverter();

textmessageconverter textconvert = new textmessageconverter();
//text走文本转换器
convert.adddelegate("text", textconvert);
convert.adddelegate("html/text", textconvert);
convert.adddelegate("xml/text", textconvert);
convert.adddelegate("text/plain", textconvert);
//json走json转换器
jackson2jsonmessageconverter jsonconvert = new jackson2jsonmessageconverter();
convert.adddelegate("json", jsonconvert);
convert.adddelegate("application/json", jsonconvert);
//图片走图片转换器
imagemessageconverter imageconverter = new imagemessageconverter();
convert.adddelegate("image/png", imageconverter);
convert.adddelegate("image", imageconverter);
//pdf走pdf转换器
pdfmessageconverter pdfconverter = new pdfmessageconverter();
convert.adddelegate("application/pdf", pdfconverter);


adapter.setmessageconverter(convert);
container.setmessagelistener(adapter);

修改messagedelegate

public class messagedelegate {

    public void handlemessage(byte[] messagebody) {
        system.err.println("默认方法, 消息内容:" + new string(messagebody));
    }
    
    public void consumemessage(byte[] messagebody) {
        system.err.println("字节数组方法, 消息内容:" + new string(messagebody));
    }
    
    public void consumemessage(string messagebody) {
        system.err.println("字符串方法, 消息内容:" + messagebody);
    }
    
    public void method1(string messagebody) {
        system.err.println("method1 收到消息内容:" + new string(messagebody));
    }
    
    public void method2(string messagebody) {
        system.err.println("method2 收到消息内容:" + new string(messagebody));
    }
    
    //json对应map对象
    public void consumemessage(map messagebody) {
        system.err.println("map方法, 消息内容:" + messagebody);
    }
    public void consumemessage(order order) {
        system.err.println("order对象, 消息内容, id: " + order.getid() + 
                ", name: " + order.getname() + 
                ", content: "+ order.getcontent());
    }
    public void consumemessage(packaged pack) {
        system.err.println("package对象, 消息内容, id: " + pack.getid() + 
                ", name: " + pack.getname() + 
                ", content: "+ pack.getdescription());
    }
    public void consumemessage(file file) {
        system.err.println("文件对象 方法, 消息内容:" + file.getname());
    }
}

添加pdfmessageconverter

public class pdfmessageconverter implements messageconverter {

    @override
    public message tomessage(object object, messageproperties messageproperties) throws messageconversionexception {
        throw new messageconversionexception(" convert error ! ");
    }

    @override
    public object frommessage(message message) throws messageconversionexception {
        system.err.println("-----------pdf messageconverter----------");
        
        byte[] body = message.getbody();
        string filename = uuid.randomuuid().tostring();
        string path = "d:/010_test/" + filename + ".pdf";
        file f = new file(path);
        try {
            files.copy(new bytearrayinputstream(body), f.topath());
        } catch (ioexception e) {
            e.printstacktrace();
        }
        return f;
    }

}

添加imagemessageconverter

public class imagemessageconverter implements messageconverter {

    @override
    public message tomessage(object object, messageproperties messageproperties) throws messageconversionexception {
        throw new messageconversionexception(" convert error ! ");
    }

    @override
    public object frommessage(message message) throws messageconversionexception {
        system.err.println("-----------image messageconverter----------");
        
        object _extname = message.getmessageproperties().getheaders().get("extname");
        string extname = _extname == null ? "png" : _extname.tostring();
        
        byte[] body = message.getbody();
        string filename = uuid.randomuuid().tostring();
        //将接受到的图片放到该位置
        string path = "d:/010_test/" + filename + "." + extname;
        file f = new file(path);
        try {
            files.copy(new bytearrayinputstream(body), f.topath());
        } catch (ioexception e) {
            e.printstacktrace();
        }
        return f;
    }
}

定义测试方法

@test
public void testsendextconvertermessage() throws exception {
//      byte[] body = files.readallbytes(paths.get("d:/002_books", "picture.png"));
//      messageproperties messageproperties = new messageproperties();
//      messageproperties.setcontenttype("image/png");
//      messageproperties.getheaders().put("extname", "png");
//      message message = new message(body, messageproperties);
//      rabbittemplate.send("", "image_queue", message);

        byte[] body = files.readallbytes(paths.get("d:/002_books", "mysql.pdf"));
        messageproperties messageproperties = new messageproperties();
        messageproperties.setcontenttype("application/pdf");
        message message = new message(body, messageproperties);
        rabbittemplate.send("", "pdf_queue", message);
}

可以自己测试下图片和pdf的保存。

源码地址:https://gitee.com/573059382/rabbitmq-demos

文末

欢迎关注个人微信公众号:coder编程
获取最新原创技术文章和免费学习资料,更有大量精品思维导图、面试资料、pmp备考资料等你来领,方便你随时随地学习技术知识!
新建了一个qq群:315211365,欢迎大家进群交流一起学习。谢谢了!也可以介绍给身边有需要的朋友。

文章收录至
github: https://github.com/codermerlin/coder-programming
gitee: https://gitee.com/573059382/coder-programming
欢迎关注并star~
消息中间件——RabbitMQ(九)RabbitMQ整合Spring AMQP实战!(全)

参考文章:

《rabbitmq消息中间件精讲》

推荐文章:

消息中间件——rabbitmq(六)理解exchange交换机核心概念!

消息中间件——rabbitmq(七)高级特性全在这里!(上)

消息中间件——rabbitmq(八)高级特性全在这里!(下)