TimestampedFileInputSplit cannot be used as a POJO type because not all fields are valid POJO fields
程序员文章站
2022-05-26 17:55:45
Project context: Flink 1.12.0
Background: testing transformation operators
Problem description:
Error message: TimestampedFileInputSplit cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType.
Please read the Flink documentation on “Data Types & Serialization” for details of the effect on performance.
TimestampedFileInputSplit does not contain a setter for field modificationTime
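
The message refers to Flink's POJO rules: a class is treated as a POJO only if it is public, has a public no-argument constructor, and every non-static, non-transient field is either public or has a getter and a setter. The following is a minimal plain-Java sketch of that idea, not Flink's actual check. `PojoSketch`, `NoSetterExample`, and `everyFieldHasSetter` are hypothetical names, and the `timestamp` field of `SensorReading` is an assumption, since the post does not show that class.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

class PojoSketch {

    // Hypothetical stand-in for the SensorReading class used in this post.
    // The field name `timestamp` is an assumption; `id` and `temperature`
    // are inferred from getId() and maxBy("temperature").
    // In a real project this would normally be a public top-level class.
    public static class SensorReading {
        private String id;
        private Long timestamp;
        private Double temperature;

        // Flink POJOs need a public no-argument constructor.
        public SensorReading() {}

        public SensorReading(String id, Long timestamp, Double temperature) {
            this.id = id;
            this.timestamp = timestamp;
            this.temperature = temperature;
        }

        public String getId() { return id; }
        public void setId(String id) { this.id = id; }
        public Long getTimestamp() { return timestamp; }
        public void setTimestamp(Long timestamp) { this.timestamp = timestamp; }
        public Double getTemperature() { return temperature; }
        public void setTemperature(Double temperature) { this.temperature = temperature; }
    }

    // Hypothetical class that mimics the reported situation:
    // a private field with a getter but no setter.
    public static class NoSetterExample {
        private long modificationTime;
        public long getModificationTime() { return modificationTime; }
    }

    // Simplified check (not Flink's actual logic): every non-static,
    // non-transient, non-public field must have a one-argument setX method.
    static boolean everyFieldHasSetter(Class<?> type) {
        for (Field field : type.getDeclaredFields()) {
            int mods = field.getModifiers();
            if (field.isSynthetic() || Modifier.isStatic(mods)
                    || Modifier.isTransient(mods) || Modifier.isPublic(mods)) {
                continue;
            }
            String name = field.getName();
            String setter = "set" + Character.toUpperCase(name.charAt(0)) + name.substring(1);
            boolean found = false;
            for (Method method : type.getMethods()) {
                if (method.getName().equals(setter) && method.getParameterCount() == 1) {
                    found = true;
                    break;
                }
            }
            if (!found) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println("SensorReading: " + everyFieldHasSetter(SensorReading.class));
        System.out.println("NoSetterExample: " + everyFieldHasSetter(NoSetterExample.class));
    }
}
```

In this sketch `SensorReading` passes the check and `NoSetterExample` does not, which mirrors the "does not contain a setter for field modificationTime" line in the log.
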
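
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.KeyedStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Read the input file line by line.
        DataStreamSource<String> stringDataStreamSource = env.readTextFile("sensor.txt");

        // Parse each comma-separated line into a SensorReading (the project's own class).
        DataStream<SensorReading> mapDataStream = stringDataStreamSource.map(line -> {
            String[] fields = line.split(",");
            // Indexes 0 to 2 are read below, so require at least three fields.
            if (fields.length >= 3) {
                return new SensorReading(fields[0],
                        Long.valueOf(fields[1]),
                        Double.valueOf(fields[2]));
            }
            // Fall back to an empty reading for malformed lines.
            return new SensorReading();
        });

        // Group the readings by sensor id.
        KeyedStream<SensorReading, String> keyedStream = mapDataStream.keyBy(sensorReading -> sensorReading.getId());
        // Emit, per sensor id, the reading with the highest temperature seen so far.
        DataStream<SensorReading> maxBy = keyedStream.maxBy("temperature");

        maxBy.print("sensor");
        env.execute("sensor");
    }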
<hr style=" border:solid; width:100px; height:1px;" color="#000000" size="1">
# Root cause analysis:
The project's pom.xml declared the following dependencies. Note that flink-clients uses `<scope>provided</scope>`:

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-wikiedits_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
<hr style=" border:solid; width:100px; height:1px;" color="#000000" size="1">
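# Solution:
Removing `<scope>provided</scope>` from the flink-clients Maven dependency fixed the problem, and the test ran successfully. With `provided` scope, flink-clients is typically not on the runtime classpath when the job is launched directly (for example from an IDE), and Flink 1.11 and later need it there for local execution. The `TimestampedFileInputSplit` message itself appears to be an informational log from Flink's type extraction rather than the cause of the failure.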
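
One way to confirm that the scope change took effect is to probe the runtime classpath from the same run configuration as the job. The following is a minimal sketch under assumptions: `ClasspathCheck` and `isOnClasspath` are hypothetical names, and the default probe class `org.apache.flink.client.deployment.executors.LocalExecutorFactory` is assumed to ship in flink-clients for Flink 1.12; pass a different class name as the first argument if that assumption does not hold for your version.

```java
class ClasspathCheck {

    // Returns true if the named class can be located on the current
    // classpath. The class is looked up without being initialized.
    static boolean isOnClasspath(String className) {
        try {
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumption: this class is believed to be part of flink-clients in Flink 1.12.
        String probe = args.length > 0
                ? args[0]
                : "org.apache.flink.client.deployment.executors.LocalExecutorFactory";
        System.out.println(probe + " on classpath: " + isOnClasspath(probe));
    }
}
```

If the probe reports `false` when run from the IDE, the dependency is probably still being excluded at runtime, for instance because the `provided` scope remains in effect.
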