欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

SpringBoot整合RabbitMQ(二):定制MessageConverter

程序员文章站 2022-07-12 19:42:48
...

上一篇文章里面,我们已经可以发送简单的text数据到消息队列里面了。

但是在我们平常的需求里面,发送简单的文本数据是不多见的,更多的是需要JSON类型的数据,所以简单的数据发送已经不满足我们的需求了,那让我们来尝试一下发送和接收JSON数据吧!

我们先看一下RabbitMQ默认是哪个消息转换器吧

可以在RabbitTemplate的构造器打上断点查看SpringBoot默认注入的messageConverter是什么 

@Bean
		@ConditionalOnSingleCandidate(ConnectionFactory.class)
		@ConditionalOnMissingBean(RabbitTemplate.class)
		public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) 

SpringBoot整合RabbitMQ(二):定制MessageConverter

从上图能看出,默认注入的是SimpleMessageConverter,可以追踪一下看看~

public RabbitTemplate(ConnectionFactory connectionFactory) {
		this();
		setConnectionFactory(connectionFactory);
	}

public RabbitTemplate() {
		initDefaultStrategies();
	}

protected void initDefaultStrategies() {
		setMessageConverter(new SimpleMessageConverter());
	}

这样就很明了了。先来看一下MessageConverter的层级关系图:

SpringBoot整合RabbitMQ(二):定制MessageConverter

相信看到这个的大兄弟们都知道了,类名就很明确这个类是干啥用的了。

这边我就以Json数据为例子,我们需要用Jackson2JsonMessageConverter这个消息转换器。

@Configuration
public class MyRabbitMQConf {
    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }
}
SpringBoot整合RabbitMQ(二):定制MessageConverter

已经注入了我们所希望看到的Jackson2JsonMessageConverter转换器了,那我们开始用起来吧。

@Test
    public void contextLoads() {
        Map<Object, Object> map = new HashMap<>();
        map.put("Springboot", "Hello,baby!");
        map.put("RabbitMQ", "Yes,come on!");
        rabbitTemplate.convertAndSend("dai.topic", "dai.test.idea", map);
        Map<Object, Object> o = (Map<Object, Object>) rabbitTemplate.receiveAndConvert("dai.topic-queue");
        System.out.println(o.get("Springboot"));
        System.out.println(o.get("RabbitMQ"));
    }
为了丰富一下博文,就来看一下它如何转换数据吧?
@Override
	public void convertAndSend(String exchange, String routingKey, final Object object) throws AmqpException {
		convertAndSend(exchange, routingKey, object, (CorrelationData) null);
	}

public void convertAndSend(String exchange, String routingKey, final Object object, CorrelationData correlationData) throws AmqpException {
		send(exchange, routingKey, convertMessageIfNecessary(object), correlationData);
	}

protected Message convertMessageIfNecessary(final Object object) {
		if (object instanceof Message) {
			return (Message) object;
		}
		return getRequiredMessageConverter().toMessage(object, new MessageProperties());
	}
上面是三步,为了方便,我就放在一起了,从上往下 (读者打个断点就能清清楚楚看到了)
@Override
	public final Message toMessage(Object object, MessageProperties messageProperties)
			throws MessageConversionException {
		if (messageProperties == null) {
			messageProperties = new MessageProperties();
		}
		Message message = createMessage(object, messageProperties);
		messageProperties = message.getMessageProperties();
		if (this.createMessageIds && messageProperties.getMessageId() == null) {
			messageProperties.setMessageId(UUID.randomUUID().toString());
		}
		return message;
	}
