欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

flink数据类型和序列化

程序员文章站 2022-07-13 14:56:49
...

数据类型总览

flink数据类型和序列化

总体继承关系如下
flink数据类型和序列化

Flink的数据类型

这里我们取一种类型分析它的继承关系

flink数据类型和序列化

序列化过程

​ 具体的序列化过程时怎么样的呢?通过如图 TypeInfomation的createSerializer方法:

	/**
	 * Creates a serializer for the type. The serializer may use the ExecutionConfig
	 * for parameterization.
	 *根据类型创建序列化器,这个序列化器需要ExecutionConfig对象作为参数,ExecutionConfig是执行task级别的配置信息,说明序列化是不同机器的task之间进行数据交互的形式。
	 * @param config The config used to parameterize the serializer.
	 * @return A serializer for this type.  
	 */
	@PublicEvolving
	public abstract TypeSerializer<T> createSerializer(ExecutionConfig config);

由于不同的类型对TypeInfomation这个基类的继承,会重写这个createSerializer方法,所以对于具体类型的序列化过程需要参照具体的createSerializer方法,这里截取PojoTypeInfo的createSerializer来分析:

	@Override
	@PublicEvolving
	@SuppressWarnings("unchecked")
	public TypeSerializer<T> createSerializer(ExecutionConfig config) {
    //判断是否开启强制Kryo序列化,可以通过env.getConfig().enableForceKryo()开启
    //或者通过env.getConfig().disableGenericTypes();禁用kryo序列化,(GenericTypeInfo是由Kyro进行序列化的)
    //还可以通过env.getConfig().disableForceKryo();禁用强制kryo序列化
		if (config.isForceKryoEnabled()) {
			return new KryoSerializer<>(getTypeClass(), config);
		}
	//判断是否开启强制Avro序列化,可以通过env.getConfig().enableForceAvro()开启
    //还可以通过env.getConfig().disableForceAvro();禁用强制Avro序列化
		if (config.isForceAvroEnabled()) {
			return AvroUtils.getAvroUtils().createAvroSerializer(getTypeClass());
		}
		//调用createPojoSerializer方法
		return createPojoSerializer(config);
	}
	//非kryo和Avro序列化的方法
	public PojoSerializer<T> createPojoSerializer(ExecutionConfig config) {
    //创建TypeSerializer数组,数组长度为对应POJO属性个数
    //成员变量的定义 private final PojoField[] fields;
    //构造方法中有 this.fields = fields.toArray(new PojoField[fields.size()]);
		TypeSerializer<?>[] fieldSerializers = new TypeSerializer<?>[fields.length];
    //创建Field[]
		Field[] reflectiveFields = new Field[fields.length];
		//通过反射获取对应的序列化类型,所以要求POJO对应的Bean
    //1、必须是public以及不含非静态内部类
    //2、必须有一个public类型的无参构造方法
		//3、所有的非静态,非transient字段必须是public的或者提供public的setter和getter方法
		for (int i = 0; i < fields.length; i++) {
			fieldSerializers[i] = fields[i].getTypeInformation().createSerializer(config);
			reflectiveFields[i] = fields[i].getField();
		}

		return new PojoSerializer<T>(getTypeClass(), fieldSerializers, reflectiveFields, config);
	}

以下是PojoSerializer的构造方法

	/**
	 * Constructor to create a new {@link PojoSerializer}.
	 */
	@SuppressWarnings("unchecked")
	public PojoSerializer(
			Class<T> clazz,
			TypeSerializer<?>[] fieldSerializers,
			Field[] fields,
			ExecutionConfig executionConfig) {
		//类不能为空
		this.clazz = checkNotNull(clazz);
    //对应变量的序列化
		this.fieldSerializers = (TypeSerializer<Object>[]) checkNotNull(fieldSerializers);
    //变量名非空检查
		this.fields = checkNotNull(fields);
    //变量数
		this.numFields = fieldSerializers.length;
    //executionConfig非空检查
		this.executionConfig = checkNotNull(executionConfig);

		for (int i = 0; i < numFields; i++) {
			this.fields[i].setAccessible(true);
		}

		this.cl = Thread.currentThread().getContextClassLoader();

		// We only want those classes that are not our own class and are actually sub-classes.
		LinkedHashSet<Class<?>> registeredSubclasses =
				getRegisteredSubclassesFromExecutionConfig(clazz, executionConfig);

		this.registeredClasses = createRegisteredSubclassTags(registeredSubclasses);
		this.registeredSerializers = createRegisteredSubclassSerializers(registeredSubclasses, executionConfig);

		this.subclassSerializerCache = new HashMap<>();
	}

序列化图示如下

flink数据类型和序列化