欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

RocketMQ 源码阅读笔记 —— remoting(一)

程序员文章站 2022-03-23 12:55:25
...

本文的源码版本是rocketmq-4.2.0

1 模块结构

rocketmq-remoting:基于netty的底层通信实现,所有服务间的交互都基于此模块。
remoting模块主要的类结构如下:
RocketMQ 源码阅读笔记 —— remoting(一)

RemotingService 为最上层接口,定义了三个方法start、shutdown、registerRPCHook;
RemotingServer 和 RemotingClient 都继承了 RemotingService,分别对应通信的服务端和客户端;

RemotingServer:

package org.apache.rocketmq.remoting;

import io.netty.channel.Channel;
import java.util.concurrent.ExecutorService;
import org.apache.rocketmq.remoting.common.Pair;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;

public interface RemotingServer extends RemotingService {

    /**
     * 注册处理器,用来设置接收到消息后的处理方法。
     */
    void registerProcessor(final int requestCode, final NettyRequestProcessor processor,
        final ExecutorService executor);

    /**
     * 注册默认的处理器,用来设置接收到消息后的处理方法。
     */
    void registerDefaultProcessor(final NettyRequestProcessor processor, final ExecutorService executor);

    int localListenPort();

    Pair<NettyRequestProcessor, ExecutorService> getProcessorPair(final int requestCode);

    /**
     * 同步的双向消息处理,返回客户端一个RemotingCommand
     */
    RemotingCommand invokeSync(final Channel channel, final RemotingCommand request,
        final long timeoutMillis) throws InterruptedException, RemotingSendRequestException,
        RemotingTimeoutException;

    void invokeAsync(final Channel channel, final RemotingCommand request, final long timeoutMillis,
        final InvokeCallback invokeCallback) throws InterruptedException,
        RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;

    /**
     * 单向消息处理,一般指心跳消息或者注册消息这样的类型
     */
    void invokeOneway(final Channel channel, final RemotingCommand request, final long timeoutMillis)
        throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException,
        RemotingSendRequestException;
}

RemotingClient:

package org.apache.rocketmq.remoting;

import java.util.List;
import java.util.concurrent.ExecutorService;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;

public interface RemotingClient extends RemotingService {

    /**
     * 获取有效的nameserver地址
     */
    void updateNameServerAddressList(final List<String> addrs);

    List<String> getNameServerAddressList();

    /**
     * 用来向 Server 端发送请求,同步的双向的
     */
    RemotingCommand invokeSync(final String addr, final RemotingCommand request,
        final long timeoutMillis) throws InterruptedException, RemotingConnectException,
        RemotingSendRequestException, RemotingTimeoutException;

    /**
     * 用来向 Server 端发送请求,异步的双向的
     */
    void invokeAsync(final String addr, final RemotingCommand request, final long timeoutMillis,
        final InvokeCallback invokeCallback) throws InterruptedException, RemotingConnectException,
        RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;

    /**
     * 单向消息处理,一般指心跳消息或者注册消息这样的类型
     */
    void invokeOneway(final String addr, final RemotingCommand request, final long timeoutMillis)
        throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException,
        RemotingTimeoutException, RemotingSendRequestException;

    void registerProcessor(final int requestCode, final NettyRequestProcessor processor,
        final ExecutorService executor);

    void setCallbackExecutor(final ExecutorService callbackExecutor);

    boolean isChannelWritable(final String addr);
}

NettyRemotingServer 和 NettyRemotingClient 分别实现了 RemotingServer与 RemotingClient 这两个接口,但它们有很多共有的内容,比如invokeSync、invokeOneway 等 ,所以这些共有函数被提取到 NettyRemotingAbstract 共同继承的父类中 。NettyRemotingAbstract这个抽象类包含了很多公共数据处理,也包含了很多重要的数据结构, 其它还有NettyEvent, NettyEncoder, NettyDecoder和RemotingCommand等一系列通信过程中使用到的类。首先是RocketMQ的通信协议设计和编解码。

2 协议设计和编解码

remoting模块的协议设计定义在包 org.apache.rocketmq.remoting.protocol 下
该包下的文件如下:

