欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

vert实践五——Json?Protocol Buffer?FlatBuffers?

程序员文章站 2022-04-01 10:54:25
本节我们来比较一下Json、FlatBuffers、Protocol Buffer在vertx中使用时的占用资源情况比较。方式阐述启动一个消费端进程和一个生产端进程,其中生产端发布一个web服务,方便使用ab测试工具进行测试(其中消费端和生产端分布在两台机器)。ab调用生产端web -> 生产端通过bus传输消息 -> 消费端通过bus回复消息Json序列化生产端 JsonObject jsonObject = new JsonObject(); fo....

本节我们来比较一下Json、FlatBuffers、Protocol Buffer在vertx中使用时的占用资源情况比较。

方式阐述

启动一个消费端进程和一个生产端进程,其中生产端发布一个web服务,方便使用ab测试工具进行测试(其中消费端和生产端分布在两台机器)。

ab调用生产端web -> 生产端通过bus传输消息 -> 消费端通过bus回复消息

消费端机器规格 4核 2.6G HZ 16G
生产端机器规格 6核 3.2G HZ 16G

Json序列化

生产端

   JsonObject jsonObject = new JsonObject();
            for (int i=0; i<500; i ++){
                jsonObject.put("name_" + i,"wan_ke");
            }
            //通过eventbus发送请求
            eventBus.request("com.xiaoniu.bus", jsonObject, msg -> {
                        if (msg.succeeded()) {
                            if (msg.result() != null){
                                String data = ((JsonObject) msg.result().body()).encodePrettily();
                            }
                        } else {
                            System.err.println(msg.cause().getMessage());
                            msg.cause().printStackTrace();
                        }
                    }
            );

消费端

eventBus.consumer("com.xiaoniu.bus", msg -> {
                    System.out.println("收到消息");
                    if (msg != null && msg.body() instanceof JsonObject){
                String receive_data = ((JsonObject) msg.body()).encodePrettily();
            }
            JsonObject jsonObject = new JsonObject();
            for (int i=0;i<500;i++){
                jsonObject.put("name_"+ i,"wan_ke_receive");
            }

           msg.reply(jsonObject);
        });

生产端和消费端均启动后,进行三次,每次间隔2分钟。

第一次执行ab -n10000 -c10 http://192.168.2.112:7777/index(并发度10,1万次请求)

第二次执行ab -n20000 -c20 http://192.168.2.112:7777/index(并发度10,2万次请求)

第三次执行ab -n30000 -c20 http://192.168.2.112:7777/index(并发度20,3万次请求)

资源 cpu 内存
第一次 生产端(初值:12% 峰值:71% 末值:12%)消费端(初值:2% 峰值:16% 末值:4%) 生产端(初值:50M 峰值:294M 末值:131M )消费端(初值:50M 峰值:150M 末值:150M)
第二次 生产端(峰值:62% 末值:12%)消费端(峰值:10% 末值:8%) 生产端(峰值:188M 末值:160M)消费端(峰值:120M 末值:120M)
第三次 生产端(峰值:60% 末值:16%)消费端(峰值:10% 末值:3%) 生产端(峰值:380M 末值:100M )消费端(峰值:230M 末值:110M)

Proto序列化

