欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

RabbitMQ与spring结合-direct消息

程序员文章站 2022-05-17 08:48:01
...

RabbitMQ与spring结合-direct消息

https://docs.spring.io/spring-amqp/docs/2.1.6.RELEASE/reference/html/#_introduction
spring官网

<dependency>
  <groupId>org.springframework.amqp</groupId>
  <artifactId>spring-rabbit</artifactId>
  <version>2.1.6.RELEASE</version>
</dependency>

项目结构

RabbitMQ与spring结合-direct消息

env.properties中存着配置

#mq config
mq.host=127.0.0.1
mq.username=root
mq.password=root123
mq.port=5672
mq.vhost=/

pom文件

<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>com.ghgcn</groupId>
	<artifactId>spring-mq</artifactId>
	<version>0.0.1-SNAPSHOT</version>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<version.jdk>1.8</version.jdk>
	</properties>

	<dependencies>

		<dependency>
			<groupId>com.sun</groupId>
			<artifactId>tools</artifactId>
			<version>${version.jdk}</version>
			<scope>system</scope>
			<systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
		</dependency>
		<!--日志-->
		<dependency>
			<groupId>ch.qos.logback</groupId>
			<artifactId>logback-core</artifactId>
			<version>1.2.3</version>
		</dependency>
		<!-- https://mvnrepository.com/artifact/ch.qos.logback/logback-classic -->
		<dependency>
			<groupId>ch.qos.logback</groupId>
			<artifactId>logback-classic</artifactId>
			<version>1.2.3</version>
		</dependency>
		<!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
			<version>1.18.8</version>
			<scope>provided</scope>
		</dependency>
		<!--只需要集成这个-->
		<dependency>
			<groupId>org.springframework.amqp</groupId>
			<artifactId>spring-rabbit</artifactId>
			<version>2.1.6.RELEASE</version>
		</dependency>

	</dependencies>
	<profiles>
		<profile>
			<id>dev</id>
			<properties>
				<env>dev</env>
			</properties>
			<activation>
				<activeByDefault>true</activeByDefault>
			</activation>
		</profile>
		<profile>
			<id>test</id>
			<properties>
				<env>test</env>
			</properties>
		</profile>
		<profile>
			<id>prod</id>
			<properties>
				<env>prod</env>
			</properties>
		</profile>
		<profile>
			<id>prep</id>
			<properties>
				<env>prep</env>
			</properties>
		</profile>
	</profiles>
	<build>
		<plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-compiler-plugin</artifactId>
				<version>3.2</version>
				<configuration>
					<source>${version.jdk}</source>
					<target>${version.jdk}</target>
					<encoding>${project.build.sourceEncoding}</encoding>
				</configuration>
			</plugin>
		</plugins>
		<filters>
			<filter>src/env/${env}.properties</filter>
		</filters>
		<resources>
			<resource>
				<directory>src/main/resources</directory>
				<includes>
					<include>**/*</include>
				</includes>
				<filtering>true</filtering>
			</resource>
		</resources>
	</build>
</project>

配置direct.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xmlns:rabbit="http://www.springframework.org/schema/rabbit"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
		http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-2.1.xsd">


	<!-- 工厂 -->
	<rabbit:connection-factory
		id="mqConnectionFactory" host="${mq.host}" port="${mq.port}"
		username="${mq.username}" password="${mq.password}"
		virtual-host="${mq.vhost}" publisher-confirms="true"
		channel-cache-size="50" />


	<!--管理 -->
	<rabbit:admin id="mqadmin"
		connection-factory="mqConnectionFactory" />
	<!-- mq模版 -->
	<rabbit:template id="amqpTemplate"
		connection-factory="mqConnectionFactory" />

	<!--声明交换器 -->

	<rabbit:direct-exchange
		name="direct_spring_exchange" durable="true">
		<rabbit:bindings>
			<!-- 绑定队列 与routerkey direct类型rounterkey必须完全一样 -->
			<rabbit:binding queue="direct_spring_queue"
				key="direct.rount.key"></rabbit:binding>
		</rabbit:bindings>
	</rabbit:direct-exchange>

	<!--声明队列 -->
	<rabbit:queue name="direct_spring_queue" durable="true"
		exclusive="false" auto-delete="false" declared-by="mqadmin">

	</rabbit:queue>

	<bean id="directListener"
		class="com.ghgcn.springmq.direct.listener.DirectListener" />
	<!-- 监听器 -->
	<rabbit:listener-container
		connection-factory="mqConnectionFactory" acknowledge="manual">
		<rabbit:listener queues="direct_spring_queue"
			ref="directListener" />
	</rabbit:listener-container>
</beans>

消息监听

package com.ghgcn.springmq.direct.listener;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;

import com.rabbitmq.client.Channel;

import lombok.extern.slf4j.Slf4j;

/**
 * Direct消息监听器
 * @author Administrator
 *
 */
@Slf4j
public class DirectListener implements ChannelAwareMessageListener {

	@Override
	public void onMessage(Message message, Channel channel) throws Exception {

		log.info("DirectListener 接收 -message  {} ",message.getMessageProperties().getMessageId());
		log.info("DirectListener 接收 -channel  {} ",channel);
		//消息确认
		channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
	}

}