文件 功能
LanguageCode 枚举类型、请求方和接收方实现的语言类型编码
RemotingCommand 在消息传输过程中对所有数据内容的封装,不但包含了所有的数据结构,还包含了编码解码操作
RemotingCommandType 远程命令类型的枚举,请求和响应
RemotingSerializable json序列化的编解码
RemotingSysResponseCode 同步响应的结果编码,结果状态编码
RocketMQSerializable
SerializeType 序列化类型的枚举,json和 RocketMQ

主要来看下RemotingCommand

2.1 RemotingCommand

在分析具体的api接口之前, 先介绍一下RocketMQ的通信协议是如何设计的。具体的通信协议格式如下:

消息长度 序列化类型&&头部长度 消息头数据 消息主体数据

1、消息长度:消息的总长度,int类型,四个字节存储;
2、序列化类型&&头部长度:int类型,第一个字节表示序列化类型,后面三个字节表示消息头长度;
3、消息头数据:经过序列化后的消息头数据;
4、消息主体数据:消息主体的二进制字节数据内容,消息的编码和解码分别在RemotingCommand类的encode和decode方法中完成

RemotingCommand 类中消息头的成员变量:

    /**
     * request: 请求操作码,请求接收方根据不同的代码做不同的操作
     * response: 应答结果代码,0表示成功,非0表示各种错误代码
     */
    private int code;
    /**
     * request & response: 请求方和应答方实现语言
     */
    private LanguageCode language = LanguageCode.JAVA;
    /**
     * request & response: 请求方和应答方程序版本
     */
    private int version = 0;
    /**
     * request: 请求方在同一连接上不同的请求标识代码,多线程连接复用使用
     * response: 应答方不修改直接返回
     */
    private int opaque = requestId.getAndIncrement();
    /**
     * request & response: 通信层的标志位
     */
    private int flag = 0;
    /**
     * request: 传输自定义文本信息
     * response: 错误详细的描述信息
     */
    private String remark;
    /**
     * request: 请求自定义字段
     * response: 应答自定义字段
     */
    private HashMap<String, String> extFields;

该类中还包含了构建请求、响应远程命令;编解码;编解码消息头等方法,先主要分析一下编解码方法encode 和 decode 其他方法之后用到再做分析。

2.2 编码

首先是encode方法:

public ByteBuffer encode() {
        // 1> header length size
        /**
         * 协议组成第二部分序列化类型和头长度,占用4个字节
         */
        int length = 4;
        // 2> header data length
        /**
         * 协议组成的第三部分,首先得到消息头,再加上该消息头长度----- a
         */
        byte[] headerData = this.headerEncode();
        length += headerData.length;
        /**
         * 第四部分消息体长度
         */
        // 3> body data length
        if (this.body != null) {
            length += body.length;
        }
        /**
         * 协议第一部分,加上记录整个消息长度的4个字节,申请缓冲区
         */
        ByteBuffer result = ByteBuffer.allocate(4 + length);

        // length
        result.putInt(length);

        // header length ------ b
        result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));

        // header data
        result.put(headerData);

        // body data;
        if (this.body != null) {
            result.put(this.body);
        }

        /**
         * 写完后切换可读
         */
        result.flip();

        return result;
    }

a、在获取消息头时调动方法 headerEncode
b、在构成头的字段是调用方法 markProtocolType

首先是 headerEncode 方法


private byte[] headerEncode() {
        // 将消息头中自定义的字段写入 extFields ----- a.1
        this.makeCustomHeaderToNet();
        // 判断目前采用的序列化方式 主要看下json, 编码 RemotingCommand 为json格式,返回编码后的字节数组 ------ a.2
        if (SerializeType.ROCKETMQ == serializeTypeCurrentRPC) {
            return RocketMQSerializable.rocketMQProtocolEncode(this);
        } else {
            return RemotingSerializable.encode(this);
        }
    }

该方法分为两部分,一部分 a.1 :将消息头的自定义的字段写入 extFields 的HashMap中;第二部分 a.2 :是将 RemotingCommand 进行序列化 返回字节数组。
第一部分a.1 调用方法 makeCustomHeaderToNet() :

