thrift-TBinaryProtocol
程序员文章站
2022-06-17 11:54:40
...
TBinaryProtocol 工厂类。
/**
 * Produces {@link TBinaryProtocol} instances that all share the same strictness flags and
 * read-length limits captured at construction time.
 */
public static class Factory implements TProtocolFactory {
  protected long stringLengthLimit_;
  protected long containerLengthLimit_;
  protected boolean strictRead_;
  protected boolean strictWrite_;

  /** Lenient reads, versioned (strict) writes, and no string/container length limits. */
  public Factory() {
    this(false, true, NO_LENGTH_LIMIT, NO_LENGTH_LIMIT);
  }

  /** Custom strictness with no string/container length limits. */
  public Factory(boolean strictRead, boolean strictWrite) {
    this(strictRead, strictWrite, NO_LENGTH_LIMIT, NO_LENGTH_LIMIT);
  }

  /**
   * Fully specified factory.
   *
   * @param strictRead reject incoming messages that carry no version header
   * @param strictWrite write a version header on outgoing messages
   * @param stringLengthLimit max bytes accepted for a string/binary, or NO_LENGTH_LIMIT
   * @param containerLengthLimit max elements accepted for a map/set/list, or NO_LENGTH_LIMIT
   */
  public Factory(boolean strictRead, boolean strictWrite, long stringLengthLimit, long containerLengthLimit) {
    strictRead_ = strictRead;
    strictWrite_ = strictWrite;
    stringLengthLimit_ = stringLengthLimit;
    containerLengthLimit_ = containerLengthLimit;
  }

  /** Wraps {@code trans} in a new binary protocol configured with this factory's settings. */
  public TProtocol getProtocol(TTransport trans) {
    TBinaryProtocol protocol =
        new TBinaryProtocol(trans, stringLengthLimit_, containerLengthLimit_, strictRead_, strictWrite_);
    return protocol;
  }
}
构造方法:
/**
 * Creates a protocol over {@code trans} with lenient reads, versioned (strict) writes and no
 * length limits.
 */
public TBinaryProtocol(TTransport trans) {
  this(trans, NO_LENGTH_LIMIT, NO_LENGTH_LIMIT, false, true);
}

/** Creates a protocol over {@code trans} with the given strictness and no length limits. */
public TBinaryProtocol(TTransport trans, boolean strictRead, boolean strictWrite) {
  this(trans, NO_LENGTH_LIMIT, NO_LENGTH_LIMIT, strictRead, strictWrite);
}

/**
 * Canonical constructor.
 *
 * @param trans underlying transport
 * @param stringLengthLimit max bytes accepted for a string/binary, or NO_LENGTH_LIMIT
 * @param containerLengthLimit max elements accepted for a map/set/list, or NO_LENGTH_LIMIT
 * @param strictRead reject incoming messages that carry no version header
 * @param strictWrite write a version header on outgoing messages
 */
public TBinaryProtocol(TTransport trans, long stringLengthLimit, long containerLengthLimit, boolean strictRead, boolean strictWrite) {
  super(trans);
  strictRead_ = strictRead;
  strictWrite_ = strictWrite;
  stringLengthLimit_ = stringLengthLimit;
  containerLengthLimit_ = containerLengthLimit;
}
/**
 * Thrift's binary wire protocol.
 *
 * <p>Integers are written big-endian in fixed widths (1, 2, 4 or 8 bytes), doubles as their
 * 64-bit IEEE-754 bit pattern, and strings/binaries as a 4-byte length followed by the raw bytes.
 * A field header is a 1-byte type followed by a 2-byte field id.
 *
 * <p>The encode/decode scratch arrays below are per-instance fields reused on every call, so one
 * instance must not be used from several threads at the same time.
 */
public class TBinaryProtocol extends TProtocol {
  private static final TStruct ANONYMOUS_STRUCT = new TStruct();

  /** Sentinel value for the length limits meaning "no limit". */
  private static final long NO_LENGTH_LIMIT = -1;

  /** Selects the version bits (the high 16 bits) of a strict message header word. */
  protected static final int VERSION_MASK = 0xffff0000;

