欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

MetaQ技术内幕——源码分析(七)

程序员文章站 2022-07-13 13:48:51
...

前面介绍了Broker在网络传输过程中使用的数据结构,同时也介绍了MetaQ使用了Gecko框架作为网络传输框架。

有人会问,Gecko什么调用MetaEncodeCommand的encode()方法,让命令变成可见的明文在网络传输,Gecko又在什么时候将网络传输的数据包装成一个个Command对象?

或许有人已经注意到了笔者在介绍Broker启动类MetaMorphosisBroker的时候估计漏掉了一个方法newRemotingServer()方法,即创建Gecko Server。

Java代码 MetaQ技术内幕——源码分析(七)
            
    
    博客分类: MetaQ MetaQ  MetaQ技术内幕——源码分析(七)
            
    
    博客分类: MetaQ MetaQ MetaQ技术内幕——源码分析(七)
            
    
    博客分类: MetaQ MetaQ 
  1. private static RemotingServer newRemotingServer(final MetaConfig metaConfig) { 
  2.         final ServerConfig serverConfig = new ServerConfig(); 
  3.         serverConfig.setWireFormatType(new MetamorphosisWireFormatType()); //注册了MetamorphosisWireFormatType实例,该实例负责编码和解码Command 
  4.         serverConfig.setPort(metaConfig.getServerPort()); 
  5.         final RemotingServer server = RemotingFactory.newRemotingServer(serverConfig); 
  6.         return server; 
private static RemotingServer newRemotingServer(final MetaConfig metaConfig) {		final ServerConfig serverConfig = new ServerConfig();		serverConfig.setWireFormatType(new MetamorphosisWireFormatType()); //注册了MetamorphosisWireFormatType实例,该实例负责编码和解码Command		serverConfig.setPort(metaConfig.getServerPort());		final RemotingServer server = RemotingFactory.newRemotingServer(serverConfig);		return server;}

在该方法内注册了一个MetamorphosisWireFormatType实例,该实例负责Command 的编码解码工作,MetamorphosisWireFormatType实现接口WireFormatType。

Java代码 MetaQ技术内幕——源码分析(七)
            
    
    博客分类: MetaQ MetaQ  MetaQ技术内幕——源码分析(七)
            
    
    博客分类: MetaQ MetaQ MetaQ技术内幕——源码分析(七)
            
    
    博客分类: MetaQ MetaQ 
  1. public class MetamorphosisWireFormatType extends WireFormatType { 
  2.     public static final String SCHEME = "meta"; 
  3.  
  4.     public String getScheme() { 
  5.         return SCHEME; 
  6.     } 
  7.  
  8.     public String name() { 
  9.         return "metamorphosis"; 
  10.     } 
  11.  
  12.     public CodecFactory newCodecFactory() { 
  13.         return new MetaCodecFactory(); 
  14.     } 
  15.  
  16.     public CommandFactory newCommandFactory() { 
  17.         return new MetaCommandFactory(); 
  18.     } 
public class MetamorphosisWireFormatType extends WireFormatType {	public static final String SCHEME = "meta";	public String getScheme() {		return SCHEME;	}	public String name() {		return "metamorphosis";	}	public CodecFactory newCodecFactory() {		return new MetaCodecFactory();	}	public CommandFactory newCommandFactory() {		return new MetaCommandFactory();	}

MetamorphosisWireFormatType本身并没有进行编码解码,而是交给了类MetaCodecFactory去实现,另外我们也看到newCommandFactory()方法,该方法主要是用于连接的心跳检测。下面让我们分别来看看这两个类: MetaCommandFactory和MetaCodecFactory,MetaCommandFactory和MetaCodecFactory均是MetamorphosisWireFormatType的内部类

用于心跳检测的类MetaCommandFactory,该类主要有两个方法,创建心跳请求的createHeartBeatCommand()方法和响应心跳请求的createBooleanAckCommand()方法:

Java代码 MetaQ技术内幕——源码分析(七)
            
    
    博客分类: MetaQ MetaQ  MetaQ技术内幕——源码分析(七)
            
    
    博客分类: MetaQ MetaQ MetaQ技术内幕——源码分析(七)
            
    
    博客分类: MetaQ MetaQ 
  1. static class MetaCommandFactory implements CommandFactory { 
  2.  
  3.         public BooleanAckCommand createBooleanAckCommand(final CommandHeader request, final ResponseStatus responseStatus, final String errorMsg) { 
  4. //响应心跳请求 
  5.             int httpCode = -1; 
  6.             switch (responseStatus) { 
  7.                 case NO_ERROR: 
  8.                     httpCode = HttpStatus.Success; 
  9.                     break; 
  10.                 case THREADPOOL_BUSY: 
  11.                 case NO_PROCESSOR: 
  12.                     httpCode = HttpStatus.ServiceUnavilable; 
  13.                     break; 
  14.                 case TIMEOUT: 
  15.                     httpCode = HttpStatus.GatewayTimeout; 
  16.                     break; 
  17.                 default: 
  18.                     httpCode = HttpStatus.InternalServerError; 
  19.                     break; 
  20.             } 
  21.             return new BooleanCommand(httpCode, errorMsg, request.getOpaque()); 
  22.         } 
  23.  
  24.         public HeartBeatRequestCommand createHeartBeatCommand() { 
  25. //前面介绍过VersionCommand用于心跳检测,就是用于此处 
  26.             return new VersionCommand(OpaqueGenerator.getNextOpaque()); 
  27.         } 
  28.     } 
static class MetaCommandFactory implements CommandFactory {		public BooleanAckCommand createBooleanAckCommand(final CommandHeader request, final ResponseStatus responseStatus, final String errorMsg) {//响应心跳请求			int httpCode = -1;			switch (responseStatus) {				case NO_ERROR:					httpCode = HttpStatus.Success;					break;				case THREADPOOL_BUSY:				case NO_PROCESSOR:					httpCode = HttpStatus.ServiceUnavilable;					break;				case TIMEOUT:					httpCode = HttpStatus.GatewayTimeout;					break;				default:					httpCode = HttpStatus.InternalServerError;					break;			}			return new BooleanCommand(httpCode, errorMsg, request.getOpaque());		}		public HeartBeatRequestCommand createHeartBeatCommand() {//前面介绍过VersionCommand用于心跳检测,就是用于此处			return new VersionCommand(OpaqueGenerator.getNextOpaque());		}	}

MetaCodecFactory是MetaQ(包括Broker和Client,因为编码解码Broker和Client都需要)网络传输最重要的一个类,负责命令的编码解码,MetaCodecFactory要实现Gecko框架定义的接口CodecFactory,MetaCodecFactory实例才能被Gecko框架使用,接口CodecFactory就定义了两个方法,返回编码器和解码器(由于Client和Broker均需要使用到MetamorphosisWireFormatType,所以MetamorphosisWireFormatType放在common工程中):

Java代码 MetaQ技术内幕——源码分析(七)
            
    
    博客分类: MetaQ MetaQ  MetaQ技术内幕——源码分析(七)
            
    
    博客分类: MetaQ MetaQ MetaQ技术内幕——源码分析(七)
            
    
    博客分类: MetaQ MetaQ 
  1. static class MetaCodecFactory implements CodecFactory { 
  2.     //返回解码器 
  3.         @Override 
  4.         public Decoder getDecoder() { 
  5.  
  6.             return new Decoder() { 
  7.                 //Gecko框架会在适当的时候调用该方法,并将数据放到参数buff中, 
  8.                 //用户可以根据buff的内容进行解析,包装成对应的Command类型 
  9.                 public Object decode(final IoBuffer buff, final Session session) { 
  10.                     if (buff == null || !buff.hasRemaining()) { 
  11.                         return null; 
  12.                     } 
  13.                     buff.mark(); 
  14.                      //匹配第一个{‘\r’, ‘\n’},也就是找到命令的内容(不包括数据),目前只有PutCommand和SynCommand有数据部分,其他的命令都只有命令的内容 
  15.                     final int index = LINE_MATCHER.matchFirst(buff); 
  16.                     if (index >= 0) { 
  17.                           //获取命令内容 
  18.                         final byte[] bytes = new byte[index - buff.position()]; 
  19.                         buff.get(bytes); 
  20.                         //跳过\r\n 
  21.                         buff.position(buff.position() + 2); 
  22.                           //将命令字节数组转换成字符串 
  23.                         final String line = ByteUtils.getString(bytes); 
  24.                         if (log.isDebugEnabled()) { 
  25.                             log.debug("Receive command:" + line); 
  26.                         } 
  27.                           //以空格为单位分离内容 
  28.                         final String[] sa = SPLITER.split(line); 
  29.                         if (sa == null || sa.length == 0) { 
  30.                             throw new MetaCodecException("Blank command line."); 
  31.                         } 
  32.                           //判断内容的第一个字母 
  33.                         final byte op = (byte) sa[0].charAt(0); 
  34.                         switch (op) { 
  35.                             case 'p': 
  36.                                     //如果是p的话,认为是put命令,具体见MetaEncodeCommand定义的命令的内容并解析put命令,具体格式在每个命令的实现类里的注释都有,下面的各个方法的注释也有部分 
  37.                                 return this.decodePut(buff, sa); 
  38.                             case 'g': 
  39.                                    //如果是g的话,认为是get命令 
  40.                                 return this.decodeGet(sa); 
  41.                             case 't': 
  42.                                    //如果是g的话,认为是事务命令 
  43.                                 return this.decodeTransaction(sa); 
  44.                             case 'r': 
  45.                                    //如果是g的话,认为是结果响应 
  46.                                 return this.decodeBoolean(buff, sa); 
  47.                             case 'v': 
  48.                                      //如果是v的话,则可能是心跳请求或者数据响应,所以得使用更详细的信息进行判断 
  49.                                 if (sa[0].equals("value")) { 
  50.                                     return this.decodeData(buff, sa); 
  51.                                 } else { 
  52.                                     return this.decodeVersion(sa); 
  53.                                 } 
  54.                             case 's': 
  55.                                //如果是s的话,则可能是统计请求或者同步,所以得使用更详细的信息进行判断 
  56. if (sa[0].equals("stats")) { 
  57.                                     return this.decodeStats(sa); 
  58.                                 } else { 
  59.                                     return this.decodeSync(buff, sa); 
  60.                                 } 
  61.                             case 'o': 
  62.                                   //如果是o的话,查询最近可用位置请求 
  63.                                 return this.decodeOffset(sa); 
  64.                             case 'q': 
  65.                                    //如果是q的话,退出连接请求 
  66.                                 return this.decodeQuit(); 
  67.                             default: 
  68.                                 throw new MetaCodecException("Unknow command:" + line); 
  69.                         } 
  70.                     } else { 
  71.                         return null; 
  72.                     } 
  73.                 } 
  74.  
  75.                 private Object decodeQuit() { 
  76.                     return new QuitCommand(); 
  77.                 } 
  78.  
  79.                 private Object decodeVersion(final String[] sa) { 
  80.                     if (sa.length >= 2) { 
  81.                         return new VersionCommand(Integer.parseInt(sa[1])); 
  82.                     } else { 
  83.                         return new VersionCommand(Integer.MAX_VALUE); 
  84.                     } 
  85.                 } 
  86.  
  87.                 // offset topic group partition offset opaque\r\n 
  88.                 private Object decodeOffset(final String[] sa) { 
  89.                     this.assertCommand(sa[0], "offset"); 
  90.                     return new OffsetCommand(sa[1], sa[2], Integer.parseInt(sa[3]), Long.parseLong(sa[4]), Integer.parseInt(sa[5])); 
  91.                 } 
  92.  
  93.                 // stats item opaque\r\n 
  94.                 // opaque可以为空 
  95.                 private Object decodeStats(final String[] sa) { 
  96.                     this.assertCommand(sa[0], "stats"); 
  97.                     int opaque = Integer.MAX_VALUE; 
  98.                     if (sa.length >= 3) { 
  99.                         opaque = Integer.parseInt(sa[2]); 
  100.                     } 
  101.                     String item = null; 
  102.                     if (sa.length >= 2) { 
  103.                         item = sa[1]; 
  104.                     } 
  105.                     return new StatsCommand(opaque, item); 
  106.                 } 
  107.  
  108.                 // value totalLen opaque\r\n data 
  109.                 private Object decodeData(final IoBuffer buff, final String[] sa) { 
  110.                     this.assertCommand(sa[0], "value"); 
  111.                     final int valueLen = Integer.parseInt(sa[1]); 
  112.                     if (buff.remaining() < valueLen) { 
  113.                         buff.reset(); 
  114.                         return null; 
  115.                     } else { 
  116.                         final byte[] data = new byte[valueLen]; 
  117.                         buff.get(data); 
  118.                         return new DataCommand(data, Integer.parseInt(sa[2])); 
  119.                     } 
  120.                 } 
  121.  
  122.                 /**
  123.                  * result code length opaque\r\n message
  124.                  * 
  125.                  * @param buff
  126.                  * @param sa
  127.                  * @return
  128.                  */ 
  129.                 private Object decodeBoolean(final IoBuffer buff, final String[] sa) { 
  130.                     this.assertCommand(sa[0], "result"); 
  131.                     final int valueLen = Integer.parseInt(sa[2]); 
  132.                     if (valueLen == 0) { 
  133.                         return new BooleanCommand(Integer.parseInt(sa[1]), null, Integer.parseInt(sa[3])); 
  134.                     } else { 
  135.                         if (buff.remaining() < valueLen) { 
  136.                             buff.reset(); 
  137.                             return null; 
  138.                         } else { 
  139.                             final byte[] data = new byte[valueLen]; 
  140.                             buff.get(data); 
  141.                             return new BooleanCommand(Integer.parseInt(sa[1]), ByteUtils.getString(data), Integer.parseInt(sa[3])); 
  142.                         } 
  143.                     } 
  144.                 } 
  145.  
  146.                 // get topic group partition offset maxSize opaque\r\n 
  147.                 private Object decodeGet(final String[] sa) { 
  148.                     this.assertCommand(sa[0], "get"); 
  149.                     return new GetCommand(sa[1], sa[2], Integer.parseInt(sa[3]), Long.parseLong(sa[4]), Integer.parseInt(sa[5]), Integer.parseInt(sa[6])); 
  150.                 } 
  151.  
  152.                 // transaction key sessionId type [timeout] [unique qualifier] 
  153.                 // opaque\r\n 
  154.                 private Object decodeTransaction(final String[] sa) { 
  155.                     this.assertCommand(sa[0], "transaction"); 
  156.                     final TransactionId transactionId = this.getTransactionId(sa[1]); 
  157.                     final TransactionType type = TransactionType.valueOf(sa[3]); 
  158.                     switch (sa.length) { 
  159.                         case 7: 
  160.                             // Both include timeout and unique qualifier. 
  161.                             int timeout = Integer.valueOf(sa[4]); 
  162.                             String uniqueQualifier = sa[5]; 
  163.                             TransactionInfo info = new TransactionInfo(transactionId, sa[2], type, uniqueQualifier, timeout); 
  164.                             return new TransactionCommand(info, Integer.parseInt(sa[6])); 
  165.                         case 6: 
  166.                             // Maybe timeout or unique qualifier 
  167.                             if (StringUtils.isNumeric(sa[4])) { 
  168.                                 timeout = Integer.valueOf(sa[4]); 
  169.                                 info = new TransactionInfo(transactionId, sa[2], type, null, timeout); 
  170.                                 return new TransactionCommand(info, Integer.parseInt(sa[5])); 
  171.                             } else { 
  172.                                 uniqueQualifier = sa[4]; 
  173.                                 info = new TransactionInfo(transactionId, sa[2], type, uniqueQualifier, 0); 
  174.                                 return new TransactionCommand(info, Integer.parseInt(sa[5])); 
  175.                             } 
  176.                         case 5: 
  177.                             // Without timeout and unique qualifier. 
  178.                             info = new TransactionInfo(transactionId, sa[2], type, null); 
  179.                             return new TransactionCommand(info, Integer.parseInt(sa[4])); 
  180.                         default: 
  181.                             throw new MetaCodecException("Invalid transaction command:" + StringUtils.join(sa)); 
  182.                     } 
  183.                 } 
  184.  
  185.                 private TransactionId getTransactionId(final String s) { 
  186.                     return TransactionId.valueOf(s); 
  187.                 } 
  188.  
  189.                 // sync topic partition value-length flag msgId 
  190.                 // opaque\r\n 
  191.                 private Object decodeSync(final IoBuffer buff, final String[] sa) { 
  192.                     this.assertCommand(sa[0], "sync"); 
  193.                     final int valueLen = Integer.parseInt(sa[3]); 
  194.                     if (buff.remaining() < valueLen) { 
  195.                         buff.reset(); 
  196.                         return null; 
  197.                     } else { 
  198.                         final byte[] data = new byte[valueLen]; 
  199.                         buff.get(data); 
  200.                         switch (sa.length) { 
  201.                             case 7: 
  202.                                 // old master before 1.4.4 
  203.                                 return new SyncCommand(sa[1], Integer.parseInt(sa[2]), data, Integer.parseInt(sa[4]), Long.valueOf(sa[5]), -1, Integer.parseInt(sa[6])); 
  204.                             case 8: 
  205.                                 // new master since 1.4.4 
  206.                                 return new SyncCommand(sa[1], Integer.parseInt(sa[2]), data, Integer.parseInt(sa[4]), Long.valueOf(sa[5]), Integer.parseInt(sa[6]), Integer.parseInt(sa[7])); 
  207.                             default: 
  208.                                 throw new MetaCodecException("Invalid Sync command:" + StringUtils.join(sa)); 
  209.                         } 
  210.                     } 
  211.                 } 
  212.  
  213.                 // put topic partition value-length flag checksum 
  214.                 // [transactionKey] 
  215.                 // opaque\r\n 
  216.                 private Object decodePut(final IoBuffer buff, final String[] sa) { 
  217.                     this.assertCommand(sa[0], "put"); 
  218.                     final int valueLen = Integer.parseInt(sa[3]); 
  219.                     if (buff.remaining() < valueLen) { 
  220.                         buff.reset(); 
  221.                         return null; 
  222.                     } else { 
  223.                         final byte[] data = new byte[valueLen]; 
  224.                         buff.get(data); 
  225.                         switch (sa.length) { 
  226.                             case 6: 
  227.                                 // old clients before 1.4.4 
  228.                                 return new PutCommand(sa[1], Integer.parseInt(sa[2]), data, null, Integer.parseInt(sa[4]), Integer.parseInt(sa[5])); 
  229.                             case 7: 
  230.                                 // either transaction command or new clients since 
  231.                                 // 1.4.4 
  232.                                 String slot = sa[5]; 
  233.                                 char firstChar = slot.charAt(0); 
  234.                                 if (Character.isDigit(firstChar) || '-' == firstChar) { 
  235.                                     // slot is checksum. 
  236.                                     int checkSum = Integer.parseInt(slot); 
  237.                                     return new PutCommand(sa[1], Integer.parseInt(sa[2]), data, Integer.parseInt(sa[4]), checkSum, null, Integer.parseInt(sa[6])); 
  238.                                 } else { 
  239.                                     // slot is transaction id. 
  240.                                     return new PutCommand(sa[1], Integer.parseInt(sa[2]), data, this.getTransactionId(slot), Integer.parseInt(sa[4]), Integer.parseInt(sa[6])); 
  241.                                 } 
  242.                             case 8: 
  243.                                 // New clients since 1.4.4 
  244.                                 // A transaction command 
  245.                                 return new PutCommand(sa[1], Integer.parseInt(sa[2]), data, Integer.parseInt(sa[4]), Integer.parseInt(sa[5]), this.getTransactionId(sa[6]), Integer.parseInt(sa[7])); 
  246.                             default: 
  247.                                 throw new MetaCodecException("Invalid put command:" + StringUtils.join(sa)); 
  248.                         } 
  249.                     } 
  250.                 } 
  251.  
  252.                 private void assertCommand(final String cmd, final String expect) { 
  253.                     if (!expect.equals(cmd)) { 
  254.                         throw new MetaCodecException("Expect " + expect + ",but was " + cmd); 
  255.                     } 
  256.                 } 
  257.             }; 
  258.         } 
  259.  
  260.         @Override 
  261.         public Encoder getEncoder() { 
  262.             //返回编码器 
  263. return new Encoder() { 
  264.                 @Override 
  265.                 public IoBuffer encode(final Object message, final Session session) { 
  266.                       //框架会在适当的时候调用编码器的encode()方法,前面说过如果响应的命令是DataCommand的时候假设不是zeroCopy的话,会出现问题。原因就在这里,因为如果不使用zeroCopy的话,返回给Gecko框架的是一个DataCommand的实例,这时候会调用到此方法,而此方法并没有按照DataCommand的格式进行编码,解码器会识别不了,所以容易出问题 
  267.                     return ((MetaEncodeCommand) message).encode(); 
  268.                 } 
  269.             }; 
  270.         } 
static class MetaCodecFactory implements CodecFactory {	//返回解码器		@Override		public Decoder getDecoder() {			return new Decoder() {				//Gecko框架会在适当的时候调用该方法,并将数据放到参数buff中,                //用户可以根据buff的内容进行解析,包装成对应的Command类型				public Object decode(final IoBuffer buff, final Session session) {					if (buff == null || !buff.hasRemaining()) {						return null;					}					buff.mark();                     //匹配第一个{‘\r’, ‘\n’},也就是找到命令的内容(不包括数据),目前只有PutCommand和SynCommand有数据部分,其他的命令都只有命令的内容					final int index = LINE_MATCHER.matchFirst(buff);					if (index >= 0) {                          //获取命令内容						final byte[] bytes = new byte[index - buff.position()];						buff.get(bytes);						//跳过\r\n						buff.position(buff.position() + 2);                          //将命令字节数组转换成字符串						final String line = ByteUtils.getString(bytes);						if (log.isDebugEnabled()) {							log.debug("Receive command:" + line);						}                          //以空格为单位分离内容						final String[] sa = SPLITER.split(line);						if (sa == null || sa.length == 0) {							throw new MetaCodecException("Blank command line.");						}                          //判断内容的第一个字母						final byte op = (byte) sa[0].charAt(0);						switch (op) {							case 'p':                                    //如果是p的话,认为是put命令,具体见MetaEncodeCommand定义的命令的内容并解析put命令,具体格式在每个命令的实现类里的注释都有,下面的各个方法的注释也有部分								return this.decodePut(buff, sa);							case 'g':                                   //如果是g的话,认为是get命令								return this.decodeGet(sa);							case 't':                                   //如果是g的话,认为是事务命令								return this.decodeTransaction(sa);							case 'r':                                   //如果是g的话,认为是结果响应								return this.decodeBoolean(buff, sa);							case 'v':                                     //如果是v的话,则可能是心跳请求或者数据响应,所以得使用更详细的信息进行判断								if (sa[0].equals("value")) {									return this.decodeData(buff, sa);								} else {									return this.decodeVersion(sa);								}							case 's':							   //如果是s的话,则可能是统计请求或者同步,所以得使用更详细的信息进行判断if (sa[0].equals("stats")) {									return this.decodeStats(sa);								} else {									return this.decodeSync(buff, sa);								}							case 'o':                                  //如果是o的话,查询最近可用位置请求								return this.decodeOffset(sa);							case 'q':                                   //如果是q的话,退出连接请求								return this.decodeQuit();							default:								throw new MetaCodecException("Unknow command:" + line);						}					} else {						return null;					}				}				private Object decodeQuit() {					return new QuitCommand();				}				private Object decodeVersion(final String[] sa) {					if (sa.length >= 2) {						return new VersionCommand(Integer.parseInt(sa[1]));					} else {						return new VersionCommand(Integer.MAX_VALUE);					}				}				// offset topic group partition offset opaque\r\n				private Object decodeOffset(final String[] sa) {					this.assertCommand(sa[0], "offset");					return new OffsetCommand(sa[1], sa[2], Integer.parseInt(sa[3]), Long.parseLong(sa[4]), Integer.parseInt(sa[5]));				}				// stats item opaque\r\n				// opaque可以为空				private Object decodeStats(final String[] sa) {					this.assertCommand(sa[0], "stats");					int opaque = Integer.MAX_VALUE;					if (sa.length >= 3) {						opaque = Integer.parseInt(sa[2]);					}					String item = null;					if (sa.length >= 2) {						item = sa[1];					}					return new StatsCommand(opaque, item);				}				// value totalLen opaque\r\n data				private Object decodeData(final IoBuffer buff, final String[] sa) {					this.assertCommand(sa[0], "value");					final int valueLen = Integer.parseInt(sa[1]);					if (buff.remaining() < valueLen) {						buff.reset();						return null;					} else {						final byte[] data = new byte[valueLen];						buff.get(data);						return new DataCommand(data, Integer.parseInt(sa[2]));					}				}				/**				 * result code length opaque\r\n message				 * 				 * @param buff				 * @param sa				 * @return				 */				private Object decodeBoolean(final IoBuffer buff, final String[] sa) {					this.assertCommand(sa[0], "result");					final int valueLen = Integer.parseInt(sa[2]);					if (valueLen == 0) {						return new BooleanCommand(Integer.parseInt(sa[1]), null, Integer.parseInt(sa[3]));					} else {						if (buff.remaining() < valueLen) {							buff.reset();							return null;						} else {							final byte[] data = new byte[valueLen];							buff.get(data);							return new BooleanCommand(Integer.parseInt(sa[1]), ByteUtils.getString(data), Integer.parseInt(sa[3]));						}					}				}				// get topic group partition offset maxSize opaque\r\n				private Object decodeGet(final String[] sa) {					this.assertCommand(sa[0], "get");					return new GetCommand(sa[1], sa[2], Integer.parseInt(sa[3]), Long.parseLong(sa[4]), Integer.parseInt(sa[5]), Integer.parseInt(sa[6]));				}				// transaction key sessionId type [timeout] [unique qualifier]				// opaque\r\n				private Object decodeTransaction(final String[] sa) {					this.assertCommand(sa[0], "transaction");					final TransactionId transactionId = this.getTransactionId(sa[1]);					final TransactionType type = TransactionType.valueOf(sa[3]);					switch (sa.length) {						case 7:							// Both include timeout and unique qualifier.							int timeout = Integer.valueOf(sa[4]);							String uniqueQualifier = sa[5];							TransactionInfo info = new TransactionInfo(transactionId, sa[2], type, uniqueQualifier, timeout);							return new TransactionCommand(info, Integer.parseInt(sa[6]));						case 6:							// Maybe timeout or unique qualifier							if (StringUtils.isNumeric(sa[4])) {								timeout = Integer.valueOf(sa[4]);								info = new TransactionInfo(transactionId, sa[2], type, null, timeout);								return new TransactionCommand(info, Integer.parseInt(sa[5]));							} else {								uniqueQualifier = sa[4];								info = new TransactionInfo(transactionId, sa[2], type, uniqueQualifier, 0);								return new TransactionCommand(info, Integer.parseInt(sa[5]));							}						case 5:							// Without timeout and unique qualifier.							info = new TransactionInfo(transactionId, sa[2], type, null);							return new TransactionCommand(info, Integer.parseInt(sa[4]));						default:							throw new MetaCodecException("Invalid transaction command:" + StringUtils.join(sa));					}				}				private TransactionId getTransactionId(final String s) {					return TransactionId.valueOf(s);				}				// sync topic partition value-length flag msgId				// opaque\r\n				private Object decodeSync(final IoBuffer buff, final String[] sa) {					this.assertCommand(sa[0], "sync");					final int valueLen = Integer.parseInt(sa[3]);					if (buff.remaining() < valueLen) {						buff.reset();						return null;					} else {						final byte[] data = new byte[valueLen];						buff.get(data);						switch (sa.length) {							case 7:								// old master before 1.4.4								return new SyncCommand(sa[1], Integer.parseInt(sa[2]), data, Integer.parseInt(sa[4]), Long.valueOf(sa[5]), -1, Integer.parseInt(sa[6]));							case 8:								// new master since 1.4.4								return new SyncCommand(sa[1], Integer.parseInt(sa[2]), data, Integer.parseInt(sa[4]), Long.valueOf(sa[5]), Integer.parseInt(sa[6]), Integer.parseInt(sa[7]));							default:								throw new MetaCodecException("Invalid Sync command:" + StringUtils.join(sa));						}					}				}				// put topic partition value-length flag checksum				// [transactionKey]				// opaque\r\n				private Object decodePut(final IoBuffer buff, final String[] sa) {					this.assertCommand(sa[0], "put");					final int valueLen = Integer.parseInt(sa[3]);					if (buff.remaining() < valueLen) {						buff.reset();						return null;					} else {						final byte[] data = new byte[valueLen];						buff.get(data);						switch (sa.length) {							case 6:								// old clients before 1.4.4								return new PutCommand(sa[1], Integer.parseInt(sa[2]), data, null, Integer.parseInt(sa[4]), Integer.parseInt(sa[5]));							case 7:								// either transaction command or new clients since								// 1.4.4								String slot = sa[5];								char firstChar = slot.charAt(0);								if (Character.isDigit(firstChar) || '-' == firstChar) {									// slot is checksum.									int checkSum = Integer.parseInt(slot);									return new PutCommand(sa[1], Integer.parseInt(sa[2]), data, Integer.parseInt(sa[4]), checkSum, null, Integer.parseInt(sa[6]));								} else {									// slot is transaction id.									return new PutCommand(sa[1], Integer.parseInt(sa[2]), data, this.getTransactionId(slot), Integer.parseInt(sa[4]), Integer.parseInt(sa[6]));								}							case 8:								// New clients since 1.4.4								// A transaction command								return new PutCommand(sa[1], Integer.parseInt(sa[2]), data, Integer.parseInt(sa[4]), Integer.parseInt(sa[5]), this.getTransactionId(sa[6]), Integer.parseInt(sa[7]));							default:								throw new MetaCodecException("Invalid put command:" + StringUtils.join(sa));						}					}				}				private void assertCommand(final String cmd, final String expect) {					if (!expect.equals(cmd)) {						throw new MetaCodecException("Expect " + expect + ",but was " + cmd);					}				}			};		}		@Override		public Encoder getEncoder() {			//返回编码器return new Encoder() {				@Override				public IoBuffer encode(final Object message, final Session session) {                      //框架会在适当的时候调用编码器的encode()方法,前面说过如果响应的命令是DataCommand的时候假设不是zeroCopy的话,会出现问题。原因就在这里,因为如果不使用zeroCopy的话,返回给Gecko框架的是一个DataCommand的实例,这时候会调用到此方法,而此方法并没有按照DataCommand的格式进行编码,解码器会识别不了,所以容易出问题					return ((MetaEncodeCommand) message).encode();				}			};		}

前面还介绍到过MetaMorphosisBroker在启动时会注册请求类型与Processor的映射,见代码:

Java代码 MetaQ技术内幕——源码分析(七)
            
    
    博客分类: MetaQ MetaQ  MetaQ技术内幕——源码分析(七)
            
    
    博客分类: MetaQ MetaQ MetaQ技术内幕——源码分析(七)
            
    
    博客分类: MetaQ MetaQ 
  1. private void registerProcessors() { 
  2.         this.remotingServer.registerProcessor(GetCommand.class, new GetProcessor(this.brokerProcessor, this.executorsManager.getGetExecutor())); 
  3.         this.remotingServer.registerProcessor(PutCommand.class, new PutProcessor(this.brokerProcessor, this.executorsManager.getUnOrderedPutExecutor())); 
  4.         this.remotingServer.registerProcessor(OffsetCommand.class, new OffsetProcessor(this.brokerProcessor, this.executorsManager.getGetExecutor())); 
  5.         this.remotingServer.registerProcessor(HeartBeatRequestCommand.class, new VersionProcessor(this.brokerProcessor)); 
  6.         this.remotingServer.registerProcessor(QuitCommand.class, new QuitProcessor(this.brokerProcessor)); 
  7.         this.remotingServer.registerProcessor(StatsCommand.class, new StatsProcessor(this.brokerProcessor)); 
  8.         this.remotingServer.registerProcessor(TransactionCommand.class, new TransactionProcessor(this.brokerProcessor, this.executorsManager.getUnOrderedPutExecutor())); 
private void registerProcessors() {		this.remotingServer.registerProcessor(GetCommand.class, new GetProcessor(this.brokerProcessor, this.executorsManager.getGetExecutor()));		this.remotingServer.registerProcessor(PutCommand.class, new PutProcessor(this.brokerProcessor, this.executorsManager.getUnOrderedPutExecutor()));		this.remotingServer.registerProcessor(OffsetCommand.class, new OffsetProcessor(this.brokerProcessor, this.executorsManager.getGetExecutor()));		this.remotingServer.registerProcessor(HeartBeatRequestCommand.class, new VersionProcessor(this.brokerProcessor));		this.remotingServer.registerProcessor(QuitCommand.class, new QuitProcessor(this.brokerProcessor));		this.remotingServer.registerProcessor(StatsCommand.class, new StatsProcessor(this.brokerProcessor));		this.remotingServer.registerProcessor(TransactionCommand.class, new TransactionProcessor(this.brokerProcessor, this.executorsManager.getUnOrderedPutExecutor()));}

依据注册的类型,Gecko框架将会根据解析出来的命令实例调用处理器的不同方法,并返回不同请求的响应。下面让我们来看看不同的处理到底做了些什么事情?因为是Broker针对请求的处理,所以所有的Processor都在server工程中,先上类图:


MetaQ技术内幕——源码分析(七)
            
    
    博客分类: MetaQ MetaQ 

所有的处理器均实现了RequestProcessor接口,该接口由Gecko框架定义,RequestProcessor类中只定义了两个方法:

Java代码 MetaQ技术内幕——源码分析(七)
            
    
    博客分类: MetaQ MetaQ  MetaQ技术内幕——源码分析(七)
            
    
    博客分类: MetaQ MetaQ MetaQ技术内幕——源码分析(七)
            
    
    博客分类: MetaQ MetaQ 
  1. public interface RequestProcessor<T extends RequestCommand> { 
  2.     /**
  3.      * 处理请求
  4.      * 
  5.      * @param request请求命令
  6.      * @param conn 请求来源的连接
  7.      */ 
  8.     public void handleRequest(T request, Connection conn); 
  9.  
  10.  
  11.     /**
  12.      * 用户自定义的线程池,如果提供,那么请求的处理都将在该线程池内执行
  13.      * 
  14.      * @return
  15.      */ 
  16.     public ThreadPoolExecutor getExecutor(); 
public interface RequestProcessor<T extends RequestCommand> {    /**     * 处理请求     *      * @param request请求命令     * @param conn 请求来源的连接     */    public void handleRequest(T request, Connection conn);    /**     * 用户自定义的线程池,如果提供,那么请求的处理都将在该线程池内执行     *      * @return     */    public ThreadPoolExecutor getExecutor();}

所以,加上上一篇文章,我们可以得出MetaQ的大致网络处理流程图解如下:


MetaQ技术内幕——源码分析(七)
            
    
    博客分类: MetaQ MetaQ 

 

 

http://soledede.com/

 

大家可以加我个人微信号:scccdgf

 

 

或者关注soledede的微信公众号:soledede
微信公众号:
MetaQ技术内幕——源码分析(七)
            
    
    博客分类: MetaQ MetaQ 
相关标签: MetaQ