欢迎您访问程序员文章站,本站旨在为大家提供、分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

RabbitMQ-RPC案例

程序员文章站 2022-07-10 21:08:19
RpcClient package sc.app.stc.rmq.rabbitmq; import java.io.IOException; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbit...

RpcClient


package sc.app.stc.rmq.rabbitmq;

import java.io.IOException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

/**
 * Demo RPC client: publishes a single request to the risk exchange and prints
 * the reply that arrives on a private, broker-named reply queue.
 */
public class RiskRpcClient {

	public static void main(String[] argv) throws Exception {
		RiskRpcClient.client(null);
	}

	/**
	 * Sends one {@code "{}"} request to {@code RiskRpcServer.queryRiskExchange}
	 * (routing key {@code RiskRpcServer.queryRiskRoutingKey}) and registers a
	 * consumer that prints the reply body.
	 *
	 * @param channel channel to use; when {@code null}, one is obtained from
	 *                {@code RiskRpcServer.getChannelInstance("riskClient")}
	 * @return the channel that was used (also returned when a later step
	 *         failed), or {@code null} when no channel could be obtained
	 */
	public static Channel client(Channel channel) {
		try {
			if (channel == null) {
				channel = RiskRpcServer.getChannelInstance("riskClient");
			}
			if (channel == null) {
				System.out.println(" channel is null..");
				return null;
			}
			// Declare the direct exchange. The fourth argument is the EXCHANGE
			// auto-delete flag, so pass exchangeAutoDelete rather than the
			// queue flag the original code used.
			channel.exchangeDeclare(RiskRpcServer.queryRiskExchange, BuiltinExchangeType.DIRECT,
					RiskRpcServer.exchangeDurable, RiskRpcServer.exchangeAutoDelete, null);

			// Broker-named reply queue: every client gets its own.
			String replyQueue = channel.queueDeclare().getQueue();
			System.out.println(String.format("replyQueue:%s", replyQueue));
			// Bind the reply queue to the exchange under its own name, which is
			// the routing key the request advertises through reply-to.
			channel.queueBind(replyQueue, RiskRpcServer.queryRiskExchange, replyQueue);

			// Correlation id lets the reply be matched to this request.
			final String correlationId = java.util.UUID.randomUUID().toString();

			// Register the reply consumer before publishing the request.
			DefaultConsumer consumer = new DefaultConsumer(channel) {
				// Callback invoked for each message delivered to the reply queue.
				@Override
				public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
						byte[] body) throws IOException {
					if (!correlationId.equals(properties.getCorrelationId())) {
						// Not the answer to the request sent below; ignore it.
						return;
					}
					String json = new String(body, java.nio.charset.StandardCharsets.UTF_8);
					System.out.println(json);
				}
			};
			channel.basicConsume(replyQueue, true, consumer);

			// Request properties: where to reply and which id to echo back.
			AMQP.BasicProperties requestProperties = new AMQP.BasicProperties.Builder()
					.correlationId(correlationId)
					.replyTo(replyQueue)
					.build();
			channel.basicPublish(RiskRpcServer.queryRiskExchange, RiskRpcServer.queryRiskRoutingKey,
					requestProperties, "{}".getBytes(java.nio.charset.StandardCharsets.UTF_8));
		} catch (Exception e) {
			// Report the failure instead of silently swallowing it; the channel
			// is still returned below, as before.
			System.out.println("RPCClient error: " + e);
		}
		return channel;
	}

}


RpcServer

package sc.app.stc.rmq.rabbitmq;

import java.io.IOException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.Recoverable;
import com.rabbitmq.client.RecoveryListener;
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;

/**
 * Demo RPC server plus the shared RabbitMQ connection helpers and exchange
 * settings used by both the server and the client.
 */
public class RiskRpcServer {
	public static String queryRiskExchange = "indicators.direct";
	public static String queryRiskFanoutExchange = "indicator.fanout";
	public static String queryRiskRoutingKey = "indicator.info";
	// Exchange / queue attributes.
	public static boolean exchangeDurable = false;// durable (persistent) exchange
	// Whether the exchange is deleted automatically once none of its bound
	// queues is in use any more. true: delete, false: keep.
	public static boolean exchangeAutoDelete = false;
	public static boolean queueDurable = true;// durable (persistent) queue
	// Whether the queue is deleted automatically after all consumers have
	// disconnected. true: delete, false: keep.
	public static boolean queueAutoDelete = false;

	// Payload sent back as the body of every reply.
	static String json = "";

	public static void main(String[] argv) throws Exception {
		RiskRpcServer.server();
	}