  /**
   * Version tag written by strict writers. Its top bit is set, so the header word reads back as a
   * negative int; that is how {@link #readMessageBegin()} distinguishes it from an old-style header,
   * which starts with a non-negative name length.
   */
  protected static final int VERSION_1 = 0x80010000;

  /**
   * The maximum number of bytes to read from the transport for
   * variable-length fields (such as strings or binary) or {@link #NO_LENGTH_LIMIT} for
   * unlimited.
   */
  private final long stringLengthLimit_;

  /**
   * The maximum number of elements to read from the network for
   * containers (maps, sets, lists), or {@link #NO_LENGTH_LIMIT} for unlimited.
   */
  private final long containerLengthLimit_;

  /** When true, messages without a version header are rejected on read. */
  protected boolean strictRead_;

  /** When true, a version header is written at the start of each message. */
  protected boolean strictWrite_;

  /**
   * Writes a message header.
   *
   * <p>Strict: {@code VERSION_1 | type} as an i32, then the name, then the seqid.
   * Old style: the name, then the type as one byte, then the seqid.
   */
  @Override
  public void writeMessageBegin(TMessage message) throws TException {
    if (strictWrite_) {
      int version = VERSION_1 | message.type;
      writeI32(version);
      writeString(message.name);
      writeI32(message.seqid);
    } else {
      writeString(message.name);
      writeByte(message.type);
      writeI32(message.seqid);
    }
  }

  /**
   * Reads a message header in either the strict (versioned) or the old format.
   *
   * @throws TProtocolException BAD_VERSION if the version tag is unknown, or if the header is
   *     unversioned while {@code strictRead_} is set; NEGATIVE_SIZE / SIZE_LIMIT if an old-style
   *     name length is invalid
   */
  @Override
  public TMessage readMessageBegin() throws TException {
    int size = readI32();
    if (size < 0) {
      // Strict header: high bit set => negative in two's complement.
      int version = size & VERSION_MASK;
      if (version != VERSION_1) {
        throw new TProtocolException(TProtocolException.BAD_VERSION, "Bad version in readMessageBegin");
      }
      // Low byte carries the message type.
      return new TMessage(readString(), (byte) (size & 0x000000ff), readI32());
    }
    // Old-style header: the first word is the length of the message name.
    if (strictRead_) {
      throw new TProtocolException(TProtocolException.BAD_VERSION, "Missing version in readMessageBegin, old client?");
    }
    // readStringBody validates `size` against the string length limit.
    return new TMessage(readStringBody(size), readByte(), readI32());
  }

  /** Writes the 1-byte field type followed by the 2-byte field id. */
  @Override
  public void writeFieldBegin(TField field) throws TException {
    writeByte(field.type);
    writeI16(field.id);
  }

  /**
   * Reads the 1-byte field type and, unless it is {@code TType.STOP}, the 2-byte field id.
   * Field names are not on the wire, so the returned field's name is always empty.
   */
  @Override
  public TField readFieldBegin() throws TException {
    byte type = readByte();
    short id = type == TType.STOP ? 0 : readI16();
    return new TField("", type, id);
  }

  /** Writes key type (1 byte), value type (1 byte) and element count (i32). */
  @Override
  public void writeMapBegin(TMap map) throws TException {
    writeByte(map.keyType);
    writeByte(map.valueType);
    writeI32(map.size);
  }

  /** Reads a map header and validates its element count against the container limit. */
  @Override
  public TMap readMapBegin() throws TException {
    TMap map = new TMap(readByte(), readByte(), readI32());
    checkContainerReadLength(map.size);
    return map;
  }

  /** Writes element type (1 byte) and element count (i32). */
  @Override
  public void writeListBegin(TList list) throws TException {
    writeByte(list.elemType);
    writeI32(list.size);
  }

  /** Reads a list header and validates its element count against the container limit. */
  @Override
  public TList readListBegin() throws TException {
    TList list = new TList(readByte(), readI32());
    checkContainerReadLength(list.size);
    return list;
  }

  /** Writes element type (1 byte) and element count (i32). */
  @Override
  public void writeSetBegin(TSet set) throws TException {
    writeByte(set.elemType);
    writeI32(set.size);
  }

