vert实践五——Json?Protocol Buffer?FlatBuffers?
本节我们来比较一下Json、FlatBuffers、Protocol Buffer在vertx中使用时的占用资源情况比较。
方式阐述
启动一个消费端进程和一个生产端进程,其中生产端发布一个web服务,方便使用ab测试工具进行测试(其中消费端和生产端分布在两台机器)。
ab调用生产端web -> 生产端通过bus传输消息 -> 消费端通过bus回复消息
消费端机器规格 4核 2.6G HZ 16G
生产端机器规格 6核 3.2G HZ 16G
Json序列化
生产端
JsonObject jsonObject = new JsonObject();
for (int i=0; i<500; i ++){
jsonObject.put("name_" + i,"wan_ke");
}
//通过eventbus发送请求
eventBus.request("com.xiaoniu.bus", jsonObject, msg -> {
if (msg.succeeded()) {
if (msg.result() != null){
String data = ((JsonObject) msg.result().body()).encodePrettily();
}
} else {
System.err.println(msg.cause().getMessage());
msg.cause().printStackTrace();
}
}
);
消费端
eventBus.consumer("com.xiaoniu.bus", msg -> {
System.out.println("收到消息");
if (msg != null && msg.body() instanceof JsonObject){
String receive_data = ((JsonObject) msg.body()).encodePrettily();
}
JsonObject jsonObject = new JsonObject();
for (int i=0;i<500;i++){
jsonObject.put("name_"+ i,"wan_ke_receive");
}
msg.reply(jsonObject);
});
生产端和消费端均启动后,进行三次,每次间隔2分钟。
第一次执行ab -n10000 -c10 http://192.168.2.112:7777/index
(并发度10,1万次请求)
第二次执行ab -n20000 -c20 http://192.168.2.112:7777/index
(并发度10,2万次请求)
第三次执行ab -n30000 -c20 http://192.168.2.112:7777/index
(并发度20,3万次请求)
资源 | cpu | 内存 |
---|---|---|
第一次 | 生产端(初值:12% 峰值:71% 末值:12%)消费端(初值:2% 峰值:16% 末值:4%) | 生产端(初值:50M 峰值:294M 末值:131M )消费端(初值:50M 峰值:150M 末值:150M) |
第二次 | 生产端(峰值:62% 末值:12%)消费端(峰值:10% 末值:8%) | 生产端(峰值:188M 末值:160M)消费端(峰值:120M 末值:120M) |
第三次 | 生产端(峰值:60% 末值:16%)消费端(峰值:10% 末值:3%) | 生产端(峰值:380M 末值:100M )消费端(峰值:230M 末值:110M) |
Proto序列化
生产端
ProtoCommonMsg protoCommonMsg = new ProtoCommonMsg();
for (int i =0; i<500 ; i ++){
protoCommonMsg.put("name_" + i,"wan_ke");
}
//通过eventbus发送请求
eventBus.request("com.xiaoniu.bus", protoCommonMsg,options, msg -> {
if (msg.succeeded()) {
if (msg.result() != null){
ProtoCommonMsg proto = (ProtoCommonMsg) msg.result().body();
}
} else {
System.err.println(msg.cause().getMessage());
msg.cause().printStackTrace();
}
}
消费端
eventBus.consumer("com.xiaoniu.bus", msg -> {
System.out.println("收到消息");
if (msg != null && msg.body() instanceof ProtoCommonMsg){
ProtoCommonMsg receive_data = (ProtoCommonMsg) msg.body();
}
ProtoCommonMsg protoCommonMsg = new ProtoCommonMsg();
for (int i =0; i<500 ; i ++){
protoCommonMsg.put("name_" + i,"wan_ke_receive");
}
msg.reply(protoCommonMsg,options);
});
资源 | cpu | 内存 |
---|---|---|
第一次 | 生产端(初值:12% 峰值:45% 末值:14%)消费端(初值:2% 峰值:9% 末值:3%) | 生产端(初值:50M 峰值:165M 末值:37M )消费端(初值:50M 峰值:46M 末值:58M) |
第二次 | 生产端(峰值:53% 末值:11%)消费端(峰值:13% 末值:3%) | 生产端(峰值:192M 末值:150M)消费端(峰值:70M 末值:48M) |
第三次 | 生产端(峰值:65% 末值:14%)消费端(峰值:30% 末值:3%) | 生产端(峰值:320M 末值:240M )消费端(峰值:70M 末值:50M) |
FlatBuffers序列化
生产端
FlatBuffersCommonMsg flatBuffersCommonMsg = new FlatBuffersCommonMsg();
for (int i =0; i<500 ; i ++){
flatBuffersCommonMsg.put("name_" + i,"wan_ke");
}
//通过eventbus发送请求
eventBus.request("com.xiaoniu.bus", flatBuffersCommonMsg,options, msg -> {
if (msg.succeeded()) {
if (msg.result() != null){
FlatBuffersCommonMsg proto = (FlatBuffersCommonMsg) msg.result().body();
}
} else {
System.err.println(msg.cause().getMessage());
msg.cause().printStackTrace();
}
}
);
消费端
eventBus.consumer("com.xiaoniu.bus", msg -> {
System.out.println("收到消息");
if (msg != null && msg.body() instanceof FlatBuffersCommonMsg){
FlatBuffersCommonMsg receive_data = (FlatBuffersCommonMsg) msg.body();
}
FlatBuffersCommonMsg protoCommonMsg = new FlatBuffersCommonMsg();
for (int i =0; i<500 ; i ++){
protoCommonMsg.put("name_" + i,"wan_ke_receive");
}
msg.reply(protoCommonMsg,options);
});
资源 | cpu | 内存 |
---|---|---|
第一次 | 生产端(初值:14% 峰值:71% 末值:17%)消费端(初值:3% 峰值:10% 末值:10%) | 生产端(初值:50M 峰值:123M 末值:50M )消费端(初值:50M 峰值:70M 末值:70M) |
第二次 | 生产端(峰值:60% 末值:17%)消费端(峰值:8% 末值:2%) | 生产端(峰值:300M 末值:300M)消费端(峰值:70M 末值:60M) |
第三次 | 生产端(峰值:66% 末值:17%)消费端(峰值:16% 末值:3%) | 生产端(峰值:332M 末值:200M )消费端(峰值:60M 末值:58M) |
总结
总体来看Proto和FlatBuffers在消费端的内存消耗表现比较优秀一点
Event Bus如何使用proto编码
自定义编码器
package org.example.MessageCode;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.MessageCodec;
import org.example.proto.ProtoCommonMsg;
import org.example.util.ProtostuffUtils;
/**
* 自定义对象编解码器,两个类型可用于消息转换,即发送对象转换为接受需要的对象
*/
public class ProtoMessageCodec implements MessageCodec<ProtoCommonMsg, ProtoCommonMsg> {
/**
* 将消息实体封装到Buffer用于传输
* 实现方式:使用对象流从对象中获取Byte数组然后追加到Buffer
*/
@Override
public void encodeToWire(Buffer buffer, ProtoCommonMsg protoCommonMsg) {
buffer.appendBytes(ProtostuffUtils.serialize(protoCommonMsg));
}
//从Buffer中获取消息对象
@Override
public ProtoCommonMsg decodeFromWire(int pos, Buffer buffer) {
return ProtostuffUtils.deserialize(buffer.getBytes(pos, buffer.length()),ProtoCommonMsg.class);
}
@Override
public ProtoCommonMsg transform(ProtoCommonMsg protoMessage) {
return null;
}
//消息转换
@Override
public String name() { return "myProtoCodec"; }
//识别是否是用户自定义编解码器,通常为-1
@Override
public byte systemCodecID() { return -1; }
}
集群EventBus注册且只能注册一次编码解码器
ProtoMessageCodec protoMessageCodec = new ProtoMessageCodec();
EventBus eventBus = vertx.eventBus();
eventBus.registerCodec(protoMessageCodec);
消费者回复需要带上编码解码器名称,告诉生产者这次使用哪个编码解码器
//新建带上编码解码器名称的options
DeliveryOptions options = new DeliveryOptions().setCodecName(protoMessageCodec.name());
eventBus.consumer("com.xiaoniu.bus", msg -> {
System.out.println("收到消息");
if (msg != null && msg.body() instanceof ProtoCommonMsg){
ProtoCommonMsg receive_data = (ProtoCommonMsg) msg.body();
}
ProtoCommonMsg protoCommonMsg = new ProtoCommonMsg();
for (int i =0; i<500 ; i ++){
protoCommonMsg.put("name_" + i,"wan_ke_receive");
}
//回复消息时带上options
msg.reply(protoCommonMsg,options);
});
生产者发送消息时带上编码解码器名称
DeliveryOptions options = new DeliveryOptions().setCodecName(protoMessageCodec.name());
eventBus.request("com.xiaoniu.bus", protoCommonMsg,options, msg -> {
if (msg.succeeded()) {
if (msg.result() != null){
ProtoCommonMsg proto = (ProtoCommonMsg) msg.result().body();
}
} else {
System.err.println(msg.cause().getMessage());
msg.cause().printStackTrace();
}
}
);
使用时,新建ProtoCommonMsg对象,插入键值对即可
package org.example.proto;
import java.util.HashMap;
import java.util.Map;
/**
* @Author: Administrator
* @Description:
* @Date: 2020/7/22 8:32
* @Version: 1.0
*/
public class ProtoCommonMsg {
private Map data = new HashMap();
public void put(Object key,Object value){
data.put(key,value);
}
public Object get(Object key){
return data.get(key);
}
public <T> T get(Object key, Class<T> clazz){
return (T) data.get(key);
}
public String toString(){
return data.toString();
}
}
Protocol Buffer编码解码工具,以供自定义编码解码器使用
package org.example.util;
import io.protostuff.LinkedBuffer;
import io.protostuff.ProtostuffIOUtil;
import io.protostuff.Schema;
import io.protostuff.runtime.RuntimeSchema;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
public class ProtostuffUtils {
/**
* 避免每次序列化都重新申请Buffer空间
*/
private static LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
/**
* 缓存Schema
*/
private static Map<Class<?>, Schema<?>> schemaCache = new ConcurrentHashMap<>();
/**
* 序列化方法,把指定对象序列化成字节数组
*
* @param obj
* @param <T>
* @return
*/
@SuppressWarnings("unchecked")
public static <T> byte[] serialize(T obj) {
Class<T> clazz = (Class<T>) obj.getClass();
Schema<T> schema = getSchema(clazz);
byte[] data;
try {
data = ProtostuffIOUtil.toByteArray(obj, schema, buffer);
} finally {
buffer.clear();
}
return data;
}
/**
* 反序列化方法,将字节数组反序列化成指定Class类型
*
* @param data
* @param clazz
* @param <T>
* @return
*/
public static <T> T deserialize(byte[] data, Class<T> clazz) {
Schema<T> schema = getSchema(clazz);
T obj = schema.newMessage();
ProtostuffIOUtil.mergeFrom(data, obj, schema);
return obj;
}
@SuppressWarnings("unchecked")
private static <T> Schema<T> getSchema(Class<T> clazz) {
Schema<T> schema = (Schema<T>) schemaCache.get(clazz);
if (Objects.isNull(schema)) {
//这个schema通过RuntimeSchema进行懒创建并缓存
//所以可以一直调用RuntimeSchema.getSchema(),这个方法是线程安全的
schema = RuntimeSchema.getSchema(clazz);
if (Objects.nonNull(schema)) {
schemaCache.put(clazz, schema);
}
}
return schema;
}
}
本文地址:https://blog.csdn.net/xk4848123/article/details/107686232
上一篇: Mybatis Plus 10分钟上手
下一篇: java多维数组与排序