主要是在createMessage这个方法:
@Override
	protected Message createMessage(Object objectToConvert, MessageProperties messageProperties)
			throws MessageConversionException {
		byte[] bytes;
		try {
			String jsonString = this.jsonObjectMapper
					.writeValueAsString(objectToConvert);
			bytes = jsonString.getBytes(getDefaultCharset());
		}
		catch (IOException e) {
			throw new MessageConversionException(
					"Failed to convert Message content", e);
		}
		messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
		messageProperties.setContentEncoding(getDefaultCharset());
		messageProperties.setContentLength(bytes.length);

		if (getClassMapper() == null) {
			getJavaTypeMapper().fromJavaType(this.jsonObjectMapper.constructType(objectToConvert.getClass()),
					messageProperties);

		}
		else {
			getClassMapper().fromClass(objectToConvert.getClass(),
					messageProperties);

		}

		return new Message(bytes, messageProperties);
	}
这里先贴出全部的代码,下面慢慢消化:
String jsonString = this.jsonObjectMapper.writeValueAsString(objectToConvert);
这里是转换成为json数据:
SpringBoot整合RabbitMQ(二):定制MessageConverter
然后再将json字符串变成byte数组。
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);

public static final String CONTENT_TYPE_JSON = "application/json";

然后就是设置一些属性,包括拿到原消息的JavaType

最后得到的messageProperties:

SpringBoot整合RabbitMQ(二):定制MessageConverter

最后就返回一个Message,然后让它跑完吧(如何发送消息就不看了,技术还是渣渣,先研究一下,后续文章可能会写发送消息的这个过程)。再来看看是否发送成功了

SpringBoot整合RabbitMQ(二):定制MessageConverter

注意红色部分,是不是在刚才我们创建message的时候就已经组装好了?已经能够正常发送消息了,那么接收消息呢?

SpringBoot整合RabbitMQ(二):定制MessageConverter


很好,完美!!!那就顺便来看一下收到的消息是怎么转换的

@Override
	public Object fromMessage(Message message)
			throws MessageConversionException {
		Object content = null;   //要返回的内容就在这里
		MessageProperties properties = message.getMessageProperties();//这个MessageProperties跟发送的那时候那个配置是一样的
		if (properties != null) {
			String contentType = properties.getContentType();
			if (contentType != null && contentType.contains("json")) {//是json的content-type
				String encoding = properties.getContentEncoding();//UTF-8
				if (encoding == null) {
					encoding = getDefaultCharset();
				}
				try {

					if (getClassMapper() == null) {
						JavaType targetJavaType = getJavaTypeMapper()//这里的JavaType是java.util.HashMap
								.toJavaType(message.getMessageProperties());
						content = convertBytesToObject(message.getBody(),//拿到结果,结果截图放在下面
								encoding, targetJavaType);
					}
					else {
						Class<?> targetClass = getClassMapper().toClass(
								message.getMessageProperties());
						content = convertBytesToObject(message.getBody(),
								encoding, targetClass);
					}
				}
				catch (IOException e) {
					throw new MessageConversionException(
							"Failed to convert Message content", e);
				}
			}
			else {
				if (log.isWarnEnabled()) {
					log.warn("Could not convert incoming message with content-type ["
							+ contentType + "], 'json' keyword missing.");
				}
			}
		}
		if (content == null) {
			content = message.getBody();
		}
		return content;
	}
SpringBoot整合RabbitMQ(二):定制MessageConverter

这就是从消息队列中获取到的消息!

然后再来简单的搞一下发送一个实体类吧。消息转换器用SerializerMessageConverter。记得实体类要实现Serializable

@Test
    public void contextLoads2() {
        Student stu = new Student();
        stu.setName("靓仔浩");
        stu.setAge(22);
        rabbitTemplate.convertAndSend("dai.topic", "dai.test.idea", stu);
        Student o = (Student) rabbitTemplate.receiveAndConvert("dai.topic-queue");
        System.out.println(stu.toString());

    }
SpringBoot整合RabbitMQ(二):定制MessageConverter


SpringBoot整合RabbitMQ(二):定制MessageConverter


好了,都能正常收发信息了。这个消息转换是挺简单的,看各自的需求选择需要收发的消息