发送消息

package com.ghgcn.springmq.direct;

import java.util.Date;
import java.util.UUID;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

import com.rabbitmq.client.MessageProperties;

public class DirectProducer {

	
	private static String exchange="direct_spring_exchange";
	private static String rountKey="direct.rount.key";
	@SuppressWarnings("resource")
	public static void main(String[] args) {

		//amqpTemplate
		ApplicationContext applicationContext = new ClassPathXmlApplicationContext("direct.xml");
		
		
		AmqpTemplate amqpTemplate = applicationContext.getBean("amqpTemplate", AmqpTemplate.class);
		
		System.out.println("============direct message==============");
		//构建消息
		byte [] msgBytes = ("direct message test "+ new Date()).getBytes();
		Message messager = MessageBuilder.withBody(msgBytes)
        .setContentType(MessageProperties.TEXT_PLAIN.getContentType())
        .setMessageId(UUID.randomUUID().toString())
        .build();
		//发送消息
		amqpTemplate.convertAndSend(exchange, rountKey, messager);
		
		System.out.println("===发送结束===");
		
		
	}

}

结果

============direct message==============
11:13:55.033 [main] DEBUG org.springframework.amqp.rabbit.connection.CachingConnectionFactory - Creating cached Rabbit Channel from PublisherCallbackChannelImpl: AMQChannel(amqp://aaa@qq.com:5672/,2)
11:13:55.034 [main] DEBUG org.springframework.amqp.rabbit.connection.PublisherCallbackChannelImpl - Added listener aaa@qq.com
11:13:55.034 [main] DEBUG org.springframework.amqp.rabbit.core.RabbitTemplate - Added publisher confirm channel: Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://aaa@qq.com:5672/,2), conn: aaa@qq.com Shared Rabbit Connection: aaa@qq.com [delegate=amqp://aaa@qq.com:5672/, localPort= 52838] to map, size now 1
11:13:55.034 [main] DEBUG org.springframework.amqp.rabbit.core.RabbitTemplate - Executing callback RabbitTemplate$$Lambda$98/1793799654 on RabbitMQ Channel: Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://aaa@qq.com:5672/,2), conn: aaa@qq.com Shared Rabbit Connection: aaa@qq.com [delegate=amqp://aaa@qq.com:5672/, localPort= 52838]
11:13:55.034 [main] DEBUG org.springframework.amqp.rabbit.core.RabbitTemplate - Publishing message (Body:'direct message test Fri May 31 11:13:55 CST 2019' MessageProperties [headers={}, messageId=0d425fbe-0726-4787-ad59-2ab59e260a88, contentType=text/plain, contentLength=0, deliveryMode=PERSISTENT, priority=0, deliveryTag=0])on exchange [direct_spring_exchange], routingKey = [direct.rount.key]
===发送结束===
11:13:55.039 [pool-1-thread-4] DEBUG org.springframework.amqp.rabbit.listener.BlockingQueueConsumer - Storing delivery for consumerTag: 'amq.ctag-JrSWWfVMe1lLqh8fHZkLPA' with deliveryTag: '1' in aaa@qq.com: tags=[[amq.ctag-JrSWWfVMe1lLqh8fHZkLPA]], channel=Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://aaa@qq.com:5672/,1), conn: aaa@qq.com Shared Rabbit Connection: aaa@qq.com [delegate=amqp://aaa@qq.com:5672/, localPort= 52838], acknowledgeMode=MANUAL local queue size=0
11:13:55.039 [org.springframework.amqp.rabbit.config.ListenerContainerFactoryBean#0-1] DEBUG org.springframework.amqp.rabbit.listener.BlockingQueueConsumer - Received message: (Body:'direct message test Fri May 31 11:13:55 CST 2019' MessageProperties [headers={}, messageId=0d425fbe-0726-4787-ad59-2ab59e260a88, contentType=text/plain, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=direct_spring_exchange, receivedRoutingKey=direct.rount.key, deliveryTag=1, consumerTag=amq.ctag-JrSWWfVMe1lLqh8fHZkLPA, consumerQueue=direct_spring_queue])
11:13:55.039 [org.springframework.amqp.rabbit.config.ListenerContainerFactoryBean#0-1] INFO com.ghgcn.springmq.direct.listener.DirectListener - DirectListener 接收 -message  0d425fbe-0726-4787-ad59-2ab59e260a88 
11:13:55.040 [org.springframework.amqp.rabbit.config.ListenerContainerFactoryBean#0-1] INFO com.ghgcn.springmq.direct.listener.DirectListener - DirectListener 接收 -channel  Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://aaa@qq.com:5672/,1), conn: aaa@qq.com Shared Rabbit Connection: aaa@qq.com [delegate=amqp://aaa@qq.com:5672/, localPort= 52838] 
11:13:55.068 [AMQP Connection 127.0.0.1:5672] DEBUG org.springframework.amqp.rabbit.connection.PublisherCallbackChannelImpl - PublisherCallbackChannelImpl: AMQChannel(amqp://aaa@qq.com:5672/,2) PC:Ack:1:false

github:https://github.com/ln0491/rabbitmq-spring-demo

相关标签: RabbitMQ