  /** Reads a set header and validates its element count against the container limit. */
  @Override
  public TSet readSetBegin() throws TException {
    TSet set = new TSet(readByte(), readI32());
    checkContainerReadLength(set.size);
    return set;
  }

  /** Writes a boolean as a single byte: 1 for true, 0 for false. */
  @Override
  public void writeBool(boolean b) throws TException {
    writeByte(b ? (byte) 1 : (byte) 0);
  }

  /** Reads one byte; only the value 1 is treated as true. */
  @Override
  public boolean readBool() throws TException {
    return (readByte() == 1);
  }

  private byte[] bout = new byte[1];

  @Override
  public void writeByte(byte b) throws TException {
    bout[0] = b;
    trans_.write(bout, 0, 1);
  }

  private byte[] bin = new byte[1];

  /** Reads one byte, directly from the transport's buffer when it has one available. */
  @Override
  public byte readByte() throws TException {
    if (trans_.getBytesRemainingInBuffer() >= 1) {
      byte b = trans_.getBuffer()[trans_.getBufferPosition()];
      trans_.consumeBuffer(1);
      return b;
    }
    readAll(bin, 0, 1);
    return bin[0];
  }

  private byte[] i16out = new byte[2];

  /** Writes a 16-bit integer, big-endian. */
  @Override
  public void writeI16(short i16) throws TException {
    i16out[0] = (byte) (0xff & (i16 >> 8));
    i16out[1] = (byte) (0xff & (i16));
    trans_.write(i16out, 0, 2);
  }

  private byte[] i16rd = new byte[2];

  /** Reads a big-endian 16-bit integer, from the transport buffer when possible. */
  @Override
  public short readI16() throws TException {
    byte[] buf = i16rd;
    int off = 0;
    if (trans_.getBytesRemainingInBuffer() >= 2) {
      buf = trans_.getBuffer();
      off = trans_.getBufferPosition();
      trans_.consumeBuffer(2);
    } else {
      readAll(i16rd, 0, 2);
    }
    return (short) (((buf[off] & 0xff) << 8) | ((buf[off + 1] & 0xff)));
  }

  private byte[] i32out = new byte[4];

  /** Writes a 32-bit integer, big-endian. */
  @Override
  public void writeI32(int i32) throws TException {
    i32out[0] = (byte) (0xff & (i32 >> 24));
    i32out[1] = (byte) (0xff & (i32 >> 16));
    i32out[2] = (byte) (0xff & (i32 >> 8));
    i32out[3] = (byte) (0xff & (i32));
    trans_.write(i32out, 0, 4);
  }

  private byte[] i32rd = new byte[4];

  /** Reads a big-endian 32-bit integer, from the transport buffer when possible. */
  @Override
  public int readI32() throws TException {
    byte[] buf = i32rd;
    int off = 0;
    if (trans_.getBytesRemainingInBuffer() >= 4) {
      buf = trans_.getBuffer();
      off = trans_.getBufferPosition();
      trans_.consumeBuffer(4);
    } else {
      readAll(i32rd, 0, 4);
    }
    return ((buf[off] & 0xff) << 24)
        | ((buf[off + 1] & 0xff) << 16)
        | ((buf[off + 2] & 0xff) << 8)
        | ((buf[off + 3] & 0xff));
  }

  private byte[] i64out = new byte[8];

  /** Writes a 64-bit integer, big-endian. */
  @Override
  public void writeI64(long i64) throws TException {
    i64out[0] = (byte) (0xff & (i64 >> 56));
    i64out[1] = (byte) (0xff & (i64 >> 48));
    i64out[2] = (byte) (0xff & (i64 >> 40));
    i64out[3] = (byte) (0xff & (i64 >> 32));
    i64out[4] = (byte) (0xff & (i64 >> 24));
    i64out[5] = (byte) (0xff & (i64 >> 16));
    i64out[6] = (byte) (0xff & (i64 >> 8));
    i64out[7] = (byte) (0xff & (i64));
    trans_.write(i64out, 0, 8);
  }

  private byte[] i64rd = new byte[8];

