RocketMQ 源码阅读笔记 —— remoting(一)
本文的源码版本是rocketmq-4.2.0
1 模块结构
rocketmq-remoting:基于netty的底层通信实现,所有服务间的交互都基于此模块。
remoting模块主要的类结构如下:
RemotingService 为最上层接口,定义了三个方法start、shutdown、registerRPCHook;
RemotingServer 和 RemotingClient 都继承了 RemotingService,分别对应通信的服务端和客户端;
RemotingServer:
package org.apache.rocketmq.remoting;
import io.netty.channel.Channel;
import java.util.concurrent.ExecutorService;
import org.apache.rocketmq.remoting.common.Pair;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public interface RemotingServer extends RemotingService {
/**
* 注册处理器,用来设置接收到消息后的处理方法。
*/
void registerProcessor(final int requestCode, final NettyRequestProcessor processor,
final ExecutorService executor);
/**
* 注册默认的处理器,用来设置接收到消息后的处理方法。
*/
void registerDefaultProcessor(final NettyRequestProcessor processor, final ExecutorService executor);
int localListenPort();
Pair<NettyRequestProcessor, ExecutorService> getProcessorPair(final int requestCode);
/**
* 同步的双向消息处理,返回客户端一个RemotingCommand
*/
RemotingCommand invokeSync(final Channel channel, final RemotingCommand request,
final long timeoutMillis) throws InterruptedException, RemotingSendRequestException,
RemotingTimeoutException;
void invokeAsync(final Channel channel, final RemotingCommand request, final long timeoutMillis,
final InvokeCallback invokeCallback) throws InterruptedException,
RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;
/**
* 单向消息处理,一般指心跳消息或者注册消息这样的类型
*/
void invokeOneway(final Channel channel, final RemotingCommand request, final long timeoutMillis)
throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException,
RemotingSendRequestException;
}
RemotingClient:
package org.apache.rocketmq.remoting;
import java.util.List;
import java.util.concurrent.ExecutorService;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public interface RemotingClient extends RemotingService {
/**
* 获取有效的nameserver地址
*/
void updateNameServerAddressList(final List<String> addrs);
List<String> getNameServerAddressList();
/**
* 用来向 Server 端发送请求,同步的双向的
*/
RemotingCommand invokeSync(final String addr, final RemotingCommand request,
final long timeoutMillis) throws InterruptedException, RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException;
/**
* 用来向 Server 端发送请求,异步的双向的
*/
void invokeAsync(final String addr, final RemotingCommand request, final long timeoutMillis,
final InvokeCallback invokeCallback) throws InterruptedException, RemotingConnectException,
RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;
/**
* 单向消息处理,一般指心跳消息或者注册消息这样的类型
*/
void invokeOneway(final String addr, final RemotingCommand request, final long timeoutMillis)
throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException,
RemotingTimeoutException, RemotingSendRequestException;
void registerProcessor(final int requestCode, final NettyRequestProcessor processor,
final ExecutorService executor);
void setCallbackExecutor(final ExecutorService callbackExecutor);
boolean isChannelWritable(final String addr);
}
NettyRemotingServer 和 NettyRemotingClient 分别实现了 RemotingServer与 RemotingClient 这两个接口,但它们有很多共有的内容,比如invokeSync、invokeOneway 等 ,所以这些共有函数被提取到 NettyRemotingAbstract 共同继承的父类中 。NettyRemotingAbstract这个抽象类包含了很多公共数据处理,也包含了很多重要的数据结构, 其它还有NettyEvent, NettyEncoder, NettyDecoder和RemotingCommand等一系列通信过程中使用到的类。首先是RocketMQ的通信协议设计和编解码。
2 协议设计和编解码
remoting模块的协议设计定义在包 org.apache.rocketmq.remoting.protocol 下
该包下的文件如下:
文件 | 功能 |
---|---|
LanguageCode | 枚举类型、请求方和接收方实现的语言类型编码 |
RemotingCommand | 在消息传输过程中对所有数据内容的封装,不但包含了所有的数据结构,还包含了编码解码操作 |
RemotingCommandType | 远程命令类型的枚举,请求和响应 |
RemotingSerializable | json序列化的编解码 |
RemotingSysResponseCode | 同步响应的结果编码,结果状态编码 |
RocketMQSerializable | |
SerializeType | 序列化类型的枚举,json和 RocketMQ |
主要来看下RemotingCommand
2.1 RemotingCommand
在分析具体的api接口之前, 先介绍一下RocketMQ的通信协议是如何设计的。具体的通信协议格式如下:
消息长度 | 序列化类型&&头部长度 | 消息头数据 | 消息主体数据 |
---|
1、消息长度:消息的总长度,int类型,四个字节存储;
2、序列化类型&&头部长度:int类型,第一个字节表示序列化类型,后面三个字节表示消息头长度;
3、消息头数据:经过序列化后的消息头数据;
4、消息主体数据:消息主体的二进制字节数据内容,消息的编码和解码分别在RemotingCommand类的encode和decode方法中完成
RemotingCommand 类中消息头的成员变量:
/**
* request: 请求操作码,请求接收方根据不同的代码做不同的操作
* response: 应答结果代码,0表示成功,非0表示各种错误代码
*/
private int code;
/**
* request & response: 请求方和应答方实现语言
*/
private LanguageCode language = LanguageCode.JAVA;
/**
* request & response: 请求方和应答方程序版本
*/
private int version = 0;
/**
* request: 请求方在同一连接上不同的请求标识代码,多线程连接复用使用
* response: 应答方不修改直接返回
*/
private int opaque = requestId.getAndIncrement();
/**
* request & response: 通信层的标志位
*/
private int flag = 0;
/**
* request: 传输自定义文本信息
* response: 错误详细的描述信息
*/
private String remark;
/**
* request: 请求自定义字段
* response: 应答自定义字段
*/
private HashMap<String, String> extFields;
该类中还包含了构建请求、响应远程命令;编解码;编解码消息头等方法,先主要分析一下编解码方法encode 和 decode 其他方法之后用到再做分析。
2.2 编码
首先是encode方法:
public ByteBuffer encode() {
// 1> header length size
/**
* 协议组成第二部分序列化类型和头长度,占用4个字节
*/
int length = 4;
// 2> header data length
/**
* 协议组成的第三部分,首先得到消息头,再加上该消息头长度----- a
*/
byte[] headerData = this.headerEncode();
length += headerData.length;
/**
* 第四部分消息体长度
*/
// 3> body data length
if (this.body != null) {
length += body.length;
}
/**
* 协议第一部分,加上记录整个消息长度的4个字节,申请缓冲区
*/
ByteBuffer result = ByteBuffer.allocate(4 + length);
// length
result.putInt(length);
// header length ------ b
result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));
// header data
result.put(headerData);
// body data;
if (this.body != null) {
result.put(this.body);
}
/**
* 写完后切换可读
*/
result.flip();
return result;
}
a、在获取消息头时调动方法 headerEncode
b、在构成头的字段是调用方法 markProtocolType
首先是 headerEncode 方法
private byte[] headerEncode() {
// 将消息头中自定义的字段写入 extFields ----- a.1
this.makeCustomHeaderToNet();
// 判断目前采用的序列化方式 主要看下json, 编码 RemotingCommand 为json格式,返回编码后的字节数组 ------ a.2
if (SerializeType.ROCKETMQ == serializeTypeCurrentRPC) {
return RocketMQSerializable.rocketMQProtocolEncode(this);
} else {
return RemotingSerializable.encode(this);
}
}
该方法分为两部分,一部分 a.1 :将消息头的自定义的字段写入 extFields 的HashMap中;第二部分 a.2 :是将 RemotingCommand 进行序列化 返回字节数组。
第一部分a.1 调用方法 makeCustomHeaderToNet() :
public void makeCustomHeaderToNet() {
if (this.customHeader != null) {
//获取所有声明的成员变量
Field[] fields = getClazzFields(customHeader.getClass());
if (null == this.extFields) {
this.extFields = new HashMap<String, String>();
}
for (Field field : fields) {
//成员变量修饰符非 static
if (!Modifier.isStatic(field.getModifiers())) {
// 成员变量名
String name = field.getName();
if (!name.startsWith("this")) {
Object value = null;
try {
field.setAccessible(true);
//成员变量值
value = field.get(this.customHeader);
} catch (Exception e) {
log.error("Failed to access field [{}]", name, e);
}
if (value != null) {
//写入自定义字段的 HashMap 中
this.extFields.put(name, value.toString());
}
}
}
}
}
}
第一部分a.2 主要看下json的,调用方法 RemotingSerializable.encode(),这是 RemotingCommand 实现 json 序列化的类:
package org.apache.rocketmq.remoting.protocol;
import com.alibaba.fastjson.JSON;
import java.nio.charset.Charset;
public abstract class RemotingSerializable {
private final static Charset CHARSET_UTF8 = Charset.forName("UTF-8");
/**
* 编码对象为字节数组
* @param obj
* @return json 字符串的字节数组
*/
public static byte[] encode(final Object obj) {
final String json = toJson(obj, false);
if (json != null) {
return json.getBytes(CHARSET_UTF8);
}
return null;
}
/**
* obj => json 非格式化
* @param obj 转换前 Java 对象
* @param prettyFormat 转换后是否格式化
* @return 转换后字符串
*/
public static String toJson(final Object obj, boolean prettyFormat) {
return JSON.toJSONString(obj, prettyFormat);
}
/**
* 解码字节码为java 对象
*/
public static <T> T decode(final byte[] data, Class<T> classOfT) {
final String json = new String(data, CHARSET_UTF8);
return fromJson(json, classOfT);
}
/**
* json字符串转为java 对象
*/
public static <T> T fromJson(String json, Class<T> classOfT) {
return JSON.parseObject(json, classOfT);
}
public byte[] encode() {
final String json = this.toJson();
if (json != null) {
return json.getBytes(CHARSET_UTF8);
}
return null;
}
public String toJson() {
return toJson(false);
}
public String toJson(final boolean prettyFormat) {
return toJson(this, prettyFormat);
}
}
关于 markProtocolType 方法:
public static byte[] markProtocolType(int source, SerializeType type) {
//长度为4 字节数组
byte[] result = new byte[4];
// 序列化类型编码
result[0] = type.getCode();
//int 4个字节长度数字写入长度3个字节中
result[1] = (byte) ((source >> 16) & 0xFF);
result[2] = (byte) ((source >> 8) & 0xFF);
result[3] = (byte) (source & 0xFF);
return result;
}
2.3 解码
decode方法:
public static RemotingCommand decode(final byte[] array) {
// 字节数组转为 ByteBuffer进行解码
ByteBuffer byteBuffer = ByteBuffer.wrap(array);
return decode(byteBuffer);
}
public static RemotingCommand decode(final ByteBuffer byteBuffer) {
//等到消息的总长度
int length = byteBuffer.limit();
//得到消息中int类型的长度,此处看代码是直接获取的(序列化类型&头长度)这一段,
// 猜测是不是接收到的之后ChannelHandler直接去掉了
int oriHeaderLen = byteBuffer.getInt();
// 消息头长度
int headerLength = getHeaderLength(oriHeaderLen);
//position 到了头长度末尾,直接get头长度的字节得到消息头
byte[] headerData = new byte[headerLength];
byteBuffer.get(headerData);
//调用headerDecode 解码消息头
//调用 getProtocolType 获取协议类型
RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));
//出去消息头4个字节 和 消息头长 剩下的为消息体
int bodyLength = length - 4 - headerLength;
byte[] bodyData = null;
if (bodyLength > 0) {
bodyData = new byte[bodyLength];
byteBuffer.get(bodyData);
}
cmd.body = bodyData;
return cmd;
}
public static int getHeaderLength(int length) {
return length & 0xFFFFFF;
}
private static RemotingCommand headerDecode(byte[] headerData, SerializeType type) {
switch (type) {
case JSON:
// 调用 RemotingSerializable 中的decode 解码json 返回 RemotingCommand
RemotingCommand resultJson = RemotingSerializable.decode(headerData, RemotingCommand.class);
resultJson.setSerializeTypeCurrentRPC(type);
return resultJson;
case ROCKETMQ:
RemotingCommand resultRMQ = RocketMQSerializable.rocketMQProtocolDecode(headerData);
resultRMQ.setSerializeTypeCurrentRPC(type);
return resultRMQ;
default:
break;
}
return null;
}
public static SerializeType getProtocolType(int source) {
//第一个字节为协议类型 则右移24位
return SerializeType.valueOf((byte) ((source >> 24) & 0xFF));
}
3 NettyRemotingServer
首先是该类中的变量:
public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {
private static final Logger log = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
// Netty引导
private final ServerBootstrap serverBootstrap;
// 处理网络IO
private final EventLoopGroup eventLoopGroupSelector;
// 接受连接请求
private final EventLoopGroup eventLoopGroupBoss;
// Netty Server配置
private final NettyServerConfig nettyServerConfig;
// 处理接受消息的线程池
private final ExecutorService publicExecutor;
// 监听Channel,用于监听 Client 和 Broker 心跳
private final ChannelEventListener channelEventListener;
private final Timer timer = new Timer("ServerHouseKeepingService", true);
private DefaultEventExecutorGroup defaultEventExecutorGroup;
private RPCHook rpcHook;
private int port = 0;
private static final String HANDSHAKE_HANDLER_NAME = "handshakeHandler";
private static final String TLS_HANDLER_NAME = "sslHandler";
private static final String FILE_REGION_ENCODER_NAME = "fileRegionEncoder";
....
}
NettyRemotingServer的构造函数:
public NettyRemotingServer(final NettyServerConfig nettyServerConfig,
final ChannelEventListener channelEventListener) {
/**
* 调用NettyRemotingAbstract方法中的构造方法,设置单向请求数,双向请求数
*/
super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
this.serverBootstrap = new ServerBootstrap();
this.nettyServerConfig = nettyServerConfig;
this.channelEventListener = channelEventListener;
int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();
if (publicThreadNums <= 0) {
publicThreadNums = 4;
}
/**
* 定义用来处理接收消息的线程池,供之后的注册处理器方法调用
*/
this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet());
}
});
/**
* 定义接受远程连接的线程池
*/
this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyBoss_%d", this.threadIndex.incrementAndGet()));
}
});
/**
* 定义处理网络IO的线程池,通过配置是否采用epoll模式来定义
*/
if (useEpoll()) {
this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
private int threadTotal = nettyServerConfig.getServerSelectorThreads();
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
}
});
} else {
this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
private int threadTotal = nettyServerConfig.getServerSelectorThreads();
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
}
});
}
TlsMode tlsMode = TlsSystemConfig.tlsMode;
log.info("Server is running in TLS {} mode", tlsMode.getName());
if (tlsMode != TlsMode.DISABLED) {
try {
sslContext = TlsHelper.buildSslContext(false);
log.info("SSLContext created for server");
} catch (CertificateException e) {
log.error("Failed to create SSLContext for server", e);
} catch (IOException e) {
log.error("Failed to create SSLContext for server", e);
}
}
}
然后是Server的启动函数 start :
public void start() {
/**
* 处理 ChannelHandler 的线程池
*/
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
nettyServerConfig.getServerWorkerThreads(),
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
}
});
ServerBootstrap childHandler =
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
/**tcp/ip协议listen函数中的backlog参数,
* 服务端处理客户端连接请求是顺序处理的,
* 所以同一时间只能处理一个客户端连接,
* 多个客户端来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理,
* backlog参数指定了队列的大小
*/
.option(ChannelOption.SO_REUSEADDR, true)
/**对应于套接字选项中的SO_REUSEADDR,
* 这个参数表示允许重复使用本地地址和端口,
* 某个进程非正常退出,该程序占用的端口可能要被占用一段时间才能允许其他进程使用,
* 而且程序死掉以后,内核一需要一定的时间才能够释放此端口,不设置SO_REUSEADDR
*/
.option(ChannelOption.SO_KEEPALIVE, false)
/**对应于套接字选项中的SO_KEEPALIVE,
* 该参数用于设置TCP连接,当设置该选项以后,连接会测试链接的状态,
* 这个选项用于可能长时间没有数据交流的
* 连接,当设置该选项以后,如果在两小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文
*/
.childOption(ChannelOption.TCP_NODELAY, true)
/**对应于套接字选项中的TCP_NODELAY,
* 该参数的使用与Nagle算法有关,Nagle算法是将小的数据包组装为更大的帧然后进行发送,
* 而不是输入一次发送一次,因此在数据包不足的时候会等待其他数据的到了,
* 组装成大的数据包进行发送,虽然该方式有效提高网络的有效负载,但是却造成了延时,
* 而该参数的作用就是禁止使用Nagle算法,使用于小数据即时传输,
* 于TCP_NODELAY相对应的是TCP_CORK,该选项是需要等到发送的数据量最大的时候,一次性发数据,
* 适用于文件传输。
*/
.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
/**
* 这两个参数用于操作接收缓冲区和发送缓冲区的大小,
* 接收缓冲区用于保存网络协议站内收到的数据,
* 直到应用程序读取成功,发送缓冲区用于保存发送数据,直到发送成功。
*/
.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
/**
* 处理TLS握手的 ChannelHandler
*/
.addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME,
new HandshakeHandler(TlsSystemConfig.tlsMode))
/**
* 接受消息处理包括编解码、活跃Channel的监测、连接处理、事件处理
*/
.addLast(defaultEventExecutorGroup,
new NettyEncoder(),
new NettyDecoder(),
new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
new NettyConnectManageHandler(),
new NettyServerHandler()
);
}
});
/**
* 使用对象池,重用缓冲区
*/
if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
}
/**
*异步地绑定服务器,调用 sync()方法阻塞等待直到绑定完成
*/
try {
ChannelFuture sync = this.serverBootstrap.bind().sync();
InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
this.port = addr.getPort();
} catch (InterruptedException e1) {
throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
}
if (this.channelEventListener != null) {
this.nettyEventExecutor.start();
}
/**
* 定时线程扫描 应答表
*/
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
NettyRemotingServer.this.scanResponseTable();
} catch (Throwable e) {
log.error("scanResponseTable exception", e);
}
}
}, 1000 * 3, 1000);
}
重点看NettyServerHandler接受事件处理实现:
class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
//调用processMessageReceived来处理接受到的消息
processMessageReceived(ctx, msg);
}
}
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
final RemotingCommand cmd = msg;
if (cmd != null) {
switch (cmd.getType()) {
//处理请求命令
case REQUEST_COMMAND:
// --------------a
processRequestCommand(ctx, cmd);
break;
//处理响应命令
case RESPONSE_COMMAND:
// --------------b
processResponseCommand(ctx, cmd);
break;
default:
break;
}
}
}
// -----------a 调用处理请求方法
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
/**
* 根据RemotingCommand的code得到 NettyRequestProcessor 和 线程池
*/
final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
final int opaque = cmd.getOpaque();
if (pair != null) {
Runnable run = new Runnable() {
@Override
public void run() {
try {
RPCHook rpcHook = NettyRemotingAbstract.this.getRPCHook();
if (rpcHook != null) {
rpcHook.doBeforeRequest(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
}
/**
* 调用各自 NettyRequestProcessor 的 processRequest 方法返回 response
*/
final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
if (rpcHook != null) {
rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
}
/**
* 若请求不是单向的
*/
if (!cmd.isOnewayRPC()) {
if (response != null) {
/**
* 返回不改变opaque的值
*/
response.setOpaque(opaque);
response.markResponseType();
try {
/**
* 写入Channel 并发送
*/
ctx.writeAndFlush(response);
} catch (Throwable e) {
log.error("process request over, but response failed", e);
log.error(cmd.toString());
log.error(response.toString());
}
} else {
}
}
} catch (Throwable e) {
log.error("process request exception", e);
log.error(cmd.toString());
/**
* 当出现异常且请求不为单向请求
*/
if (!cmd.isOnewayRPC()) {
/**
* 创建一个应答response, 写入Channel并发送
*/
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,
RemotingHelper.exceptionSimpleDesc(e));
response.setOpaque(opaque);
ctx.writeAndFlush(response);
}
}
}
};
/**
* request处理器忙则返回响应response
*/
if (pair.getObject1().rejectRequest()) {
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
"[REJECTREQUEST]system busy, start flow control for a while");
response.setOpaque(opaque);
ctx.writeAndFlush(response);
return;
}
try {
/**
* 将上述定义的request处理提交到对应的线程池
*/
final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
pair.getObject2().submit(requestTask);
} catch (RejectedExecutionException e) {
if ((System.currentTimeMillis() % 10000) == 0) {
log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())
+ ", too many requests and system thread pool busy, RejectedExecutionException "
+ pair.getObject2().toString()
+ " request code: " + cmd.getCode());
}
/**
* 不是单向请求则返回busy response
*/
if (!cmd.isOnewayRPC()) {
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
"[OVERLOAD]system busy, start flow control for a while");
response.setOpaque(opaque);
ctx.writeAndFlush(response);
}
}
} else {
/**
* 没有对应的 Pair 则返回 NOT_SUPPORTED 错误
*/
String error = " request type " + cmd.getCode() + " not supported";
final RemotingCommand response =
RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
response.setOpaque(opaque);
ctx.writeAndFlush(response);
log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
}
}
// ------------------b 处理responseCommand
public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
/**
* 从未回应的请求列表里面获取对应的 ResponseFuture
*/
final int opaque = cmd.getOpaque();
final ResponseFuture responseFuture = responseTable.get(opaque);
if (responseFuture != null) {
responseFuture.setResponseCommand(cmd);
/**
* 从未响应的请求列表删除
*/
responseTable.remove(opaque);
if (responseFuture.getInvokeCallback() != null) {
/**
* 执行回调函数
*/
executeInvokeCallback(responseFuture);
} else {
responseFuture.putResponse(cmd);
responseFuture.release();
}
} else {
log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
log.warn(cmd.toString());
}
}
/**
* Execute callback in callback executor. If callback executor is null, run directly in current thread
*/
private void executeInvokeCallback(final ResponseFuture responseFuture) {
boolean runInThisThread = false;
ExecutorService executor = this.getCallbackExecutor();
if (executor != null) {
try {
executor.submit(new Runnable() {
@Override
public void run() {
try {
/**
* 执行回调函数
*/
responseFuture.executeInvokeCallback();
} catch (Throwable e) {
log.warn("execute callback in executor exception, and callback throw", e);
} finally {
responseFuture.release();
}
}
});
} catch (Exception e) {
runInThisThread = true;
log.warn("execute callback in executor exception, maybe executor busy", e);
}
} else {
runInThisThread = true;
}
/**
* 线程池为空则在当前线程下执行
*/
if (runInThisThread) {
try {
responseFuture.executeInvokeCallback();
} catch (Throwable e) {
log.warn("executeInvokeCallback Exception", e);
} finally {
responseFuture.release();
}
}
}
4 NettyRomotingClient
Client源码与Server大致相同,不同之处在于netty的引导时,只有一个eventLoopGroup,永不接受连接。