欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

RabbitMQ指南(六) 远程过程调用

程序员文章站 2022-03-05 18:29:06
...

RabbitMQ指南(六) 远程过程调用


  远程过程调用(Remote Procedure Call,RPC),即从远程计算机上请求服务。
  利用消息队列进行远程过程调用,与普通的消息收发相比,需处理以下几个问题:
  (1)通常请求远程服务的客户端不只一个,需要每个客户端都能发送请求到服务端;
  (2)远程服务的响应消息需要发送给请求的客户端,不能发往其他客户端;
  (3)客户端接收到的响应消息,需要知道是那一次请求的响应,即响应要能和请求一一对应起来。
  解决第一个问题,只需要客户端和远程服务端事先约定RPC请求消息发送的交换机和队列,服务端监听该队列,就可接收到所有客户端的请求。通常使用直连类型交换机。
  解决第二个问题,需要客户端发送请求的同时,将应答消息应当发往的队列一同发送给服务端,服务端处理请求后,将响应消息发往指定的队列,就可保证响应消息发往了请求的客户端,而不是其他客户端。客户端指定接收响应消息的队列时,应保证该队列不会与其他客户端的队列重复。
  解决第三个问题,需要客户端将请求唯一编号,并在发送请求的同时,将该唯一请求编号发往服务端,服务端返回响应消息的同时,将唯一请求编号一同返回,客户端拿到该编号,就能将该响应与请求对应上。
  在队列的设计上,还有以下处理细节,可供参考。客户端的响应消息接收队列应为排他的,防止数据泄露,且客户端退出或异常断开后可及时释放资源;通常客户端调用远程服务包含超时时间参数,当超时仍无响应消息返回后,进入异常处理流程,若服务端由于处理速度慢,在超时时间内未返回响应消息,而在客户端进入异常处理流程后返回了响应消息,则此响应消息不会被客户端消费,若这样的消息逐渐积累增多,会占用RabbitMQ的资源,故客户端应为响应队列设置消息生存周期参数。
  以下是一个利用RabiitMQ进行远程过程调用的较为完整的例子。
  服务端:

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;


public class Server {

	// RabbitMQ服务端地址、端口、用户名、密码
	private static final String ADDRESS = "127.0.0.1";	// 服务端与RabbitMQ运行在同一台服务器
	private static final int PORT = 5672;
	private static final String USERNAME = "mqtester";
	private static final String PASSWORD = "mqtester";
	// RPC请求队列
	private static final String RPC_REQ_QUEUE = "rpc_req_queue";
	
	
	public static String serviceMethod(String req) {
		String respMsg = "Response message to " + req;
		return respMsg;
	}
	
	public static void main(String[] args) throws IOException, TimeoutException {
		
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost(ADDRESS);
		factory.setPort(PORT);
		factory.setUsername(USERNAME);
		factory.setPassword(PASSWORD);
		
		Connection connection = factory.newConnection();
		Channel channel = connection.createChannel();
		// 声明RPC请求队列,通常是持久化的
		channel.queueDeclare(RPC_REQ_QUEUE, true, false, false, null);
		// 根据服务端的情况设置合适的值,这里设置1
		channel.basicQos(1);
		
		Consumer consumer = new DefaultConsumer(channel) {
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
				String req = new String(body, "UTF-8");
				System.out.println("Recived: " + req);
				// 方法调用
				String resp = serviceMethod(req);
				// 将请求的编号与响应消息一同返回
				AMQP.BasicProperties replyProps = new AMQP.BasicProperties
		                  .Builder()
		                  .correlationId(properties.getCorrelationId())
		                  .build();
				// 按照客户端指定的队列返回响应消息
				String respQueue = properties.getReplyTo();
				channel.basicPublish("", respQueue, replyProps, resp.getBytes("UTF-8"));
			}
		};
		
		channel.basicConsume(RPC_REQ_QUEUE, true, consumer);
	}
}

  客户端:

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;


public class Client {

	// RabbitMQ服务端地址、端口、用户名、密码
	private static final String ADDRESS = "10.176.65.132";
	private static final int PORT = 5672;
	private static final String USERNAME = "mqtester";
	private static final String PASSWORD = "mqtester";
	// RPC请求队列
	private static final String RPC_REQ_QUEUE = "rpc_req_queue";
	
	private final String RPC_RESP_QUEUE;
	private ConnectionFactory factory = null;
	private Connection connection = null;
	private Channel channel = null;


	// 构造方法,建立连接
	public Client() throws IOException, TimeoutException {
		factory = new ConnectionFactory();
		factory.setHost(ADDRESS);
		factory.setPort(PORT);
		factory.setUsername(USERNAME);
		factory.setPassword(PASSWORD);
		connection = factory.newConnection();
		channel = connection.createChannel();
		// 通过UUID保证本客户端的RPC响应队列唯一
		RPC_RESP_QUEUE = UUID.randomUUID().toString();
		// 每次客户端的响应队列名是通过UUID产生的,客户端断开后再次连接时会使用另一个队列,将队列设置为自动删除,这样可及时释放资源
		// 将队列设置为排他队列,保证只有当前客户端使用此队列
		// 设置消息生存周期,超过超时时间才返回的响应消息不会无限积累
		Map<String, Object> arguments = new HashMap<String, Object>();
		arguments.put("x-message-ttl", 30000);
		channel.queueDeclare(RPC_RESP_QUEUE, false, true, false, arguments);
	}


	public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
		Client client = new Client();
		// 远程调用服务端的serviceMethod()方法
		String resp = client.serviceMethod("hello", 5000L);
		if(null == resp) {
			System.out.println("Timeout!");
		} else {
			System.out.println("Response: " + resp);
		}
		client.close();
	}


	public String serviceMethod(String req, long timeout) throws IOException, InterruptedException {
		// 该次请求的唯一编号
		String corrId = UUID.randomUUID().toString();
		// 设置请求的属性,传入本次请求编号和应答队列名称
		AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().correlationId(corrId).replyTo(RPC_RESP_QUEUE).build();
		// 发送请求
		channel.basicPublish("", RPC_REQ_QUEUE, props, req.getBytes("UTF-8"));
		// 异步接收应答消息
		final BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);
		Consumer consumer = new DefaultConsumer(channel) {

			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
				if (properties.getCorrelationId().equals(corrId)) {
					response.offer(new String(body, "UTF-8"));
				}
			}
		};
		String consumerTag = channel.basicConsume(RPC_RESP_QUEUE, true, consumer);
		String result = response.poll(timeout, TimeUnit.MILLISECONDS);		
		channel.basicCancel(consumerTag);
		return result;
	}


	// 关闭信道和连接
	public void close() throws IOException, TimeoutException {
		channel.close();
		connection.close();
	}
}
相关标签: RabbitMQ