public void makeCustomHeaderToNet() {
        if (this.customHeader != null) {
            //获取所有声明的成员变量
            Field[] fields = getClazzFields(customHeader.getClass());
            if (null == this.extFields) {
                this.extFields = new HashMap<String, String>();
            }

            for (Field field : fields) {
                //成员变量修饰符非 static
                if (!Modifier.isStatic(field.getModifiers())) {
                    // 成员变量名
                    String name = field.getName();
                    if (!name.startsWith("this")) {
                        Object value = null;
                        try {
                            field.setAccessible(true);
                            //成员变量值
                            value = field.get(this.customHeader);
                        } catch (Exception e) {
                            log.error("Failed to access field [{}]", name, e);
                        }

                        if (value != null) {
                            //写入自定义字段的 HashMap 中
                            this.extFields.put(name, value.toString());
                        }
                    }
                }
            }
        }
    }

第一部分a.2 主要看下json的,调用方法 RemotingSerializable.encode(),这是 RemotingCommand 实现 json 序列化的类:

package org.apache.rocketmq.remoting.protocol;

import com.alibaba.fastjson.JSON;
import java.nio.charset.Charset;

public abstract class RemotingSerializable {
    private final static Charset CHARSET_UTF8 = Charset.forName("UTF-8");

    /**
     * 编码对象为字节数组
     * @param obj
     * @return json 字符串的字节数组
     */
    public static byte[] encode(final Object obj) {
        final String json = toJson(obj, false);
        if (json != null) {
            return json.getBytes(CHARSET_UTF8);
        }
        return null;
    }

    /**
     * obj => json 非格式化
     * @param obj 转换前 Java 对象
     * @param prettyFormat 转换后是否格式化
     * @return 转换后字符串
     */
    public static String toJson(final Object obj, boolean prettyFormat) {
        return JSON.toJSONString(obj, prettyFormat);
    }

    /**
     * 解码字节码为java 对象
     */
    public static <T> T decode(final byte[] data, Class<T> classOfT) {
        final String json = new String(data, CHARSET_UTF8);
        return fromJson(json, classOfT);
    }

    /**
     * json字符串转为java 对象
     */
    public static <T> T fromJson(String json, Class<T> classOfT) {
        return JSON.parseObject(json, classOfT);
    }

    public byte[] encode() {
        final String json = this.toJson();
        if (json != null) {
            return json.getBytes(CHARSET_UTF8);
        }
        return null;
    }

    public String toJson() {
        return toJson(false);
    }

    public String toJson(final boolean prettyFormat) {
        return toJson(this, prettyFormat);
    }
}

关于 markProtocolType 方法:

public static byte[] markProtocolType(int source, SerializeType type) {
        //长度为4 字节数组
        byte[] result = new byte[4];

        // 序列化类型编码
        result[0] = type.getCode();
        //int 4个字节长度数字写入长度3个字节中
        result[1] = (byte) ((source >> 16) & 0xFF);
        result[2] = (byte) ((source >> 8) & 0xFF);
        result[3] = (byte) (source & 0xFF);
        return result;
    }

2.3 解码

decode方法:

public static RemotingCommand decode(final byte[] array) {
        // 字节数组转为 ByteBuffer进行解码
        ByteBuffer byteBuffer = ByteBuffer.wrap(array);
        return decode(byteBuffer);
    }

    public static RemotingCommand decode(final ByteBuffer byteBuffer) {
        //等到消息的总长度
        int length = byteBuffer.limit();
        //得到消息中int类型的长度,此处看代码是直接获取的(序列化类型&头长度)这一段,
        // 猜测是不是接收到的之后ChannelHandler直接去掉了
        int oriHeaderLen = byteBuffer.getInt();
        // 消息头长度
        int headerLength = getHeaderLength(oriHeaderLen);

        //position 到了头长度末尾,直接get头长度的字节得到消息头
        byte[] headerData = new byte[headerLength];
        byteBuffer.get(headerData);

        //调用headerDecode 解码消息头
        //调用 getProtocolType 获取协议类型
        RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));

        //出去消息头4个字节 和 消息头长 剩下的为消息体
        int bodyLength = length - 4 - headerLength;
        byte[] bodyData = null;
        if (bodyLength > 0) {
            bodyData = new byte[bodyLength];
            byteBuffer.get(bodyData);
        }
        cmd.body = bodyData;

        return cmd;
    }

    public static int getHeaderLength(int length) {
        return length & 0xFFFFFF;
    }

    private static RemotingCommand headerDecode(byte[] headerData, SerializeType type) {
        switch (type) {
            case JSON:
                // 调用 RemotingSerializable 中的decode 解码json 返回 RemotingCommand
                RemotingCommand resultJson = RemotingSerializable.decode(headerData, RemotingCommand.class);
                resultJson.setSerializeTypeCurrentRPC(type);
                return resultJson;
            case ROCKETMQ:
                RemotingCommand resultRMQ = RocketMQSerializable.rocketMQProtocolDecode(headerData);
                resultRMQ.setSerializeTypeCurrentRPC(type);
                return resultRMQ;
            default:
                break;
        }

        return null;
    }

    public static SerializeType getProtocolType(int source) {
        //第一个字节为协议类型 则右移24位
        return SerializeType.valueOf((byte) ((source >> 24) & 0xFF));
    }

