
Common Scala Problems and Solutions


Over the past couple of days I collected the problems I ran into while writing Spark jobs. I'm writing them down here so they are easy to look up the next time they come up.

1. Error:(64, 64) Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.

The offending source code:

...
val originRdd: RDD[(String, String, Int)] = dataFrame.map(row => (row.getString(0), row.getString(1), row.getInt(2))).rdd
...

The code reads the three columns of the DataFrame out as specific types, but it does not compile. Spark Datasets need an encoder for the type of data they store; for common types Spark already ships suitable encoders, but they only become available once they are imported from spark.implicits._.

Solution: import sparkSession.implicits._ before converting the DataFrame to an RDD:

import sparkSession.implicits._
val originRdd: RDD[(String, String, Int)] = dataFrame.map(row => (row.getString(0), row.getString(1), row.getInt(2))).rdd
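For context, a minimal self-contained sketch of the pattern (the session name sparkSession, the app name, and the sample data are assumptions for illustration, not from the original job):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

object EncoderDemo {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder()
      .appName("encoder-demo")
      .master("local[*]")
      .getOrCreate()

    // Brings the built-in encoders for primitives, tuples and case classes into scope.
    import sparkSession.implicits._

    val dataFrame: DataFrame = Seq(("a", "x", 1), ("b", "y", 2)).toDF("c1", "c2", "c3")

    // Without the import above, this map fails with "Unable to find encoder".
    val originRdd: RDD[(String, String, Int)] =
      dataFrame.map(row => (row.getString(0), row.getString(1), row.getInt(2))).rdd

    originRdd.collect().foreach(println)
    sparkSession.stop()
  }
}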

Reference: https://stackoverflow.com/questions/38664972/why-is-unable-to-find-encoder-for-type-stored-in-a-dataset-when-creating-a-dat

2. Error:(21, 19) missing parameter type in lists.forEach(no => {})

This can happen when iterating over a java.util.List; the fix is to import a converter so the Java collection can be used as a Scala one:

import scala.collection.JavaConversions._
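A minimal sketch of the fix in use (the list contents are made up; on Scala 2.12+ the non-deprecated alternative is import scala.collection.JavaConverters._ together with .asScala):

import java.util.Arrays
import scala.collection.JavaConversions._

object ForEachDemo {
  def main(args: Array[String]): Unit = {
    val lists: java.util.List[String] = Arrays.asList("a", "b", "c")
    // With the implicit conversion in scope, the Java list can be treated as a
    // Scala collection, so Scala's foreach infers the closure's parameter type.
    lists.foreach(no => println(no))
  }
}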

Other variants of this error are discussed here:

https://stackoverflow.com/questions/13115053/scala-anonymous-function-missing-parameter-type-error

3. Scala test case fails with java.lang.Exception: Test class should have exactly one public constructor

java.lang.Exception: Test class should have exactly one public constructor

	at org.junit.runners.BlockJUnit4ClassRunner.validateOnlyOneConstructor(BlockJUnit4ClassRunner.java:158)
	at org.junit.runners.BlockJUnit4ClassRunner.validateConstructor(BlockJUnit4ClassRunner.java:147)
	at org.junit.runners.BlockJUnit4ClassRunner.collectInitializationErrors(BlockJUnit4ClassRunner.java:127)
	at org.junit.runners.ParentRunner.validate(ParentRunner.java:416)
	at org.junit.runners.ParentRunner.<init>(ParentRunner.java:84)
	at org.junit.runners.BlockJUnit4ClassRunner.<init>(BlockJUnit4ClassRunner.java:65)
	at org.junit.internal.builders.JUnit4Builder.runnerForClass(JUnit4Builder.java:10)
	at org.junit.runners.model.RunnerBuilder.safeRunnerForClass(RunnerBuilder.java:59)
	at org.junit.internal.builders.AllDefaultPossibilitiesBuilder.runnerForClass(AllDefaultPossibilitiesBuilder.java:26)
	at org.junit.runners.model.RunnerBuilder.safeRunnerForClass(RunnerBuilder.java:59)
	at org.junit.internal.requests.ClassRequest.getRunner(ClassRequest.java:33)
	at org.junit.internal.requests.FilterRequest.getRunner(FilterRequest.java:36)
	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:49)
	at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
	at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
	at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)

Solution: declare the test as a class instead of an object.
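A minimal sketch of the fix (the test name is hypothetical):

import org.junit.Test

// JUnit creates the test class reflectively through its public constructor.
// A Scala `object` compiles to a singleton whose constructor is private,
// so the runner rejects it; a plain `class` provides the default public
// constructor JUnit expects.
class MyJobTest {
  @Test
  def testSomething(): Unit = {
    assert(1 + 1 == 2)
  }
}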

4. org.apache.spark.SparkException: Task not serializable

Caused by: org.apache.spark.SparkException: Task not serializable
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
	at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
	at org.apache.spark.SparkContext.clean(SparkContext.scala:2287)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1.apply(PairRDDFunctions.scala:758)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1.apply(PairRDDFunctions.scala:757)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
	at org.apache.spark.rdd.PairRDDFunctions.mapValues(PairRDDFunctions.scala:757)
	at com.test.Test2$.main(Test2.scala:70)

This error came up while processing an RDD. When Spark runs a distributed computation, it has to ship data from the driver node to the worker nodes, for example a shared object. Shipping relies on serialization (the object must implement the Serializable interface); if an object captured by the task cannot be serialized, the workers cannot receive it and the job fails with this exception.

So when this error appears, first check whether the closure passed to the RDD operation references any external objects, and make sure everything it captures is serializable; a classic trigger is sketched below.
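For illustration, a hypothetical helper class showing the usual capture-of-this trap and the local-copy fix:

import org.apache.spark.rdd.RDD

// Hypothetical helper that is not Serializable.
class Multiplier(val factor: Int) {
  // Referencing `factor` inside the closure captures `this`, so Spark
  // tries (and fails) to serialize the whole Multiplier instance.
  def scale(rdd: RDD[Int]): RDD[Int] = rdd.map(x => x * factor)

  // Fix: copy the field into a local val so only the Int is captured.
  def scaleSafely(rdd: RDD[Int]): RDD[Int] = {
    val f = factor
    rdd.map(x => x * f)
  }
}

In my job, the offending code looked like this: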

val groupedRdd: RDD[(String, Iterable[(String, String, Long)])] = originRdd.keyBy(row => row._1).groupByKey
groupedRdd.mapValues(values => { // values here is an Iterable, which is not a serializable object
    // action body
})

In my case the error was caused by this Iterable, which could not be serialized.

So I changed it to:

groupedRdd.mapValues(iter => iter.toList) // convert the Iterable to a List; List is serializable (see scala.collection.immutable.List)
  .mapValues(values => { // values is now a serializable List
    // action body
  })
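Putting the fix together, a minimal self-contained sketch (the sample data and app name are made up):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object TaskNotSerializableDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("task-not-serializable-demo")
      .master("local[*]")
      .getOrCreate()
    val sc = spark.sparkContext

    val originRdd: RDD[(String, String, Long)] =
      sc.parallelize(Seq(("a", "x", 1L), ("a", "y", 2L), ("b", "z", 3L)))

    val groupedRdd: RDD[(String, Iterable[(String, String, Long)])] =
      originRdd.keyBy(row => row._1).groupByKey()

    // Materialize each group as an immutable List before further processing.
    val result: RDD[(String, Long)] = groupedRdd
      .mapValues(iter => iter.toList)
      .mapValues(values => values.map(_._3).sum)

    result.collect().foreach(println)
    spark.stop()
  }
}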