  /** Reads a big-endian 64-bit integer, from the transport buffer when possible. */
  @Override
  public long readI64() throws TException {
    byte[] buf = i64rd;
    int off = 0;
    if (trans_.getBytesRemainingInBuffer() >= 8) {
      buf = trans_.getBuffer();
      off = trans_.getBufferPosition();
      trans_.consumeBuffer(8);
    } else {
      readAll(i64rd, 0, 8);
    }
    return ((long) (buf[off] & 0xff) << 56)
        | ((long) (buf[off + 1] & 0xff) << 48)
        | ((long) (buf[off + 2] & 0xff) << 40)
        | ((long) (buf[off + 3] & 0xff) << 32)
        | ((long) (buf[off + 4] & 0xff) << 24)
        | ((long) (buf[off + 5] & 0xff) << 16)
        | ((long) (buf[off + 6] & 0xff) << 8)
        | ((long) (buf[off + 7] & 0xff));
  }

  /** Writes a double as its raw IEEE-754 bits via {@link #writeI64(long)}. */
  @Override
  public void writeDouble(double dub) throws TException {
    writeI64(Double.doubleToLongBits(dub));
  }

  /** Reads a double written by {@link #writeDouble(double)}. */
  @Override
  public double readDouble() throws TException {
    return Double.longBitsToDouble(readI64());
  }

  /** Writes a 4-byte byte length followed by the string's UTF-8 encoding. */
  @Override
  public void writeString(String str) throws TException {
    try {
      byte[] dat = str.getBytes("UTF-8");
      writeI32(dat.length);
      trans_.write(dat, 0, dat.length);
    } catch (UnsupportedEncodingException uex) {
      throw new TException("JVM DOES NOT SUPPORT UTF-8", uex);
    }
  }

  /**
   * Reads a length-prefixed UTF-8 string.
   *
   * @throws TProtocolException NEGATIVE_SIZE or SIZE_LIMIT if the length prefix is invalid
   */
  @Override
  public String readString() throws TException {
    int size = readI32();
    // Must run before the fast path: a negative size would otherwise satisfy the `>=` test below.
    checkStringReadLength(size);
    if (trans_.getBytesRemainingInBuffer() >= size) {
      try {
        String s = new String(trans_.getBuffer(), trans_.getBufferPosition(), size, "UTF-8");
        trans_.consumeBuffer(size);
        return s;
      } catch (UnsupportedEncodingException e) {
        throw new TException("JVM DOES NOT SUPPORT UTF-8", e);
      }
    }
    return readStringBody(size);
  }

  /**
   * Reads exactly {@code size} bytes and decodes them as UTF-8.
   *
   * @throws TProtocolException NEGATIVE_SIZE or SIZE_LIMIT if {@code size} is invalid
   */
  public String readStringBody(int size) throws TException {
    // Also reached from the old-style readMessageBegin path, where `size` comes straight off the wire.
    checkStringReadLength(size);
    try {
      byte[] buf = new byte[size];
      trans_.readAll(buf, 0, size);
      return new String(buf, "UTF-8");
    } catch (UnsupportedEncodingException uex) {
      throw new TException("JVM DOES NOT SUPPORT UTF-8", uex);
    }
  }

  /**
   * Writes the buffer's remaining bytes (position to limit) with a 4-byte length prefix. The
   * buffer's position is not modified.
   */
  @Override
  public void writeBinary(ByteBuffer bin) throws TException {
    int length = bin.limit() - bin.position();
    writeI32(length);
    if (bin.hasArray()) {
      trans_.write(bin.array(), bin.position() + bin.arrayOffset(), length);
    } else {
      // Direct and read-only buffers expose no accessible array; copy via a duplicate so the
      // caller's position is left untouched.
      byte[] copy = new byte[length];
      bin.duplicate().get(copy);
      trans_.write(copy, 0, length);
    }
  }