3 NettyRemotingServer

首先是该类中的变量:

public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {
    private static final Logger log = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
    // Netty引导
    private final ServerBootstrap serverBootstrap;
    // 处理网络IO
    private final EventLoopGroup eventLoopGroupSelector;
    // 接受连接请求
    private final EventLoopGroup eventLoopGroupBoss;
    // Netty Server配置
    private final NettyServerConfig nettyServerConfig;
    // 处理接受消息的线程池
    private final ExecutorService publicExecutor;
    // 监听Channel,用于监听 Client 和 Broker 心跳
    private final ChannelEventListener channelEventListener;

    private final Timer timer = new Timer("ServerHouseKeepingService", true);
    private DefaultEventExecutorGroup defaultEventExecutorGroup;

    private RPCHook rpcHook;

    private int port = 0;

    private static final String HANDSHAKE_HANDLER_NAME = "handshakeHandler";
    private static final String TLS_HANDLER_NAME = "sslHandler";
    private static final String FILE_REGION_ENCODER_NAME = "fileRegionEncoder";
    ....
 }

NettyRemotingServer的构造函数:

public NettyRemotingServer(final NettyServerConfig nettyServerConfig,
        final ChannelEventListener channelEventListener) {
        /**
         * 调用NettyRemotingAbstract方法中的构造方法,设置单向请求数,双向请求数
         */
        super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
        this.serverBootstrap = new ServerBootstrap();
        this.nettyServerConfig = nettyServerConfig;
        this.channelEventListener = channelEventListener;

        int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();
        if (publicThreadNums <= 0) {
            publicThreadNums = 4;
        }

        /**
         * 定义用来处理接收消息的线程池,供之后的注册处理器方法调用
         */
        this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet());
            }
        });

        /**
         * 定义接受远程连接的线程池
         */
        this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, String.format("NettyBoss_%d", this.threadIndex.incrementAndGet()));
            }
        });


        /**
         * 定义处理网络IO的线程池,通过配置是否采用epoll模式来定义
         */
        if (useEpoll()) {
            this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);
                private int threadTotal = nettyServerConfig.getServerSelectorThreads();

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
                }
            });
        } else {
            this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);
                private int threadTotal = nettyServerConfig.getServerSelectorThreads();

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
                }
            });
        }

        TlsMode tlsMode = TlsSystemConfig.tlsMode;
        log.info("Server is running in TLS {} mode", tlsMode.getName());

        if (tlsMode != TlsMode.DISABLED) {
            try {
                sslContext = TlsHelper.buildSslContext(false);
                log.info("SSLContext created for server");
            } catch (CertificateException e) {
                log.error("Failed to create SSLContext for server", e);
            } catch (IOException e) {
                log.error("Failed to create SSLContext for server", e);
            }
        }
    }

然后是Server的启动函数 start :

