15.RDD 创建
第15课:RDD创建内幕
1. RDD的创建方式
Spark应用程序运行过程中,第一个RDD代表了Spark应用程序输入数据的来源,之后通过Trasformation来对RDD进行各种算子的转换,来实现具体的算法
Spark中的基本方式:
1) 使用程序中的集合创建
这种方式的实际意义主要用于测试。
2) 使用本地文件系统创建
这种方式的实际意义主要用于测试大量数据的文件
3) 使用HDFS创建RDD
这种方式为生产环境中最常用的创建RDD的方式
4) 基于DB创建
5) 基于NoSQL:例如HBase
6) 基于S3(SC3)创建
7) 基于数据流创建
2. RDD创建实战
1) 通过集合创建
代码:
object RDDBasedOnCollection {
def main (args: Array[String]) {
val conf = new SparkConf()//create SparkConf
conf.setAppName("RDDBasedOnCollection")//set app name
conf.setMaster("local")//run local
val sc =new SparkContext(conf)
val numbers = 1 to 100 //创建一个Scala集合
val rdd = sc.parallelize(numbers)
val sum =rdd.reduce(_+_) //1+2=3 3+3=6 6+4=10
println("1+2+...+99+100"+"="+sum)
}
}
结果:
2) 通过本地文件系统创建
代码:
object RDDBasedOnLocalFile { def main (args: Array[String]) { val conf = new SparkConf()//create SparkConf conf.setAppName("RDDBasedOnCollection")//set app name conf.setMaster("local")//run local val sc =new SparkContext(conf) val rdd = sc.textFile("C:/Users/feng/IdeaProjects/WordCount/src/SparkText.txt") val linesLength=rdd.map(line=>line.length()) val sum = linesLength.reduce(_+_) println("the total characters of the file"+"="+sum) } }
结果:
3) 通过HDFS创建RDD
代码:
val wordcount = sc.textFile("/library/wordcount/input/licenses").flatMap(_.split(" ")).map(word=>(word,1)).reduceByKey(_+_).filter(pair=>pair._2>20).collect().foreach(println)
结果:
关于spark并行度:
1.默认并行度为程序分配到的cpu core的数目
2.可以手动设置并行度,并行度最佳实践
1. 2-4 partitions for each CPU core
2.综合考虑cpu和内存
注:本内容原型来自 IMP 课程笔记
上一篇: 设计模式 --面试高频之享元模式
下一篇: 序列化类中的自定义字段设置