MetaQ技术内幕——源码分析(六)
前面介绍过MetaQ使用gecko框架作为网络传输框架,Gecko采用请求/响应的方式组织传输。MetaQ依据定义了请求和响应的命令,由于命令Client和Broker均需要使用,所以放在了common工程的类MetaEncodeCommand中:
- public String GET_CMD = "get"; //请求数据请求
- public String RESULT_CMD = "result"; //结果响应(不包括消息)
- public String OFFSET_CMD = "offset"; //查询最近有效的offset请求
- public String PUT_CMD = "put"; //发送消息命令请求
- public String SYNC_CMD = "sync"; //同步数据请求
- public String QUIT_CMD = "quit"; //退出请求,客户端发送此命令后,服务器将主动关闭连接
- public String VERSION_CMD = "version"; //查询服务器版本请求,也用于心跳检测
- public String STATS_CMD = "stats"; //查询统计信息请求
- public String TRANS_CMD = "transaction"; //事务请求
- //还有一个响应,这里没有响应头的定义,就是返回的消息集
public String GET_CMD = "get"; //请求数据请求 public String RESULT_CMD = "result"; //结果响应(不包括消息) public String OFFSET_CMD = "offset"; //查询最近有效的offset请求 public String PUT_CMD = "put"; //发送消息命令请求 public String SYNC_CMD = "sync"; //同步数据请求 public String QUIT_CMD = "quit"; //退出请求,客户端发送此命令后,服务器将主动关闭连接 public String VERSION_CMD = "version"; //查询服务器版本请求,也用于心跳检测 public String STATS_CMD = "stats"; //查询统计信息请求 public String TRANS_CMD = "transaction"; //事务请求 //还有一个响应,这里没有响应头的定义,就是返回的消息集
在分析Broker启动类MetaMorphosisBroker时,分析过registerProcessors()方法,针对于不同的请求,Broker注册了不同的处理器,详情见registerProcessors()方法。MetaQ传输采用文本协议设计,非常透明,MetaEncodeCommand定义是请求类型。
Broker分为请求和响应的命令,先看看请求的类图:
注:由于类图工具出现问题,AbstractRequestCommand跟RequestCommand的实现关系并画成依赖关系,请大家理解成实现关系,以后类图同样这样理解(除非接口与接口或者类与类关系使用依赖箭头一定是描述成依赖的)。
所有的请求的命令均继承AbstractRequestCommand类并实现RequestCommand接口,RequestCommand是Gecko框架定义的接口,所有的命令均在编码后能被Gecko框架组织传输,传输协议是透明的文本协议。AbstractRequestCommand定义了基本属性topic和opaque。
- public abstract class AbstractRequestCommand implements RequestCommand, MetaEncodeCommand {
- private Integer opaque; //主要用于标识请求,响应的时候该标识被带回,用于客户端区分是哪个请求的响应
- private String topic;
- }
public abstract class AbstractRequestCommand implements RequestCommand, MetaEncodeCommand { private Integer opaque; //主要用于标识请求,响应的时候该标识被带回,用于客户端区分是哪个请求的响应 private String topic; }
响应命令的类图如下:
请求命令只分为两类,带有消息集合的响应(DataCommand);带有其他结果的响应(BooleanCommand)。DataCommand里携带的消息的格式与消息的存储结构一直,这样可以提高Broker的处理能力,将消息解析、正确性等验证放在Client,充分发挥Client的计算能力。这里比较麻烦的一点就是其他结果的响应均有BooleanCommand完成,BooleanCommand中只有code和message熟悉,code用来返回响应状态码,比如统计结果的信息的携带就必须由message熟悉来完成,所以结果的响应能力有限,而且必须先转换成字符串,具体如下:
- /**
- * 应答命令,协议格式如下:</br> result code length opaque\r\n message
- */
- public class BooleanCommand extends AbstractResponseCommand implements BooleanAckCommand {
- private String message;
- /**
- * status code in http protocol
- */
- private final int code;//响应的状态码
- public BooleanCommand(final int code, final String message, final Integer opaque) {
- super(opaque);
- this.code = code;
- switch (this.code) {
- case HttpStatus.Success:
- this.setResponseStatus(ResponseStatus.NO_ERROR);
- break;
- default:
- this.setResponseStatus(ResponseStatus.ERROR);
- break;
- }
- this.message = message;
- }
- public String getErrorMsg() {
- return this.message;
- }
- public int getCode() {
- return this.code;
- }
- public void setErrorMsg(final String errorMsg) {
- this.message = errorMsg;
- }
- public IoBuffer encode() {
- //对结果进行编码,以便能在网络上传输
- final byte[] bytes = ByteUtils.getBytes(this.message);
- final int messageLen = bytes == null ? 0 : bytes.length;
- final IoBuffer buffer = IoBuffer.allocate(11 + ByteUtils.stringSize(this.code)
- + ByteUtils.stringSize(this.getOpaque()) + ByteUtils.stringSize(messageLen) + messageLen);
- ByteUtils.setArguments(buffer, MetaEncodeCommand.RESULT_CMD, this.code, messageLen, this.getOpaque());
- if (bytes != null) {
- buffer.put(bytes);
- }
- buffer.flip();
- return buffer;
- }
- }
/** * 应答命令,协议格式如下:</br> result code length opaque\r\n message */public class BooleanCommand extends AbstractResponseCommand implements BooleanAckCommand { private String message; /** * status code in http protocol */ private final int code;//响应的状态码 public BooleanCommand(final int code, final String message, final Integer opaque) { super(opaque); this.code = code; switch (this.code) { case HttpStatus.Success: this.setResponseStatus(ResponseStatus.NO_ERROR); break; default: this.setResponseStatus(ResponseStatus.ERROR); break; } this.message = message; } public String getErrorMsg() { return this.message; } public int getCode() { return this.code; } public void setErrorMsg(final String errorMsg) { this.message = errorMsg; } public IoBuffer encode() {//对结果进行编码,以便能在网络上传输 final byte[] bytes = ByteUtils.getBytes(this.message); final int messageLen = bytes == null ? 0 : bytes.length; final IoBuffer buffer = IoBuffer.allocate(11 + ByteUtils.stringSize(this.code) + ByteUtils.stringSize(this.getOpaque()) + ByteUtils.stringSize(messageLen) + messageLen); ByteUtils.setArguments(buffer, MetaEncodeCommand.RESULT_CMD, this.code, messageLen, this.getOpaque()); if (bytes != null) { buffer.put(bytes); } buffer.flip(); return buffer; }}
前面讲到的每个请求都会携带一个属性opaque并且该opaque将会在响应里被带回, AbstractResponseCommand里定义了该被带回的属性opaque。
- /**
- * 应答命令基类
- */
- public abstract class AbstractResponseCommand implements ResponseCommand, MetaEncodeCommand {
- private Integer opaque;
- private InetSocketAddress responseHost; // responseHost和responseTime尚未发现在哪来调用,预计是作者预留的属性
- private long responseTime;
- private ResponseStatus responseStatus; //响应状态
- }
/** * 应答命令基类 */public abstract class AbstractResponseCommand implements ResponseCommand, MetaEncodeCommand { private Integer opaque; private InetSocketAddress responseHost; // responseHost和responseTime尚未发现在哪来调用,预计是作者预留的属性 private long responseTime; private ResponseStatus responseStatus; //响应状态}
不知道研究过MetaQ源码的朋友们发现DataCommand的encode()是一个空实现没,虽然作者有注释,运行Broker确实不存在问题,但就从设计者的角度来看,还是有些小问题的,代码如下:
- public class DataCommand extends AbstractResponseCommand {
- private final byte[] data;
- ……
- @Override
- public IoBuffer encode() {
- //作者注释: 不做任何事情,发送data command由transferTo替代
- //笔者注释:因为Borker采用了BrokerCommandProcessor 中zeroCopy的机制,所以不会该encode方法的调用,但如果以后改动后,允许zeroCopy变成可配置项,如果该方法不实现,就会出现解析问题,因为配置容易加上,但容易忘记该处的实现。
- return null;
- }
- }
public class DataCommand extends AbstractResponseCommand { private final byte[] data;…… @Override public IoBuffer encode() { //作者注释: 不做任何事情,发送data command由transferTo替代 //笔者注释:因为Borker采用了BrokerCommandProcessor 中zeroCopy的机制,所以不会该encode方法的调用,但如果以后改动后,允许zeroCopy变成可配置项,如果该方法不实现,就会出现解析问题,因为配置容易加上,但容易忘记该处的实现。 return null; }}
上一篇: MetaQ技术内幕——源码分析(七)
下一篇: MetaQ技术内幕——源码分析(五)
推荐阅读
-
SpringBoot 源码解析 (六)----- Spring Boot的核心能力 - 内置Servlet容器源码分析(Tomcat)
-
Mybaits 源码解析 (六)----- 全网最详细:Select 语句的执行过程分析(上篇)(Mapper方法是如何调用到XML中的SQL的?)
-
Tomcat源码分析 (六)----- Tomcat 启动过程(一)
-
Javac编译原理 《深入分析java web 技术内幕》第四章
-
Android源码分析(六)-----蓝牙Bluetooth源码目录分析
-
[Abp 源码分析]六、工作单元的实现
-
vuex 源码分析(六) 辅助函数 详解
-
PHP模糊查询技术实例分析【附源码下载】
-
《RocketMQ技术内幕:RocketMQ架构设计与实现原理》—1.1.2 Eclipse调试RocketMQ源码...
-
《深入分析Java Web技术内幕》读书笔记