public void start() {

        /**
         * 处理 ChannelHandler 的线程池
         */
        this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
            nettyServerConfig.getServerWorkerThreads(),
            new ThreadFactory() {

                private AtomicInteger threadIndex = new AtomicInteger(0);

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
                }
            });

        ServerBootstrap childHandler =
            this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
                .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                    /**tcp/ip协议listen函数中的backlog参数,
                     * 服务端处理客户端连接请求是顺序处理的,
                     * 所以同一时间只能处理一个客户端连接,
                     * 多个客户端来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理,
                     * backlog参数指定了队列的大小
                     */
                .option(ChannelOption.SO_REUSEADDR, true)
                    /**对应于套接字选项中的SO_REUSEADDR,
                     * 这个参数表示允许重复使用本地地址和端口,
                     * 某个进程非正常退出,该程序占用的端口可能要被占用一段时间才能允许其他进程使用,
                     * 而且程序死掉以后,内核一需要一定的时间才能够释放此端口,不设置SO_REUSEADDR
                     */
                .option(ChannelOption.SO_KEEPALIVE, false)
                    /**对应于套接字选项中的SO_KEEPALIVE,
                     * 该参数用于设置TCP连接,当设置该选项以后,连接会测试链接的状态,
                     * 这个选项用于可能长时间没有数据交流的
                     * 连接,当设置该选项以后,如果在两小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文
                     */
                .childOption(ChannelOption.TCP_NODELAY, true)
                    /**对应于套接字选项中的TCP_NODELAY,
                     * 该参数的使用与Nagle算法有关,Nagle算法是将小的数据包组装为更大的帧然后进行发送,
                     * 而不是输入一次发送一次,因此在数据包不足的时候会等待其他数据的到了,
                     * 组装成大的数据包进行发送,虽然该方式有效提高网络的有效负载,但是却造成了延时,
                     * 而该参数的作用就是禁止使用Nagle算法,使用于小数据即时传输,
                     * 于TCP_NODELAY相对应的是TCP_CORK,该选项是需要等到发送的数据量最大的时候,一次性发数据,
                     * 适用于文件传输。
                     */
                .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
                .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
                    /**
                     * 这两个参数用于操作接收缓冲区和发送缓冲区的大小,
                     * 接收缓冲区用于保存网络协议站内收到的数据,
                     * 直到应用程序读取成功,发送缓冲区用于保存发送数据,直到发送成功。
                     */
                .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline()
                                /**
                                 * 处理TLS握手的 ChannelHandler
                                 */
                            .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME,
                                new HandshakeHandler(TlsSystemConfig.tlsMode))
                                /**
                                 * 接受消息处理包括编解码、活跃Channel的监测、连接处理、事件处理
                                 */
                            .addLast(defaultEventExecutorGroup,
                                new NettyEncoder(),
                                new NettyDecoder(),
                                new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                                new NettyConnectManageHandler(),
                                new NettyServerHandler()
                            );
                    }
                });


        /**
         * 使用对象池,重用缓冲区
         */
        if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
            childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
        }

        /**
         *异步地绑定服务器,调用 sync()方法阻塞等待直到绑定完成
         */
        try {
            ChannelFuture sync = this.serverBootstrap.bind().sync();
            InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
            this.port = addr.getPort();
        } catch (InterruptedException e1) {
            throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
        }

        if (this.channelEventListener != null) {
            this.nettyEventExecutor.start();
        }

        /**
         * 定时线程扫描 应答表
         */
        this.timer.scheduleAtFixedRate(new TimerTask() {

            @Override
            public void run() {
                try {
                    NettyRemotingServer.this.scanResponseTable();
                } catch (Throwable e) {
                    log.error("scanResponseTable exception", e);
                }
            }
        }, 1000 * 3, 1000);
    }

重点看NettyServerHandler接受事件处理实现:

class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
        //调用processMessageReceived来处理接受到的消息
            processMessageReceived(ctx, msg);
        }
    }

