
Spark [Part 2]: org.apache.spark.SparkException: Task not serializable



While developing a Spark ETL job, I ran into a "Task not serializable" error:

17/08/23 10:51:07 ERROR yarn.ApplicationMaster: User class threw exception: org.apache.spark.SparkException: Task not serializable
org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
    at org.apache.spark.rdd.RDD$$anonfun$flatMap$1.apply(RDD.scala:333)
    at org.apache.spark.rdd.RDD$$anonfun$flatMap$1.apply(RDD.scala:332)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
    at org.apache.spark.rdd.RDD.flatMap(RDD.scala:332)
    at com.test.scala.analyser.ETLAnalyzer.doAnalyse(ETLAnalyzer.scala:48)
    at com.test.etl.job.scala.job.ETLOTTJob$.main(ETLOTTJob.scala:29)
    at com.test.etl.job.scala.job.ETLOTTJob.main(ETLOTTJob.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:558)

It turned out that an external variable that is not serializable was being referenced inside a distributed Spark operator (map, filter, flatMap, etc.). The offending code looked like this:

// Initialize the SparkContext
val sparkContext = initSparkContext(this.getClass, args)

// Run the flatMap operator; the problem is that sparkContext is captured inside the closure
sparkContext.textFile(inputPath).flatMap(x => {
    Log.readFromString(x, sparkContext)
})

Digging into the SparkContext source shows that SparkContext extends the Logging trait and implements the ExecutorAllocationClient interface; neither of these, nor SparkContext itself, extends Serializable, so pulling it into a task closure triggers "Task not serializable".
Once the cause is known, the fix is simple: in my job I just removed the sparkContext argument from the call inside the operator, i.e.:

sparkContext.textFile(inputPath).flatMap(x => {
    Log.readFromString(x)
})
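
The same rule applies to any other driver-side object: copy the value the closure actually needs into a local, serializable variable before the transformation, so that only this value gets captured. A minimal self-contained sketch of the pattern (the spark.etl.separator key and the splitting logic are made up for illustration; they are not part of the original job):

import org.apache.spark.{SparkConf, SparkContext}

object ClosureCaptureSketch {
    def main(args: Array[String]): Unit = {
        val sparkContext = new SparkContext(new SparkConf().setAppName("closure-capture-sketch"))

        // Read the driver-side configuration into a plain, serializable String
        // before the transformation, instead of capturing sparkContext itself.
        val separator = sparkContext.getConf.get("spark.etl.separator", "\t")

        // The closure below captures only `separator`, so the ClosureCleaner
        // serializability check passes.
        sparkContext.textFile(args(0))
            .flatMap(line => line.split(separator))
            .take(10)
            .foreach(println)

        sparkContext.stop()
    }
}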

If an entity class genuinely needs to hold a reference to the SparkContext, simply mark the field with "@transient" so it is excluded from serialization:

import org.apache.hadoop.fs.FileSystem
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

class TestSeriable(_argsMap: Map[String, String]) extends Serializable {
    // @transient fields are skipped during serialization and stay null on the executors
    @transient var sc: SparkContext = null
    @transient var sqlContext: SQLContext = null
    @transient var hdfs: FileSystem = null
}
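
Note that @transient only prevents the field from being serialized: on the executors it comes back as null, so closures inside RDD operators must never touch it. A minimal usage sketch under that assumption (the names, paths, and values below are illustrative, not from the original job):

// Hypothetical usage (illustrative names and paths, not from the original job)
val etl = new TestSeriable(Map("input.path" -> "/data/logs"))
etl.sc = sparkContext                         // assigned and used on the driver only
etl.sqlContext = new SQLContext(sparkContext)

val separator = ","                           // a plain String, safely serializable
val records = etl.sc.textFile("/data/logs")
    .map(line => line.split(separator))       // OK: the closure captures only `separator`
// Referencing etl.sc inside the closure would pass the serialization check (sc is @transient)
// but sc would be null on the executors, causing a NullPointerException instead.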