欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

15.RDD 创建

程序员文章站 2022-03-25 17:58:26
...

15课:RDD创建内幕

1.       RDD的创建方式

Spark应用程序运行过程中,第一个RDD代表了Spark应用程序输入数据的来源,之后通过Trasformation来对RDD进行各种算子的转换,来实现具体的算法

Spark中的基本方式:

1)       使用程序中的集合创建

这种方式的实际意义主要用于测试。

2)       使用本地文件系统创建

这种方式的实际意义主要用于测试大量数据的文件

3)       使用HDFS创建RDD

这种方式为生产环境中最常用的创建RDD的方式

4)       基于DB创建

5)       基于NoSQL:例如HBase

6)       基于S3(SC3)创建

7)       基于数据流创建

2.       RDD创建实战

1)       通过集合创建

代码:

object RDDBasedOnCollection {
def main (args: Array[String]) {
val conf = new SparkConf()//create SparkConf
conf.setAppName("RDDBasedOnCollection")//set app name
conf.setMaster("local")//run local
val sc =new SparkContext(conf)
val numbers = 1 to 100  //创建一个Scala集合
val rdd = sc.parallelize(numbers)
val sum =rdd.reduce(_+_)  //1+2=3 3+3=6 6+4=10
println("1+2+...+99+100"+"="+sum)
  }
}

结果:

 

2)       通过本地文件系统创建

代码:

object RDDBasedOnLocalFile {

def main (args: Array[String]) {

val conf = new SparkConf()//create SparkConf

conf.setAppName("RDDBasedOnCollection")//set app name

conf.setMaster("local")//run local

val sc =new SparkContext(conf)

val rdd = sc.textFile("C:/Users/feng/IdeaProjects/WordCount/src/SparkText.txt")

val linesLength=rdd.map(line=>line.length())

val sum = linesLength.reduce(_+_)

println("the total characters of the file"+"="+sum)

  }

}

结果:

 

3)       通过HDFS创建RDD

代码:

 val wordcount = sc.textFile("/library/wordcount/input/licenses").flatMap(_.split(" ")).map(word=>(word,1)).reduceByKey(_+_).filter(pair=>pair._2>20).collect().foreach(println)

结果:

 

 

 

 

 

关于spark并行度:

1.默认并行度为程序分配到的cpu core的数目

2.可以手动设置并行度,并行度最佳实践

         1. 2-4 partitions for each CPU core

         2.综合考虑cpu内存

 

注:本内容原型来自 IMP 课程笔记

如果技术上有什么疑问,欢迎加我QQ交流: 1106373297 
相关标签: spark rdd 分片