欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

rabbitmq 学习-14- 官方rabbitmq+spring进行远程接口调用

程序员文章站 2022-07-13 15:47:34
...

到http://github.com/momania/spring-rabbitmq下载其示例程序

实行远程接口调用,主要在com.rabbitmq.spring.remoting下几个类:
发布服务端(Server):RabbitInvokerServiceExporter.java
接口调用客户端(Client):RabbitInvokerProxyFactoryBean.java,RabbitInvokerClientInterceptor.java,

RabbitRpcClient.java(对RpcClient的简单封装,添加了发送消息时的选项:
mandatory--是否强制发送,immediate--是否立即发送,timeOutMs--超时时间)




发布服务端(Server)——RabbitInvokerServiceExporter.java说明:
package com.rabbitmq.spring.remoting;

import static java.lang.String.format;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.lang.SerializationUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Required;
import org.springframework.remoting.support.RemoteInvocation;
import org.springframework.remoting.support.RemoteInvocationBasedExporter;
import org.springframework.remoting.support.RemoteInvocationResult;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.RpcServer;
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;
import com.rabbitmq.spring.ExchangeType;
import com.rabbitmq.spring.InvalidRoutingKeyException;
import com.rabbitmq.spring.channel.RabbitChannelFactory;

public class RabbitInvokerServiceExporter extends RemoteInvocationBasedExporter
        implements InitializingBean, DisposableBean, ShutdownListener {

    private final Log log = LogFactory
            .getLog(RabbitInvokerServiceExporter.class);

    private RabbitChannelFactory channelFactory;
    private String exchange;
    private ExchangeType exchangeType;
    private String queueName;
    private String routingKey;

    private Object proxy;
    private List<RpcServer> rpcServerPool;
    private int poolsize = 1;

    public void afterPropertiesSet() {
        // 检查exchange type类型不能为fanout
        if (exchangeType.equals(ExchangeType.FANOUT)) {
            throw new InvalidRoutingKeyException(String.format(
                    "Exchange type %s not allowed for service exporter",
                    exchangeType));
        }
        exchangeType.validateRoutingKey(routingKey);

        // 调用org.springframework.remoting.support.RemoteExporter的getProxyForService(),得到代理对象
        proxy = getProxyForService();

        // 初始化rpcServer池
        rpcServerPool = new ArrayList<RpcServer>(poolsize);

        // 初始化RpcServer,并开始接收请求
        startRpcServer();
    }

    // 初始化RpcServer,并开始接收请求
    private void startRpcServer() {
        try {
            log.info("Creating channel and rpc server");

            // 创建临时的channel,用来定义queue,exchange,并进行bind
            // 这里有两个用处:
            // 1:在服务端也定义queue,避免因为先开服务端而出现queue没被定义的错误
            // 2:这里先用一个channel定义一下qeueue,后面的for循环里面就不用每个都去定义了
            Channel tmpChannel = channelFactory.createChannel();
            tmpChannel.getConnection().addShutdownListener(this);
            tmpChannel.queueDeclare(queueName, false, false, false, true, null);
            if (exchange != null) {
                tmpChannel.exchangeDeclare(exchange, exchangeType.toString());
                tmpChannel.queueBind(queueName, exchange, routingKey);
            }

            // 创建poolsize个RpcServer,每个RpcServer使用一个单独的channel,并且分别使用单独的线程去接收请求,提升接收速度
            for (int i = 1; i <= poolsize; i++) {
                try {
                    // 每次都创建一个新的channel,因为一个channel在多个线程中使用是会有问题的(官方文档和channel的JavaDoc上是这样说的)
                    Channel channel = channelFactory.createChannel();
                    String format = "Starting rpc server %d on exchange [%s(%s)] - queue [%s] - routingKey [%s]";
                    log.info(String.format(format, i, exchange, exchangeType,
                            queueName, routingKey));

                    // 使用当前的channel创建一个RpcServer去处理请求
                    final RpcServer rpcServer = createRpcServer(channel);
                    rpcServerPool.add(rpcServer);

                    // 创建一个线程让当前的RpcServer去处理请求
                    Runnable main = new Runnable() {
                        @Override
                        public void run() {
                            try {
                                // rpcServer开始处理请求
                                throw rpcServer.mainloop();
                            } catch (IOException e) {
                                throw new RuntimeException(e);
                            }
                        }
                    };
                    // 线程开始
                    new Thread(main).start();
                } catch (IOException e) {
                    log.warn("Unable to create rpc server", e);
                }
            }
        } catch (Exception e) {
            log.error("Unexpected error trying to start rpc servers", e);
        }
    }

    // 创建RpcServer对象
    private RpcServer createRpcServer(Channel channel) throws IOException {
        return new RpcServer(channel, queueName) {

            // 重写处理接收到的消息的方法
            public byte[] handleCall(byte[] requestBody,
                    AMQP.BasicProperties replyProperties) {
                // 因为在客户端调用方法的时候,是将客户端调用的方法的信息封装成一个RemoteInvocation对象,然后序列化成一个byte数据再使用RpcClient发送到服务端的
                // 所以在这里(服务端接收消息),将消息(requestBody)反序列化成RemoteInvocation对象
                RemoteInvocation invocation = (RemoteInvocation) SerializationUtils
                        .deserialize(requestBody);

                // 根据RemoteInvocation的信息,服务端使用代理对象执行相应的方法,并得到执行结果
                RemoteInvocationResult result = invokeAndCreateResult(
                        invocation, proxy);

                // 将执行结果序列化为byte数据,然后返回给客户端
                return SerializationUtils.serialize(result);

            }
        };
    }

    public void setChannelFactory(RabbitChannelFactory channelFactory) {
        this.channelFactory = channelFactory;
    }

    @Required
    public void setQueueName(String queueName) {
        this.queueName = queueName;
    }

    public Object getProxy() {
        return proxy;
    }

    @Override
    public void destroy() throws Exception {
        clearRpcServers();
    }

    // 清除所有的RpcServer
    private void clearRpcServers() {
        if (log.isInfoEnabled()) {
            log.info(format("Closing %d rpc servers", rpcServerPool.size()));
        }

        for (RpcServer rpcServer : rpcServerPool) {
            try {
                // 中止处理请求
                rpcServer.terminateMainloop();
                rpcServer.close();
            } catch (Exception e) {
                log.warn("Error termination rpcserver loop", e);
            }
        }
        rpcServerPool.clear();
        if (log.isInfoEnabled()) {
            log.info("Rpc servers closed");
        }

    }

    @Override
    public void shutdownCompleted(ShutdownSignalException cause) {
        if (log.isInfoEnabled()) {
            log.info(String.format("Channel connection lost for reason [%s]",
                    cause.getReason()));
            log.info(String.format("Reference [%s]", cause.getReference()));
        }

        if (cause.isInitiatedByApplication()) {
            if (log.isInfoEnabled()) {
                log.info("Sutdown initiated by application");
            }
        } else if (cause.isHardError()) {
            log
                    .error("Shutdown is a hard error, trying to restart the RPC server...");
            startRpcServer();
        }
    }

    public void setExchange(String exchange) {
        this.exchange = exchange;
    }

    @Required
    public void setRoutingKey(String routingKey) {
        this.routingKey = routingKey;
    }

    public void setPoolsize(int poolsize) {
        this.poolsize = poolsize;
    }

    @Required
    public void setExchangeType(ExchangeType exchangeType) {
        this.exchangeType = exchangeType;
    }
}

相关标签: rabbitmq