  /**
   * Reads a length-prefixed binary value.
   *
   * <p>When the transport has the bytes buffered, the result wraps the transport's buffer instead
   * of copying. NOTE(review): presumably that region can be reused by the transport later, so
   * callers that keep the value should copy it — confirm against the transport in use.
   *
   * @throws TProtocolException NEGATIVE_SIZE or SIZE_LIMIT if the length prefix is invalid
   */
  @Override
  public ByteBuffer readBinary() throws TException {
    int size = readI32();
    // Same validation as strings; negative sizes must be rejected before any buffer access.
    checkStringReadLength(size);
    if (trans_.getBytesRemainingInBuffer() >= size) {
      ByteBuffer bb = ByteBuffer.wrap(trans_.getBuffer(), trans_.getBufferPosition(), size);
      trans_.consumeBuffer(size);
      return bb;
    }
    byte[] buf = new byte[size];
    trans_.readAll(buf, 0, size);
    return ByteBuffer.wrap(buf);
  }

  /**
   * Validates a string/binary length read from the wire.
   *
   * @throws TProtocolException NEGATIVE_SIZE if {@code length < 0}; SIZE_LIMIT if a limit is
   *     configured and {@code length} exceeds it
   */
  private void checkStringReadLength(int length) throws TProtocolException {
    if (length < 0) {
      throw new TProtocolException(TProtocolException.NEGATIVE_SIZE, "Negative length: " + length);
    }
    if (stringLengthLimit_ != NO_LENGTH_LIMIT && length > stringLengthLimit_) {
      throw new TProtocolException(TProtocolException.SIZE_LIMIT, "Length exceeded max allowed: " + length);
    }
  }

  /**
   * Validates a map/set/list element count read from the wire.
   *
   * @throws TProtocolException NEGATIVE_SIZE if {@code length < 0}; SIZE_LIMIT if a limit is
   *     configured and {@code length} exceeds it
   */
  private void checkContainerReadLength(int length) throws TProtocolException {
    if (length < 0) {
      throw new TProtocolException(TProtocolException.NEGATIVE_SIZE, "Negative length: " + length);
    }
    if (containerLengthLimit_ != NO_LENGTH_LIMIT && length > containerLengthLimit_) {
      throw new TProtocolException(TProtocolException.SIZE_LIMIT, "Length exceeded max allowed: " + length);
    }
  }

  /** Reads exactly {@code len} bytes from the transport into {@code buf} at {@code off}. */
  private int readAll(byte[] buf, int off, int len) throws TException {
    return trans_.readAll(buf, off, len);
  }
}
所谓严格写(strictWrite),是指写消息头时是否写入版本号;所谓严格读(strictRead),是指读消息头时是否要求必须带有版本号——非严格读对带版本号和不带版本号的两种消息头都能兼容。
针对 thrift 0.9.3
版本号 VERSION_1 为 0x80010000(10000000000000010000000000000000)
其实版本号只占用了高两个字节,最低的一个字节表示消息类型 message.type [CALL(1), REPLY(2), EXCEPTION(3), ONEWAY(4)],中间的一个字节未使用。
故写入的是(VERSION_1 | message.type)。
读取时将其与 VERSION_MASK(0xffff0000,即 11111111111111110000000000000000)做与操作,得到真正的版本号;与 0x000000ff 做与操作,得到 message.type。
消息头最高位的 1 用来区分新/旧版本的协议。
计算机中整数以补码形式存储,最高位为 1 的 32 位整数是负数,因此 readI32 读到负数时说明是带版本号的新格式;而旧格式开头是消息名的长度,必然是非负数。这样就可以通过 readI32 的结果判断新旧编码。
测试
/**
 * Manual round-trip check of the generated {@code Pair} struct through {@link TBinaryProtocol}.
 *
 * <p>NOTE(review): {@link #read()} consumes the {@code pair.txt} produced by {@link #write()}, and
 * JUnit does not promise a method execution order, so run {@code write} before {@code read}.
 */
public class T1 {
  /** Serializes {@code Pair(key = "name", value = "lgh")} into {@code pair.txt}. */
  @Test
  public void write() throws IOException, TException {
    Pair pair = new Pair();
    pair.setKey("name").setValue("lgh");
    FileOutputStream fos = new FileOutputStream(new File("pair.txt"));
    try {
      pair.write(new TBinaryProtocol(new TIOStreamTransport(fos)));
    } finally {
      // Release the file handle even when serialization throws.
      fos.close();
    }
  }