生产端

 ProtoCommonMsg protoCommonMsg = new ProtoCommonMsg();
      for (int i =0; i<500 ; i ++){
        protoCommonMsg.put("name_" + i,"wan_ke");
      }
      //通过eventbus发送请求
      eventBus.request("com.xiaoniu.bus", protoCommonMsg,options, msg -> {
        if (msg.succeeded()) {
          if (msg.result() != null){
            ProtoCommonMsg proto = (ProtoCommonMsg) msg.result().body();
          }
        } else {
          System.err.println(msg.cause().getMessage());
          msg.cause().printStackTrace();
        }
      }

消费端

   eventBus.consumer("com.xiaoniu.bus", msg -> {
      System.out.println("收到消息");
      if (msg != null && msg.body() instanceof ProtoCommonMsg){
          ProtoCommonMsg receive_data = (ProtoCommonMsg) msg.body();
      }
      ProtoCommonMsg protoCommonMsg = new ProtoCommonMsg();
      for (int i =0; i<500 ; i ++){
        protoCommonMsg.put("name_" + i,"wan_ke_receive");
      }

   msg.reply(protoCommonMsg,options);
    });
资源 cpu 内存
第一次 生产端(初值:12% 峰值:45% 末值:14%)消费端(初值:2% 峰值:9% 末值:3%) 生产端(初值:50M 峰值:165M 末值:37M )消费端(初值:50M 峰值:46M 末值:58M)
第二次 生产端(峰值:53% 末值:11%)消费端(峰值:13% 末值:3%) 生产端(峰值:192M 末值:150M)消费端(峰值:70M 末值:48M)
第三次 生产端(峰值:65% 末值:14%)消费端(峰值:30% 末值:3%) 生产端(峰值:320M 末值:240M )消费端(峰值:70M 末值:50M)

FlatBuffers序列化

生产端

     FlatBuffersCommonMsg flatBuffersCommonMsg = new FlatBuffersCommonMsg();
            for (int i =0; i<500 ; i ++){
                flatBuffersCommonMsg.put("name_" + i,"wan_ke");
            }

            //通过eventbus发送请求
            eventBus.request("com.xiaoniu.bus", flatBuffersCommonMsg,options, msg -> {
                        if (msg.succeeded()) {
                            if (msg.result() != null){
                                FlatBuffersCommonMsg proto = (FlatBuffersCommonMsg) msg.result().body();
                            }
                        } else {
                            System.err.println(msg.cause().getMessage());
                            msg.cause().printStackTrace();
                        }
                    }
            );

消费端

   eventBus.consumer("com.xiaoniu.bus", msg -> {
            System.out.println("收到消息");
            if (msg != null && msg.body() instanceof FlatBuffersCommonMsg){
                FlatBuffersCommonMsg receive_data = (FlatBuffersCommonMsg) msg.body();
            }
            FlatBuffersCommonMsg protoCommonMsg = new FlatBuffersCommonMsg();
            for (int i =0; i<500 ; i ++){
                protoCommonMsg.put("name_" + i,"wan_ke_receive");
            }

            msg.reply(protoCommonMsg,options);
        });
资源 cpu 内存
第一次 生产端(初值:14% 峰值:71% 末值:17%)消费端(初值:3% 峰值:10% 末值:10%) 生产端(初值:50M 峰值:123M 末值:50M )消费端(初值:50M 峰值:70M 末值:70M)
第二次 生产端(峰值:60% 末值:17%)消费端(峰值:8% 末值:2%) 生产端(峰值:300M 末值:300M)消费端(峰值:70M 末值:60M)
第三次 生产端(峰值:66% 末值:17%)消费端(峰值:16% 末值:3%) 生产端(峰值:332M 末值:200M )消费端(峰值:60M 末值:58M)

总结

总体来看Proto和FlatBuffers在消费端的内存消耗表现比较优秀一点

Event Bus如何使用proto编码

自定义编码器

package org.example.MessageCode;

import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.MessageCodec;
import org.example.proto.ProtoCommonMsg;
import org.example.util.ProtostuffUtils;

/**
 * 自定义对象编解码器,两个类型可用于消息转换,即发送对象转换为接受需要的对象
 */
public class ProtoMessageCodec implements MessageCodec<ProtoCommonMsg, ProtoCommonMsg> {
    /**
     * 将消息实体封装到Buffer用于传输
     * 实现方式:使用对象流从对象中获取Byte数组然后追加到Buffer
     */
    @Override
    public void encodeToWire(Buffer buffer, ProtoCommonMsg protoCommonMsg) {
        buffer.appendBytes(ProtostuffUtils.serialize(protoCommonMsg));


    }
    //从Buffer中获取消息对象
    @Override
    public ProtoCommonMsg decodeFromWire(int pos, Buffer buffer) {
       return ProtostuffUtils.deserialize(buffer.getBytes(pos, buffer.length()),ProtoCommonMsg.class);
    }

    @Override
    public ProtoCommonMsg transform(ProtoCommonMsg protoMessage) {
        return null;
    }
    //消息转换

    @Override
    public String name() { return "myProtoCodec"; }
    //识别是否是用户自定义编解码器,通常为-1
    @Override
    public byte systemCodecID() { return -1; }

}

集群EventBus注册且只能注册一次编码解码器

  ProtoMessageCodec protoMessageCodec = new ProtoMessageCodec();
    EventBus eventBus = vertx.eventBus();
    eventBus.registerCodec(protoMessageCodec);

消费者回复需要带上编码解码器名称,告诉生产者这次使用哪个编码解码器

//新建带上编码解码器名称的options
DeliveryOptions options = new DeliveryOptions().setCodecName(protoMessageCodec.name());
 eventBus.consumer("com.xiaoniu.bus", msg -> {
      System.out.println("收到消息");
      if (msg != null && msg.body() instanceof ProtoCommonMsg){
          ProtoCommonMsg receive_data = (ProtoCommonMsg) msg.body();
      }
      ProtoCommonMsg protoCommonMsg = new ProtoCommonMsg();
      for (int i =0; i<500 ; i ++){
        protoCommonMsg.put("name_" + i,"wan_ke_receive");
      }
 //回复消息时带上options
   msg.reply(protoCommonMsg,options);
    });

生产者发送消息时带上编码解码器名称

 DeliveryOptions options = new DeliveryOptions().setCodecName(protoMessageCodec.name());
   eventBus.request("com.xiaoniu.bus", protoCommonMsg,options, msg -> {
        if (msg.succeeded()) {
          if (msg.result() != null){
            ProtoCommonMsg proto = (ProtoCommonMsg) msg.result().body();
          }
        } else {
          System.err.println(msg.cause().getMessage());
          msg.cause().printStackTrace();
        }
      }
      );

使用时,新建ProtoCommonMsg对象,插入键值对即可

package org.example.proto;

import java.util.HashMap;
import java.util.Map;

/**
 * @Author: Administrator
 * @Description:
 * @Date: 2020/7/22 8:32
 * @Version: 1.0
 */
public class ProtoCommonMsg   {

  private Map data = new HashMap();

  public void put(Object key,Object value){
      data.put(key,value);
  }
  public Object get(Object key){
      return  data.get(key);
  }
  public <T> T get(Object key, Class<T> clazz){
        return  (T) data.get(key);
  }
  public String toString(){
      return data.toString();
  }
}

Protocol Buffer编码解码工具,以供自定义编码解码器使用

package org.example.util;

import io.protostuff.LinkedBuffer;
import io.protostuff.ProtostuffIOUtil;
import io.protostuff.Schema;
import io.protostuff.runtime.RuntimeSchema;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

public class ProtostuffUtils {
    /**
     * 避免每次序列化都重新申请Buffer空间
     */
    private static LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
    /**
     * 缓存Schema
     */
    private static Map<Class<?>, Schema<?>> schemaCache = new ConcurrentHashMap<>();

    /**
     * 序列化方法,把指定对象序列化成字节数组
     *
     * @param obj
     * @param <T>
     * @return
     */
    @SuppressWarnings("unchecked")
    public static <T> byte[] serialize(T obj) {
        Class<T> clazz = (Class<T>) obj.getClass();
        Schema<T> schema = getSchema(clazz);
        byte[] data;
        try {
            data = ProtostuffIOUtil.toByteArray(obj, schema, buffer);
        } finally {
            buffer.clear();
        }

        return data;
    }

    /**
     * 反序列化方法,将字节数组反序列化成指定Class类型
     *
     * @param data
     * @param clazz
     * @param <T>
     * @return
     */
    public static <T> T deserialize(byte[] data, Class<T> clazz) {
        Schema<T> schema = getSchema(clazz);
        T obj = schema.newMessage();
        ProtostuffIOUtil.mergeFrom(data, obj, schema);
        return obj;
    }

    @SuppressWarnings("unchecked")
    private static <T> Schema<T> getSchema(Class<T> clazz) {
        Schema<T> schema = (Schema<T>) schemaCache.get(clazz);
        if (Objects.isNull(schema)) {
            //这个schema通过RuntimeSchema进行懒创建并缓存
            //所以可以一直调用RuntimeSchema.getSchema(),这个方法是线程安全的
            schema = RuntimeSchema.getSchema(clazz);
            if (Objects.nonNull(schema)) {
                schemaCache.put(clazz, schema);
            }
        }

        return schema;
    }
}

本文地址:https://blog.csdn.net/xk4848123/article/details/107686232

相关标签: vertx