欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Springboot整合RabbitMQ(六):远程过程调用(RPC)

程序员文章站 2022-07-12 12:55:40
...

在第二篇教程中我们介绍了如何使用工作队列(work queue)在多个工作者(woker)中间分发耗时的任务。
了解springcloud架构可以加求求:三五三六二四七二五九
可是如果我们需要将一个函数运行在远程计算机上并且等待从那儿获取结果时,该怎么办呢?这就是另外的故事了。这种模式通常被称为远程过程调用(Remote Procedure Call)或者 RPC。

这篇教程中,我们会使用 RabbitMQ 来构建一个 RPC 系统:包含一个客户端和一个 RPC 服务器。现在的情况是,我们没有一个值得被分发的足够耗时的任务,所以接下来,我们会创建一个模拟 RPC 服务来返回斐波那契数列。

客户端接口(Client interface)
为了展示 RPC 服务如何使用,我们将把配置文件的名称从 “Sender” 和 “Receiver” 更改为 “Client” 和 “Server”,当我们调用服务器时,我们将获得参数的斐波那契值。

Integer response = (Integer) template.convertSendAndReceive(exchange.getName(), "rpc", start++);
System.out.println(" [.] Got '" + response + "'");

尽管 RPC 在计算领域是一个常用模式,但它也经常被诟病。当一个问题被抛出的时候,程序员往往意识不到这到底是由本地调用还是由较慢的 RPC 调用引起的。同样的困惑还来自于系统的不可预测性和给调试工作带来的不必要的复杂性。跟软件精简不同的是,滥用 RPC 会导致不可维护的面条代码.
考虑到这一点,牢记以下建议:
确保能够明确的搞清楚哪个函数是本地调用的,哪个函数是远程调用的。给你的系统编写文档。保持各个组件间的依赖明确。处理错误案例。明了客户端改如何处理 RPC 服务器的宕机和长时间无响应情况。
当对避免使用 RPC 有疑问的时候。如果可以的话,你应该尽量使用异步管道来代替 RPC 类的阻塞。结果被异步地推送到下一个计算场景。
回调队列(Callback queue)
一般来说通过 RabbitMQ 来实现 RPC 是很容易的。一个客户端发送请求信息,服务器端将其应用到一个回复信息中。为了接收到回复信息,客户端需要在发送请求的时候同时发送一个回调队列(callback queue)的地址。

当我们使用上面的 convertSendAndReceive() 方法时,Spring AMQP 的 RabbitTemplate 为我们处理回调队列,使用 RabbitTemplate 时无需做任何其他设置。有关详细解释,请参阅请求 / 回复消息。

消息属性
AMQP 协议给消息预定义了一系列的 14 个属性。大多数属性很少会用到,除了以下几个:

deliveryMode(投递模式):将消息标记为持久的(值为 2)或暂存的(除了 2 之外的其他任何值)。
contentType(内容类型): 用来描述编码的 mime-type。例如在实际使用中常常使用 application/json 来描述 JOSN 编码类型。
replyTo(回复目标):通常用来命名回调队列。
correlationId(关联标识):用来将 RPC 的响应和请求关联起来。
关联标识(Correlation Id)
上边介绍的方法中,我们建议给每一个 RPC 请求新建一个回调队列。这不是一个高效的做法,幸好这儿有一个更好的办法 —— 我们可以为每个客户端只建立一个独立的回调队列。

这就带来一个新问题,当此队列接收到一个响应的时候它无法辨别出这个响应是属于哪个请求的。correlationId 就是为了解决这个问题而来的。我们给每个请求设置一个独一无二的值。稍后,当我们从回调队列中接收到一个消息的时候,我们就可以查看这条属性从而将响应和请求匹配起来。如果我们接手到的消息的 correlationId 是未知的,那就直接销毁掉它,因为它不属于我们的任何一条请求。

你也许会问,为什么我们接收到未知消息的时候不抛出一个错误,而是要将它忽略掉?这是为了解决服务器端有可能发生的竞争情况。尽管可能性不大,但 RPC 服务器还是有可能在已将应答发送给我们但还未将确认消息发送给请求的情况下死掉。如果这种情况发生,RPC 在重启后会重新处理请求。这就是为什么我们必须在客户端优雅的处理重复响应,同时 RPC 也需要尽可能保持幂等性。

总结

我们的 RPC 如此工作:

