欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

高性能序列化协议protobuf初探

程序员文章站 2022-06-17 09:07:13
...

在网络传输过程中,数据或对象在发送端进行序列化serialize,转成二进制;然后接受端需要对数据或对象进行反序列化unserialize,变成可用的类型。

首先来看看一般我们是怎么对一个对象进行序列化的。

java序列化

定义一个简单的pojo User

import java.io.Serializable;

/**
 * Created by 蚂蚁的宝藏 on 2018/1/27.
 */
public class User implements Serializable {
    private int id;
    private int age;
    private String userName;
    private String address;

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }

    public String getAddress() {
        return address;
    }

    public void setAddress(String address) {
        this.address = address;
    }

    @Override
    public String toString() {
        return "User{" +
                "id=" + id +
                ", age=" + age +
                ", userName='" + userName + '\'' +
                ", address='" + address + '\'' +
                '}';
    }
}

用ObjectInputStream 和ObjectOutputStream进行简单的序列化和反序列化

package protobuf.base.serial;

import protobuf.pojo.User;

import java.io.*;
import java.util.Arrays;

/**
 * Created by 蚂蚁的宝藏 on 2018/2/1.
 */
public class SerialUser {
    public static void main(String[] args) throws IOException, ClassNotFoundException {
        User user = new User();
        user.setUserName("蚂蚁的宝藏");
        user.setAge(22);
        user.setId(1);
        user.setAddress("陕西");
        byte[] bytes = serial(user);
        System.out.println(Arrays.toString(bytes));
        System.out.println((User)unserial(bytes));
    }

    public static byte[] serial(Object obj) throws IOException {
        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
        ObjectOutputStream objectOutputStream = new ObjectOutputStream(outputStream);
        objectOutputStream.writeObject(obj);
        return outputStream.toByteArray();
    }

    public static Object unserial(byte[] bytes) throws IOException, ClassNotFoundException {
        InputStream inputStream = new ByteArrayInputStream(bytes);
        ObjectInputStream objectInputStream = new ObjectInputStream(inputStream);
        return objectInputStream.readObject();
    }
}

run一下

高性能序列化协议protobuf初探

可以看到这个简单的pojo序列化后的byte内容就这么多,在网络传输过程中,是很占带宽的,传输速度大大降低。接着看看protobuf序列化之后的byte内容,做个对比,就知道多么强悍了。

protobuf 序列化

一般来说,使用protobuf前需要写.proto文件,然后用proto命令生成对应的java文件。
那我们先来编写一个User.proto文件

option java_outer_classname = "UserProto";

message User{
    required int32 id = 1;
    required int32 age = 2;
    required string username = 3;
    required string address = 4;
}

required表示这个变量必须赋值,后面的就是protobuf的变量类型,和java类型稍有区别。后面的1,2,3,4是编解码顺序,这也是protobuf高效的一个重要原因,后面介绍。
关于具体语法,可以参阅官方文档。

然后在cmd下
高性能序列化协议protobuf初探

回车,会发现

高性能序列化协议protobuf初探

protobuf帮我们生成了一个接近900行代码的一个UserProto类,后面分析类里面的代码,先测试一下protobuf的序列化效果。

package protobuf.proto;

import com.google.protobuf.InvalidProtocolBufferException;

import java.util.Arrays;

/**
 * Created by 蚂蚁的宝藏 on 2018/2/1.
 */
public class TestProtobuf {
    public static void main(String[] args) throws InvalidProtocolBufferException {
        byte[] bytes = serialize();
        System.out.println(Arrays.toString(bytes));
        UserProto.User user = disSerialize(bytes);
        System.out.println(user.getId());
        System.out.println(user.getAge());
        System.out.println(user.getAddress());
        System.out.println(user.getUsername());
    }

    public static byte[] serialize(){
        UserProto.User.Builder builder = UserProto.User.newBuilder();
        builder.setId(1);
        builder.setUsername("蚂蚁的宝藏");
        builder.setAge(22);
        builder.setAddress("陕西");
        UserProto.User user = builder.build();
        return user.toByteArray();
    }

    public static UserProto.User disSerialize(byte[] bytes) throws InvalidProtocolBufferException {
        return UserProto.User.parseFrom(bytes);
    }
}

run一下
高性能序列化协议protobuf初探
是不是想喊一句卧槽!
来 对比一下两个byte文件,直观的感受一下。

jdk_serial protobuf_serial
[-84, -19, 0, 5, 115, 114, 0, 18, 112, 114, 111, 116, 111, 98, 117, 102, 46, 112, 111, 106, 111, 46, 85, 115, 101, 114, 99, -67, -24, -113, -112, 94, -22, 127, 2, 0, 4, 73, 0, 3, 97, 103, 101, 73, 0, 2, 105, 100, 76, 0, 7, 97, 100, 100, 114, 101, 115, 115, 116, 0, 18, 76, 106, 97, 118, 97, 47, 108, 97, 110, 103, 47, 83, 116, 114, 105, 110, 103, 59, 76, 0, 8, 117, 115, 101, 114, 78, 97, 109, 101, 113, 0, 126, 0, 1, 120, 112, 0, 0, 0, 22, 0, 0, 0, 1, 116, 0, 6, -23, -103, -107, -24, -91, -65, 116, 0, 15, -24, -102, -126, -24, -102, -127, -25, -102, -124, -27, -82, -99, -24, -105, -113] [8, 1, 16, 22, 26, 15, -24, -102, -126, -24, -102, -127, -25, -102, -124, -27, -82, -99, -24, -105, -113, 34, 6, -23, -103, -107, -24, -91, -65]

