Running a Spark job from Java code throws org.apache.spark.SparkException: Task not serializable
Running the following Java program throws a Task-not-serializable exception:
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.junit.Test;
import scala.Serializable;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
/**
* Created by Feng
*/
public class SparkTest {
    @Test
    public void testFileOp() {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("My App");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> input = sc.textFile("C:\\Users\\76348\\Desktop\\民国之文豪崛起.txt");
        JavaRDD<String> words = input.flatMap(
                new FlatMapFunction<String, String>() {
                    public Iterator<String> call(String x) {
                        return Arrays.asList(x.split("")).iterator();
                    }
                });
        JavaPairRDD<String, Integer> counts = words.mapToPair(
                new PairFunction<String, String, Integer>() {
                    public Tuple2<String, Integer> call(String x) {
                        return new Tuple2<>(x, 1);
                    }
                }).reduceByKey(new Function2<Integer, Integer, Integer>() {
                    public Integer call(Integer x, Integer y) {
                        return x + y;
                    }
                });
        Map<String, Integer> map = counts.collectAsMap();
        counts.saveAsTextFile("C:\\Users\\76348\\Desktop\\test");
    }
}
Strangely, though, FlatMapFunction and PairFunction, as defined in the Spark dependency, both already extend the Serializable interface. A closer look at the error message turns up these key lines:
Serialization stack:
- object not serializable (class: SparkTest, value: SparkTest@...)
- field (class: SparkTest$1, name: this$0, type: class SparkTest)
- object (class SparkTest$1, SparkTest$1@...)
- element of array (index: 0)
- array (class [Ljava.lang.Object;, size 1)
- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
- object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=interface org.apache.spark.api.java.JavaRDDLike,
So it is SparkTest itself that cannot be serialized. The cause: in Java, a non-static inner class always holds an implicit reference to an instance of its enclosing class, and serializing the inner class naturally serializes that reference as well. Besides making the outer class implement the Serializable interface, or defining the task functions outside the class, you can also replace the anonymous inner classes with lambda expressions, as sketched below.
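A minimal sketch of the lambda-based rewrite, assuming the same Spark 2.x Java API used above (flatMap returning an Iterator). Each lambda references only its own parameters, so it captures no enclosing SparkTest instance and serializes on its own:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.junit.Test;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Map;

public class SparkTest {
    @Test
    public void testFileOp() {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("My App");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> input = sc.textFile("C:\\Users\\76348\\Desktop\\民国之文豪崛起.txt");
        // Stateless lambdas have no hidden this$0 field, so nothing
        // drags the non-serializable SparkTest along with the task.
        JavaRDD<String> words = input.flatMap(
                x -> Arrays.asList(x.split("")).iterator());
        JavaPairRDD<String, Integer> counts = words
                .mapToPair(x -> new Tuple2<>(x, 1))
                .reduceByKey((x, y) -> x + y);
        Map<String, Integer> map = counts.collectAsMap();
        counts.saveAsTextFile("C:\\Users\\76348\\Desktop\\test");
    }
}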
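And a sketch of the "define the task function outside the class" route (the class name SplitChars is illustrative, not from the original): a static nested class inside SparkTest carries no implicit reference to the enclosing instance, so only the function object itself is serialized.

// Static nested class: unlike an anonymous inner class, it holds
// no implicit reference to the enclosing SparkTest instance.
static class SplitChars implements FlatMapFunction<String, String> {
    @Override
    public Iterator<String> call(String x) {
        return Arrays.asList(x.split("")).iterator();
    }
}

The call site then becomes input.flatMap(new SplitChars()); with everything else unchanged.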