当客户端启动的时候,它创建一个匿名独享的回调队列。
在 RPC 请求中,客户端发送带有两个属性的消息:一个是设置回调队列的 replyTo 属性,另一个是设置唯一值的 correlationId 属性。
将请求发送到一个 rpc_queue(tut.rpc) 队列中。
RPC 工作者(又名:服务器)等待请求发送到这个队列中来。当请求出现的时候,它执行他的工作并且将带有执行结果的消息发送给 replyTo 字段指定的队列。
客户端等待回调队列里的数据。当有消息出现的时候,它会检查 correlationId 属性。如果此属性的值与请求匹配,将它返回给应用。 整合到一起,了解springcloud架构可以加求求:三五三六二四七二五九
代码整合
斐波那契数列任务:

private int fib(int i) {
    return (i == 0 || i == 1) ? i : (fib(i - 2) + fib(i - 1));
}

我们定义一个斐波那契的方法,假定只有有效的正整数输入。(不要指望它为大数据工作,这可能是最慢的递归实现)

配置类

Profile({"tut6", "rpc"})
@Configuration
public class Tut6Config {Profile("client")
    private static class ClientConfig {
        @Bean
        public DirectExchange exchange() {
            return new DirectExchange("tut.rpc");
        }

         @Bean
        public Tut6Client client() {
            return new Tut6Client();
        }
    }Profile("server")
     private static class ServerConfig {
         @Bean
        public Queue queue() {
            return new Queue("tut.rpc.requests");
        }

         @Bean
        public DirectExchange exchange() {
            return new DirectExchange("tut.rpc");
        }

         @Bean
        public Binding binding(DirectExchange exchange, Queue queue) {
            return BindingBuilder.bind(queue)
                    .to(exchange)
                    .with("rpc");
        }

         @Bean
        public Tut6Server server() {
            return new Tut6Server();
        }
    }

}

服务端

public class Tut6Server {

   @RabbitListener(queues = "tut.rpc.requests")
    public int process(int in) {
        System.out.println(" [x] Received request for " + in);
        int result = fib(in);
        System.out.println(" [.] Returned " + result);
        return result;
    }

    /**
     * 斐波那契数
     *
     * @param i
     * @return
     */
    private int fib(int i) {
        return (i == 0 || i == 1) ? i : (fib(i - 2) + fib(i - 1));
    }

}

客户端

public class Tut6Client {

   @Autowird 
    private AmqpTemplate template;

    @Autowird
    private DirectExchange exchange;

    private int start = 0;

    @Scheduled(fixedDelay = 1000, initialDelay = 500)
    public void send() {
        System.out.println(" [x] Requesting fib(" + start + ")");
        Integer response = (Integer) template
                .convertSendAndReceive(exchange.getName(), "rpc", start++);
        System.out.println(" [.] Got '" + response + "'");
    }

}

运行
maven 编译

mvn clean package -Dmaven.test.skip=true
运行

java -jar target/rabbitmq-tutorial-0.0.1-SNAPSHOT.jar --spring.profiles.active=tut6,server --tutorial.client.duration=60000
java -jar target/rabbitmq-tutorial-0.0.1-SNAPSHOT.jar --spring.profiles.active=tut6,client --tutorial.client.duration=60000
输出

// Client
Ready … running for 60000ms
[x] Requesting fib(0)
[.] Got ‘0’
[x] Requesting fib(1)
[.] Got ‘1’
[x] Requesting fib(2)
[.] Got ‘1’
[x] Requesting fib(3)
[.] Got ‘2’
[x] Requesting fib(4)
[.] Got ‘3’
[x] Requesting fib(5)
[.] Got ‘5’
[x] Requesting fib(6)
[.] Got ‘8’
[x] Requesting fib(7)
[.] Got ‘13’
[x] Requesting fib(8)
[.] Got ‘21’

// Server
Ready … running for 60000ms
[x] Received request for 0
[.] Returned 0
[x] Received request for 1
[.] Returned 1
[x] Received request for 2
[.] Returned 1
[x] Received request for 3
[.] Returned 2
[x] Received request for 4
[.] Returned 3
[x] Received request for 5
[.] Returned 5
[x] Received request for 6
[.] Returned 8
[x] Received request for 7
[.] Returned 13
[x] Received request for 8
[.] Returned 21
以上的设计不是唯一可能的实现一个 RPC 服务的,但它有一些重要的优点:

如果 RPC 服务器速度太慢,则只需运行多个即可。尝试在新的控制台运行的第二个 RPCServer。
在客户端,RPC 请求只发送或接收一条消息。不需要像 queueDeclare 这样的异步调用。所以 RPC 客户端的单个请求只需要一个网络往返。
我们的代码依旧非常简单,而且没有试图去解决一些复杂(但是重要)的问题,如:

当没有服务器运行时,客户端如何作出反映。
客户端是否需要实现类似 RPC 超时的东西。
如果服务器发生故障,并且抛出异常,应该被转发到客户端吗?
在处理前,防止混入无效的信息(例如检查边界)

相关标签: spring boot b2b2c