  /** Deserializes {@code pair.txt} and prints both fields. */
  @Test
  public void read() throws TException, IOException {
    Pair pair = new Pair();
    FileInputStream fis = new FileInputStream(new File("pair.txt"));
    try {
      pair.read(new TBinaryProtocol(new TIOStreamTransport(fis)));
    } finally {
      // Release the file handle even when deserialization throws.
      fis.close();
    }
    System.out.println("key => " + pair.getKey());
    System.out.println("value => " + pair.getValue());
  }
}
namespace java com.meituan.model

/** A key/value pair of strings. */
struct Pair {
  /** Field id 1 (written on the wire; the name "key" is not). */
  1: required string key,
  /** Field id 2 (written on the wire; the name "value" is not). */
  2: required string value,
}
// Pair#write: delegate serialization to the scheme registered for the protocol's scheme type.
public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
  org.apache.thrift.scheme.SchemeFactory factory = schemes.get(oprot.getScheme());
  factory.getScheme().write(oprot, this);
}
// Pair.PairStandardScheme#write: field-by-field serialization of a Pair.
public void write(org.apache.thrift.protocol.TProtocol oprot, Pair struct) throws org.apache.thrift.TException {
  struct.validate();
  oprot.writeStructBegin(STRUCT_DESC); // no-op for TBinaryProtocol

  String key = struct.key;
  if (key != null) {
    // With TBinaryProtocol: 1-byte field type + 2-byte field id (key is field 1).
    oprot.writeFieldBegin(KEY_FIELD_DESC);
    // With TBinaryProtocol: 4-byte length followed by the UTF-8 bytes of the key.
    oprot.writeString(key);
    oprot.writeFieldEnd(); // no-op for TBinaryProtocol
  }

  String value = struct.value;
  if (value != null) {
    // Same layout as above; value is field 2.
    oprot.writeFieldBegin(VALUE_FIELD_DESC);
    oprot.writeString(value);
    oprot.writeFieldEnd();
  }

  oprot.writeFieldStop(); // with TBinaryProtocol: a single 0 byte marking the end of the fields
  oprot.writeStructEnd(); // no-op for TBinaryProtocol
}
参数类型对应的值
/** Wire type codes written as the 1-byte type in field, map, set and list headers. */
public final class TType {
  /** Terminates a struct's field list; no field id follows it. */
  public static final byte STOP = 0x00;
  /** Void (no value). */
  public static final byte VOID = 0x01;
  /** Boolean. */
  public static final byte BOOL = 0x02;
  /** Signed 8-bit integer. */
  public static final byte BYTE = 0x03;
  /** 64-bit IEEE-754 double. */
  public static final byte DOUBLE = 0x04;
  /** Signed 16-bit integer. */
  public static final byte I16 = 0x06;
  /** Signed 32-bit integer. */
  public static final byte I32 = 0x08;
  /** Signed 64-bit integer. */
  public static final byte I64 = 0x0A;
  /** Length-prefixed string. */
  public static final byte STRING = 0x0B;
  /** Nested struct. */
  public static final byte STRUCT = 0x0C;
  /** Map container. */
  public static final byte MAP = 0x0D;
  /** Set container. */
  public static final byte SET = 0x0E;
  /** List container. */
  public static final byte LIST = 0x0F;
  /** Enum. */
  public static final byte ENUM = 0x10;
}
数据的写入规则:
→ hexdump -C pair.txt
00000000 0b 00 01 00 00 00 04 6e 61 6d 65 0b 00 02 00 00 |.......name.....|
00000010 00 03 6c 67 68 00 |..lgh.|
00000016
可以看到,序列化结果中只包含字段类型、字段编号和字段值,并不包含字段名称(key、value),因此 thrift 对字段名称是无感知的。
参考:
https://juejin.im/post/5b290e58518825748c1c6bfc
https://blog.csdn.net/ITer_ZC/article/details/39497863
https://andrewpqc.github.io/2019/02/24/thrift/
上一篇: 用PHP读取超大文件的实例代码_PHP
下一篇: 在羊驼中整合友言评论、留言板