就这么一个简单的pojo,带宽就少了7倍之多!
那么为什么protobuf序列化后的码流这么少呢?

代码分析

jdk进行序列化的时候要记录对象所在的类的信息,比如元数据,数据类型等等,这样反序列化才能进行映射。而protobuf会根据proto文件中字段的tag顺序,用

public void writeTo(com.google.protobuf.CodedOutputStream output)
                        throws java.io.IOException {
      getSerializedSize();
      if (((bitField0_ & 0x00000001) == 0x00000001)) {
        output.writeInt32(1, id_);
      }
      if (((bitField0_ & 0x00000002) == 0x00000002)) {
        output.writeInt32(2, age_);
      }
      if (((bitField0_ & 0x00000004) == 0x00000004)) {
        output.writeBytes(3, getUsernameBytes());
      }
      if (((bitField0_ & 0x00000008) == 0x00000008)) {
        output.writeBytes(4, getAddressBytes());
      }
      getUnknownFields().writeTo(output);
    }

可以看到,protobuf会安装我们定义的顺序来一个一个写入到OutPutStream中。看看int类型怎么写的

        public final void writeInt32(int fieldNumber, int value) throws IOException {
            this.writeTag(fieldNumber, 0);
            this.writeInt32NoTag(value);
        }
        public final void writeInt32NoTag(int value) throws IOException {
            if(value >= 0) {
                this.writeUInt32NoTag(value);
            } else {
                this.writeUInt64NoTag((long)value);
            }
        }
        public void writeUInt32NoTag(int value) throws IOException {
            if(this.position > this.oneVarintLimit) {
                while(this.position < this.limit) {
                    if((value & -128) == 0) {
                        UnsafeUtil.putByte((long)(this.position++), (byte)value);
                        return;
                    }

                    UnsafeUtil.putByte((long)(this.position++), (byte)(value & 127 | 128));
                    value >>>= 7;
                }

                throw new CodedOutputStream.OutOfSpaceException(String.format("Pos: %d, limit: %d, len: %d", new Object[]{Long.valueOf(this.position), Long.valueOf(this.limit), Integer.valueOf(1)}));
            } else {
                while((value & -128) != 0) {
                    UnsafeUtil.putByte((long)(this.position++), (byte)(value & 127 | 128));
                    value >>>= 7;
                }

                UnsafeUtil.putByte((long)(this.position++), (byte)value);
            }
        }

如果是个正数,直接采用可变长编码,负数,会用64位可变长编码。
关于position,oneVarintLimit,limit this.initialPosition = this.address + (long)buffer.position();this.limit = this.address + (long)buffer.limit();this.oneVarintLimit = this.limit - 10L;this.position = this.initialPosition;
CodeOutputStream是这么定义的,如果我们传进来的值是22的话,position=0;limit为1,oneVarintLimit就是1-10=-9;

if(0>-9)成立,进入while循环,-128的二进制就是128的源码取反加1,即10000000,所以value&-128,就是在取value第一个字节的最高位,如果为0,说明value是小于128的,用一个字节可以存储,直接putByte返回。

否则,127二进制为01111111,value&127就是取得一个字节的1-7位,128二进制为10000000,再和128相或,就是把第八位设1,然后装进该position,position++,value值右移7位。继续循环。

可以看到在protobuf中真正存储数据的是1-7位,第8位是个判断位,如果第8位为1,表示后面还有数据未处理,当前字节不能完全存储数据。那么如果4个字节只有28位数据,所以会用1~5个字节来存储int类型数据。long类型或者负数会用1-9个字节,所以对于负数来说,protobuf的编码效率并不高。

netty搭配protobuf

那么netty作为一个卓越的通信框架,自然对protobuf做了友好集成,使用也非常方便,只需要在channelPipeLine中添加我们的protobufDeCoder和protobufEnCoder就行。

        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            @Override
            public ChannelPipeline getPipeline() throws Exception {
                ChannelPipeline pipeline = Channels.pipeline();
                pipeline.addLast("1",new ProtobufVarint32FrameDecoder());
                pipeline.addLast("2",new ProtobufDecoder(UserProto.User.getDefaultInstance()));
                pipeline.addLast("3",new ProtobufVarint32LengthFieldPrepender());
                pipeline.addLast("4",new ProtobufEncoder());
                pipeline.addLast("5",new MyServerMessageHandler());
                return pipeline;
            }
        });

版权声明:本文由蚂蚁的宝藏创作和发表,采用署名(BY)-非商业性使用(NC)-相同方式共享(SA)国际许可协议进行许可,转载请注明作者及出处,本文作者为蚂蚁的宝藏,本文标题为Netty线程模型及源码浅析,本文链接为http://mayibz.me/2018/01/28/Netty线程模型及源码浅析/.