SpringBoot整合RabbitMQ(二):定制MessageConverter
上一篇文章里面,我们已经可以发送简单的text数据到消息队列里面了。
但是在我们平常的需求里面,发送简单的文本数据是不多见的,更多的是需要JSON类型的数据,所以简单的数据发送已经不满足我们的需求了,那让我们来尝试一下发送和接收JSON数据吧!
我们先看一下RabbitMQ默认是哪个消息转换器吧
可以在RabbitTemplate的构造器打上断点查看SpringBoot默认注入的messageConverter是什么
@Bean
@ConditionalOnSingleCandidate(ConnectionFactory.class)
@ConditionalOnMissingBean(RabbitTemplate.class)
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory)
从上图能看出,默认注入的是SimpleMessageConverter,可以追踪一下看看~
public RabbitTemplate(ConnectionFactory connectionFactory) {
this();
setConnectionFactory(connectionFactory);
}
public RabbitTemplate() {
initDefaultStrategies();
}
protected void initDefaultStrategies() {
setMessageConverter(new SimpleMessageConverter());
}
这样就很明了了。先来看一下MessageConverter的层级关系图:
相信看到这个的大兄弟们都知道了,类名就很明确这个类是干啥用的了。
这边我就以Json数据为例子,我们需要用Jackson2JsonMessageConverter这个消息转换器。
@Configuration
public class MyRabbitMQConf {
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
}
已经注入了我们所希望看到的Jackson2JsonMessageConverter转换器了,那我们开始用起来吧。
@Test
public void contextLoads() {
Map<Object, Object> map = new HashMap<>();
map.put("Springboot", "Hello,baby!");
map.put("RabbitMQ", "Yes,come on!");
rabbitTemplate.convertAndSend("dai.topic", "dai.test.idea", map);
Map<Object, Object> o = (Map<Object, Object>) rabbitTemplate.receiveAndConvert("dai.topic-queue");
System.out.println(o.get("Springboot"));
System.out.println(o.get("RabbitMQ"));
}
为了丰富一下博文,就来看一下它如何转换数据吧?@Override
public void convertAndSend(String exchange, String routingKey, final Object object) throws AmqpException {
convertAndSend(exchange, routingKey, object, (CorrelationData) null);
}
public void convertAndSend(String exchange, String routingKey, final Object object, CorrelationData correlationData) throws AmqpException {
send(exchange, routingKey, convertMessageIfNecessary(object), correlationData);
}
protected Message convertMessageIfNecessary(final Object object) {
if (object instanceof Message) {
return (Message) object;
}
return getRequiredMessageConverter().toMessage(object, new MessageProperties());
}
上面是三步,为了方便,我就放在一起了,从上往下 (读者打个断点就能清清楚楚看到了)@Override
public final Message toMessage(Object object, MessageProperties messageProperties)
throws MessageConversionException {
if (messageProperties == null) {
messageProperties = new MessageProperties();
}
Message message = createMessage(object, messageProperties);
messageProperties = message.getMessageProperties();
if (this.createMessageIds && messageProperties.getMessageId() == null) {
messageProperties.setMessageId(UUID.randomUUID().toString());
}
return message;
}
主要是在createMessage这个方法:@Override
protected Message createMessage(Object objectToConvert, MessageProperties messageProperties)
throws MessageConversionException {
byte[] bytes;
try {
String jsonString = this.jsonObjectMapper
.writeValueAsString(objectToConvert);
bytes = jsonString.getBytes(getDefaultCharset());
}
catch (IOException e) {
throw new MessageConversionException(
"Failed to convert Message content", e);
}
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
messageProperties.setContentEncoding(getDefaultCharset());
messageProperties.setContentLength(bytes.length);
if (getClassMapper() == null) {
getJavaTypeMapper().fromJavaType(this.jsonObjectMapper.constructType(objectToConvert.getClass()),
messageProperties);
}
else {
getClassMapper().fromClass(objectToConvert.getClass(),
messageProperties);
}
return new Message(bytes, messageProperties);
}
这里先贴出全部的代码,下面慢慢消化:然后再将json字符串变成byte数组。
public static final String CONTENT_TYPE_JSON = "application/json";
然后就是设置一些属性,包括拿到原消息的JavaType
最后得到的messageProperties:
最后就返回一个Message,然后让它跑完吧(如何发送消息就不看了,技术还是渣渣,先研究一下,后续文章可能会写发送消息的这个过程)。再来看看是否发送成功了
注意红色部分,是不是在刚才我们创建message的时候就已经组装好了?已经能够正常发送消息了,那么接收消息呢?
很好,完美!!!那就顺便来看一下收到的消息是怎么转换的
@Override
public Object fromMessage(Message message)
throws MessageConversionException {
Object content = null; //要返回的内容就在这里
MessageProperties properties = message.getMessageProperties();//这个MessageProperties跟发送的那时候那个配置是一样的
if (properties != null) {
String contentType = properties.getContentType();
if (contentType != null && contentType.contains("json")) {//是json的content-type
String encoding = properties.getContentEncoding();//UTF-8
if (encoding == null) {
encoding = getDefaultCharset();
}
try {
if (getClassMapper() == null) {
JavaType targetJavaType = getJavaTypeMapper()//这里的JavaType是java.util.HashMap
.toJavaType(message.getMessageProperties());
content = convertBytesToObject(message.getBody(),//拿到结果,结果截图放在下面
encoding, targetJavaType);
}
else {
Class<?> targetClass = getClassMapper().toClass(
message.getMessageProperties());
content = convertBytesToObject(message.getBody(),
encoding, targetClass);
}
}
catch (IOException e) {
throw new MessageConversionException(
"Failed to convert Message content", e);
}
}
else {
if (log.isWarnEnabled()) {
log.warn("Could not convert incoming message with content-type ["
+ contentType + "], 'json' keyword missing.");
}
}
}
if (content == null) {
content = message.getBody();
}
return content;
}
这就是从消息队列中获取到的消息!
然后再来简单的搞一下发送一个实体类吧。消息转换器用SerializerMessageConverter。记得实体类要实现Serializable
@Test
public void contextLoads2() {
Student stu = new Student();
stu.setName("靓仔浩");
stu.setAge(22);
rabbitTemplate.convertAndSend("dai.topic", "dai.test.idea", stu);
Student o = (Student) rabbitTemplate.receiveAndConvert("dai.topic-queue");
System.out.println(stu.toString());
}
好了,都能正常收发信息了。这个消息转换是挺简单的,看各自的需求选择需要收发的消息
推荐阅读
-
完整SpringBoot Cache整合redis缓存(二)
-
SpringBoot整合RabbitMQ之主题交换机模式
-
SpringBoot集成RabbitMq(二)
-
完整SpringBoot Cache整合redis缓存(二)
-
SpringBoot整合RabbitMQ实战教程 RabbitMQ
-
SpringBoot整合RabbitMQ(二):定制MessageConverter
-
Springboot整合RabbitMQ(六):远程过程调用(RPC)
-
SpringBoot ~ 整合AMQP(RabbitMQ)
-
SpringBoot整合rabbitMQ,spring-boot-starter-amqp 的使用
-
【RabbitMQ】基本使用:Spring AMQP配置使用及SpringBoot整合