public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
        final RemotingCommand cmd = msg;
        if (cmd != null) {
            switch (cmd.getType()) {
                //处理请求命令
                case REQUEST_COMMAND:
                    // --------------a
                    processRequestCommand(ctx, cmd);
                    break;
                //处理响应命令
                case RESPONSE_COMMAND:
                    // --------------b
                    processResponseCommand(ctx, cmd);
                    break;
                default:
                    break;
            }
        }
    }
   // -----------a 调用处理请求方法
 public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
        /**
         * 根据RemotingCommand的code得到 NettyRequestProcessor 和 线程池
         */
        final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
        final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
        final int opaque = cmd.getOpaque();

        if (pair != null) {
            Runnable run = new Runnable() {
                @Override
                public void run() {
                    try {
                        RPCHook rpcHook = NettyRemotingAbstract.this.getRPCHook();
                        if (rpcHook != null) {
                            rpcHook.doBeforeRequest(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
                        }

                        /**
                         * 调用各自 NettyRequestProcessor 的 processRequest 方法返回 response
                         */
                        final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
                        if (rpcHook != null) {
                            rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
                        }

                        /**
                         * 若请求不是单向的
                         */
                        if (!cmd.isOnewayRPC()) {
                            if (response != null) {
                                /**
                                 * 返回不改变opaque的值
                                 */
                                response.setOpaque(opaque);
                                response.markResponseType();
                                try {
                                    /**
                                     * 写入Channel 并发送
                                     */
                                    ctx.writeAndFlush(response);
                                } catch (Throwable e) {
                                    log.error("process request over, but response failed", e);
                                    log.error(cmd.toString());
                                    log.error(response.toString());
                                }
                            } else {

                            }
                        }
                    } catch (Throwable e) {
                        log.error("process request exception", e);
                        log.error(cmd.toString());

                        /**
                         * 当出现异常且请求不为单向请求
                         */
                        if (!cmd.isOnewayRPC()) {
                            /**
                             * 创建一个应答response, 写入Channel并发送
                             */
                            final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,
                                RemotingHelper.exceptionSimpleDesc(e));
                            response.setOpaque(opaque);
                            ctx.writeAndFlush(response);
                        }
                    }
                }
            };

            /**
             * request处理器忙则返回响应response
             */
            if (pair.getObject1().rejectRequest()) {
                final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                    "[REJECTREQUEST]system busy, start flow control for a while");
                response.setOpaque(opaque);
                ctx.writeAndFlush(response);
                return;
            }

            try {
                /**
                 * 将上述定义的request处理提交到对应的线程池
                 */
                final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
                pair.getObject2().submit(requestTask);
            } catch (RejectedExecutionException e) {
                if ((System.currentTimeMillis() % 10000) == 0) {
                    log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())
                        + ", too many requests and system thread pool busy, RejectedExecutionException "
                        + pair.getObject2().toString()
                        + " request code: " + cmd.getCode());
                }

                /**
                 * 不是单向请求则返回busy response
                 */
                if (!cmd.isOnewayRPC()) {
                    final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                        "[OVERLOAD]system busy, start flow control for a while");
                    response.setOpaque(opaque);
                    ctx.writeAndFlush(response);
                }
            }
        } else {
            /**
             * 没有对应的 Pair 则返回 NOT_SUPPORTED 错误
             */
            String error = " request type " + cmd.getCode() + " not supported";
            final RemotingCommand response =
                RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
            response.setOpaque(opaque);
            ctx.writeAndFlush(response);
            log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
        }
    }

  // ------------------b 处理responseCommand
public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
        /**
         * 从未回应的请求列表里面获取对应的 ResponseFuture
         */
        final int opaque = cmd.getOpaque();
        final ResponseFuture responseFuture = responseTable.get(opaque);
        if (responseFuture != null) {
            responseFuture.setResponseCommand(cmd);

            /**
             * 从未响应的请求列表删除
             */
            responseTable.remove(opaque);

            if (responseFuture.getInvokeCallback() != null) {
                /**
                 * 执行回调函数
                 */
                executeInvokeCallback(responseFuture);
            } else {
                responseFuture.putResponse(cmd);
                responseFuture.release();
            }
        } else {
            log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
            log.warn(cmd.toString());
        }
    }

    /**
     * Execute callback in callback executor. If callback executor is null, run directly in current thread
     */
    private void executeInvokeCallback(final ResponseFuture responseFuture) {
        boolean runInThisThread = false;
        ExecutorService executor = this.getCallbackExecutor();
        if (executor != null) {
            try {
                executor.submit(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            /**
                             * 执行回调函数
                             */
                            responseFuture.executeInvokeCallback();
                        } catch (Throwable e) {
                            log.warn("execute callback in executor exception, and callback throw", e);
                        } finally {
                            responseFuture.release();
                        }
                    }
                });
            } catch (Exception e) {
                runInThisThread = true;
                log.warn("execute callback in executor exception, maybe executor busy", e);
            }
        } else {
            runInThisThread = true;
        }
        /**
         * 线程池为空则在当前线程下执行
         */
        if (runInThisThread) {
            try {
                responseFuture.executeInvokeCallback();
            } catch (Throwable e) {
                log.warn("executeInvokeCallback Exception", e);
            } finally {
                responseFuture.release();
            }
        }
    }

4 NettyRomotingClient

Client源码与Server大致相同,不同之处在于netty的引导时,只有一个eventLoopGroup,永不接受连接。

相关标签: RocketMQ