RabbitMQ指南(六) 远程过程调用
程序员文章站
2022-03-05 18:29:06
...
RabbitMQ指南(六) 远程过程调用
远程过程调用(Remote Procedure Call,RPC),即从远程计算机上请求服务。
利用消息队列进行远程过程调用,与普通的消息收发相比,需处理以下几个问题:
(1)通常请求远程服务的客户端不只一个,需要每个客户端都能发送请求到服务端;
(2)远程服务的响应消息需要发送给请求的客户端,不能发往其他客户端;
(3)客户端接收到的响应消息,需要知道是那一次请求的响应,即响应要能和请求一一对应起来。
解决第一个问题,只需要客户端和远程服务端事先约定RPC请求消息发送的交换机和队列,服务端监听该队列,就可接收到所有客户端的请求。通常使用直连类型交换机。
解决第二个问题,需要客户端发送请求的同时,将应答消息应当发往的队列一同发送给服务端,服务端处理请求后,将响应消息发往指定的队列,就可保证响应消息发往了请求的客户端,而不是其他客户端。客户端指定接收响应消息的队列时,应保证该队列不会与其他客户端的队列重复。
解决第三个问题,需要客户端将请求唯一编号,并在发送请求的同时,将该唯一请求编号发往服务端,服务端返回响应消息的同时,将唯一请求编号一同返回,客户端拿到该编号,就能将该响应与请求对应上。
在队列的设计上,还有以下处理细节,可供参考。客户端的响应消息接收队列应为排他的,防止数据泄露,且客户端退出或异常断开后可及时释放资源;通常客户端调用远程服务包含超时时间参数,当超时仍无响应消息返回后,进入异常处理流程,若服务端由于处理速度慢,在超时时间内未返回响应消息,而在客户端进入异常处理流程后返回了响应消息,则此响应消息不会被客户端消费,若这样的消息逐渐积累增多,会占用RabbitMQ的资源,故客户端应为响应队列设置消息生存周期参数。
以下是一个利用RabiitMQ进行远程过程调用的较为完整的例子。
服务端:
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class Server {
// RabbitMQ服务端地址、端口、用户名、密码
private static final String ADDRESS = "127.0.0.1"; // 服务端与RabbitMQ运行在同一台服务器
private static final int PORT = 5672;
private static final String USERNAME = "mqtester";
private static final String PASSWORD = "mqtester";
// RPC请求队列
private static final String RPC_REQ_QUEUE = "rpc_req_queue";
public static String serviceMethod(String req) {
String respMsg = "Response message to " + req;
return respMsg;
}
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(ADDRESS);
factory.setPort(PORT);
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明RPC请求队列,通常是持久化的
channel.queueDeclare(RPC_REQ_QUEUE, true, false, false, null);
// 根据服务端的情况设置合适的值,这里设置1
channel.basicQos(1);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String req = new String(body, "UTF-8");
System.out.println("Recived: " + req);
// 方法调用
String resp = serviceMethod(req);
// 将请求的编号与响应消息一同返回
AMQP.BasicProperties replyProps = new AMQP.BasicProperties
.Builder()
.correlationId(properties.getCorrelationId())
.build();
// 按照客户端指定的队列返回响应消息
String respQueue = properties.getReplyTo();
channel.basicPublish("", respQueue, replyProps, resp.getBytes("UTF-8"));
}
};
channel.basicConsume(RPC_REQ_QUEUE, true, consumer);
}
}
客户端:
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class Client {
// RabbitMQ服务端地址、端口、用户名、密码
private static final String ADDRESS = "10.176.65.132";
private static final int PORT = 5672;
private static final String USERNAME = "mqtester";
private static final String PASSWORD = "mqtester";
// RPC请求队列
private static final String RPC_REQ_QUEUE = "rpc_req_queue";
private final String RPC_RESP_QUEUE;
private ConnectionFactory factory = null;
private Connection connection = null;
private Channel channel = null;
// 构造方法,建立连接
public Client() throws IOException, TimeoutException {
factory = new ConnectionFactory();
factory.setHost(ADDRESS);
factory.setPort(PORT);
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
connection = factory.newConnection();
channel = connection.createChannel();
// 通过UUID保证本客户端的RPC响应队列唯一
RPC_RESP_QUEUE = UUID.randomUUID().toString();
// 每次客户端的响应队列名是通过UUID产生的,客户端断开后再次连接时会使用另一个队列,将队列设置为自动删除,这样可及时释放资源
// 将队列设置为排他队列,保证只有当前客户端使用此队列
// 设置消息生存周期,超过超时时间才返回的响应消息不会无限积累
Map<String, Object> arguments = new HashMap<String, Object>();
arguments.put("x-message-ttl", 30000);
channel.queueDeclare(RPC_RESP_QUEUE, false, true, false, arguments);
}
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
Client client = new Client();
// 远程调用服务端的serviceMethod()方法
String resp = client.serviceMethod("hello", 5000L);
if(null == resp) {
System.out.println("Timeout!");
} else {
System.out.println("Response: " + resp);
}
client.close();
}
public String serviceMethod(String req, long timeout) throws IOException, InterruptedException {
// 该次请求的唯一编号
String corrId = UUID.randomUUID().toString();
// 设置请求的属性,传入本次请求编号和应答队列名称
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().correlationId(corrId).replyTo(RPC_RESP_QUEUE).build();
// 发送请求
channel.basicPublish("", RPC_REQ_QUEUE, props, req.getBytes("UTF-8"));
// 异步接收应答消息
final BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
if (properties.getCorrelationId().equals(corrId)) {
response.offer(new String(body, "UTF-8"));
}
}
};
String consumerTag = channel.basicConsume(RPC_RESP_QUEUE, true, consumer);
String result = response.poll(timeout, TimeUnit.MILLISECONDS);
channel.basicCancel(consumerTag);
return result;
}
// 关闭信道和连接
public void close() throws IOException, TimeoutException {
channel.close();
connection.close();
}
}
上一篇: ipv6 环境下搭建rabbitmq集群
下一篇: 静态网页文件的扩展名是什么
推荐阅读
-
RabbitMQ入门:远程过程调用(RPC)
-
RabbitMQ指南之四:路由(Routing)和直连交换机(Direct Exchange)
-
RabbitMQ指南之三:发布/订阅模式(Publish/Subscribe)
-
RabbitMQ指南之五:主题交换器(Topic Exchange)
-
RabbitMQ六种队列模式-路由模式
-
【自考】数据结构第六章查找,期末不挂科指南,第10篇
-
WP7有约(六):AppBarUtils使用指南
-
中小型研发团队架构实践六:如何用好消息队列RabbitMQ?
-
Java RabbitMQ:(六)RabbitMq 账户管理
-
RabbitMq 远程过程调用RPC(七)