flink数据类型和序列化
程序员文章站
2022-07-13 14:56:49
...
数据类型总览
总体继承关系如下
Flink的数据类型
这里我们取一种类型分析它的继承关系
序列化过程
具体的序列化过程时怎么样的呢?通过如图 TypeInfomation的createSerializer方法:
/**
* Creates a serializer for the type. The serializer may use the ExecutionConfig
* for parameterization.
*根据类型创建序列化器,这个序列化器需要ExecutionConfig对象作为参数,ExecutionConfig是执行task级别的配置信息,说明序列化是不同机器的task之间进行数据交互的形式。
* @param config The config used to parameterize the serializer.
* @return A serializer for this type.
*/
@PublicEvolving
public abstract TypeSerializer<T> createSerializer(ExecutionConfig config);
由于不同的类型对TypeInfomation这个基类的继承,会重写这个createSerializer方法,所以对于具体类型的序列化过程需要参照具体的createSerializer方法,这里截取PojoTypeInfo的createSerializer来分析:
@Override
@PublicEvolving
@SuppressWarnings("unchecked")
public TypeSerializer<T> createSerializer(ExecutionConfig config) {
//判断是否开启强制Kryo序列化,可以通过env.getConfig().enableForceKryo()开启
//或者通过env.getConfig().disableGenericTypes();禁用kryo序列化,(GenericTypeInfo是由Kyro进行序列化的)
//还可以通过env.getConfig().disableForceKryo();禁用强制kryo序列化
if (config.isForceKryoEnabled()) {
return new KryoSerializer<>(getTypeClass(), config);
}
//判断是否开启强制Avro序列化,可以通过env.getConfig().enableForceAvro()开启
//还可以通过env.getConfig().disableForceAvro();禁用强制Avro序列化
if (config.isForceAvroEnabled()) {
return AvroUtils.getAvroUtils().createAvroSerializer(getTypeClass());
}
//调用createPojoSerializer方法
return createPojoSerializer(config);
}
//非kryo和Avro序列化的方法
public PojoSerializer<T> createPojoSerializer(ExecutionConfig config) {
//创建TypeSerializer数组,数组长度为对应POJO属性个数
//成员变量的定义 private final PojoField[] fields;
//构造方法中有 this.fields = fields.toArray(new PojoField[fields.size()]);
TypeSerializer<?>[] fieldSerializers = new TypeSerializer<?>[fields.length];
//创建Field[]
Field[] reflectiveFields = new Field[fields.length];
//通过反射获取对应的序列化类型,所以要求POJO对应的Bean
//1、必须是public以及不含非静态内部类
//2、必须有一个public类型的无参构造方法
//3、所有的非静态,非transient字段必须是public的或者提供public的setter和getter方法
for (int i = 0; i < fields.length; i++) {
fieldSerializers[i] = fields[i].getTypeInformation().createSerializer(config);
reflectiveFields[i] = fields[i].getField();
}
return new PojoSerializer<T>(getTypeClass(), fieldSerializers, reflectiveFields, config);
}
以下是PojoSerializer的构造方法
/**
* Constructor to create a new {@link PojoSerializer}.
*/
@SuppressWarnings("unchecked")
public PojoSerializer(
Class<T> clazz,
TypeSerializer<?>[] fieldSerializers,
Field[] fields,
ExecutionConfig executionConfig) {
//类不能为空
this.clazz = checkNotNull(clazz);
//对应变量的序列化
this.fieldSerializers = (TypeSerializer<Object>[]) checkNotNull(fieldSerializers);
//变量名非空检查
this.fields = checkNotNull(fields);
//变量数
this.numFields = fieldSerializers.length;
//executionConfig非空检查
this.executionConfig = checkNotNull(executionConfig);
for (int i = 0; i < numFields; i++) {
this.fields[i].setAccessible(true);
}
this.cl = Thread.currentThread().getContextClassLoader();
// We only want those classes that are not our own class and are actually sub-classes.
LinkedHashSet<Class<?>> registeredSubclasses =
getRegisteredSubclassesFromExecutionConfig(clazz, executionConfig);
this.registeredClasses = createRegisteredSubclassTags(registeredSubclasses);
this.registeredSerializers = createRegisteredSubclassSerializers(registeredSubclasses, executionConfig);
this.subclassSerializerCache = new HashMap<>();
}
序列化图示如下
上一篇: Basler相机实时图像显示--Qt代码
下一篇: 基于窗口的实时统计