	/**
	 * Starts the RPC server: declares the direct exchange, binds a
	 * broker-named queue to {@code queryRiskRoutingKey} and answers every
	 * request with the current value of {@code json}.
	 *
	 * Failures are reported on standard output; nothing is thrown.
	 */
	public static void server() {
		try {

			final Channel channel = RiskRpcServer.getChannelInstance("riskServer");
			if (channel == null) {
				System.out.println(" channel is null..");
				return;
			}
			// Declare the direct exchange. The fourth argument is the EXCHANGE
			// auto-delete flag, so pass exchangeAutoDelete rather than the
			// queue flag the original code used.
			channel.exchangeDeclare(queryRiskExchange, BuiltinExchangeType.DIRECT, exchangeDurable,
					exchangeAutoDelete, null);
			// Broker-named request queue.
			String queueName = channel.queueDeclare().getQueue();
			// Bind it to the exchange under the request routing key.
			channel.queueBind(queueName, queryRiskExchange, queryRiskRoutingKey);

			DefaultConsumer consumer = new DefaultConsumer(channel) {

				// Callback invoked for each request delivered to the queue.
				@Override
				public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
						byte[] body) throws IOException {
					String replyTo = properties.getReplyTo();
					if (replyTo == null) {
						// No reply address on the request: nothing can be
						// routed back, so skip instead of publishing with a
						// null routing key.
						System.out.println("RPCServer: request without reply-to, ignored");
						return;
					}
					// Echo the caller's correlation id so it can match the
					// reply to its request.
					AMQP.BasicProperties prop = new AMQP.BasicProperties.Builder()
							.correlationId(properties.getCorrelationId())
							.build();
					// The reply goes through queryRiskExchange with the
					// reply-to value as routing key, so the requester is
					// expected to have bound its reply queue to this exchange
					// under that key.
					channel.basicPublish(queryRiskExchange, replyTo, prop,
							json.getBytes(java.nio.charset.StandardCharsets.UTF_8));
				}
			};
			// Start consuming with automatic acknowledgement.
			channel.basicConsume(queueName, true, consumer);
		} catch (IOException e) {
			System.out.println("RPCServer error: " + e);
		}
	}

	/**
	 * Opens a new connection named {@code ConnectionDescription} and creates a
	 * channel on it.
	 *
	 * @param ConnectionDescription client-provided connection name
	 * @return the new channel, or {@code null} when the connection or the
	 *         channel could not be created (the connection is closed again in
	 *         that case)
	 */
	public static Channel getChannelInstance(String ConnectionDescription) {
		Connection connection = null;
		Channel channel = null;
		try {
			System.out.println(String.format("create Channel :%s", ConnectionDescription));
			ConnectionFactory connectionFactory = getConnectionFactory();
			connection = connectionFactory.newConnection(ConnectionDescription);

			channel = connection.createChannel();
			if (channel == null) {
				// No channel could be allocated on this connection; do not
				// leave the connection open behind a null return value.
				closeQuietly(connection);
				return null;
			}

			// Only recoverable connections accept recovery listeners; check
			// instead of casting blindly.
			if (connection instanceof Recoverable) {
				((Recoverable) connection).addRecoveryListener(new RecoveryListener() {
					@Override
					public void handleRecovery(Recoverable recoverable) {// runs after a successful reconnect
					}

					@Override
					public void handleRecoveryStarted(Recoverable recoverable) {// reconnect is starting
					}
				});
			}
			connection.addShutdownListener(new ShutdownListener() {// connection was shut down
				@Override
				public void shutdownCompleted(ShutdownSignalException cause) {
				}

			});
		} catch (Exception e) {
			// Connecting or creating the channel failed.
			System.out.println("获取Channel连接失败: " + e);
			if (channel == null) {
				closeQuietly(connection);
			}
		}
		return channel;
	}

	/** Closes {@code connection} if it is non-null, ignoring any failure. */
	private static void closeQuietly(Connection connection) {
		if (connection == null) {
			return;
		}
		try {
			connection.close();
		} catch (Exception ignored) {
			// Best-effort cleanup on an already failing path.
		}
	}

	/**
	 * Builds the connection factory used by this demo.
	 *
	 * The virtual host, host, user name and password below are the article's
	 * placeholder values and must be replaced with real broker settings.
	 *
	 * @return a factory with a 5 s connection timeout and automatic
	 *         connection and topology recovery enabled
	 */
	public static ConnectionFactory getConnectionFactory() {
		ConnectionFactory connectionFactory = new ConnectionFactory();

		connectionFactory.setVirtualHost("vhost");
		connectionFactory.setHost("host");
		// setPort takes an int; the original passed the string "port", which
		// does not compile. Use the default AMQP port until a real one is set.
		connectionFactory.setPort(ConnectionFactory.DEFAULT_AMQP_PORT);
		connectionFactory.setUsername("username");
		connectionFactory.setPassword("passwprd");
		connectionFactory.setConnectionTimeout(5000);
		// Automatic connection recovery.
		connectionFactory.setAutomaticRecoveryEnabled(true);
		// Author's note: a heartbeat of 5-20 seconds is suggested, with the
		// client value lower than the server's; left disabled here.
		// connectionFactory.setRequestedHeartbeat(10);
		connectionFactory.setTopologyRecoveryEnabled(true);
		return connectionFactory;
	}
}

本文地址:https://blog.csdn.net/java_lqjsw/article/details/107366647