欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

RDD转换算子、行动算子

程序员文章站 2024-03-22 19:00:22
...

RDD转换算子
对于转换操作,RDD的所有转换都不会直接计算结果
仅记录作用于RDD上的操作
当遇到动作算子(Action)时才会进行真正计算
RDD转换算子、行动算子

RDD常用的转换算子
map算子
对RDD中的每个元素都执行一个指定的函数来产生一个新的RDD
任何原RDD中的元素在新RDD中都有且只有一个元素与之对应
输入分区与输出分区一一对应
//将原RDD中每个元素都乘以2来产生一个新的RDD

val a = sc.parallelize(1 to 9)
val b = a.map(x => x * 2)
a.collect().foreach(println)
b.collect().foreach(println)

RDD转换算子、行动算子

filter算子
对元素进行过滤,对每个元素应用指定函数,返回值为true的元素保留在新的RDD中

val a =sc.parallelize(1 to 10)
val b = a.filter(_ % 2 == 0).collect
val c = a.filter(_ <  4).collect
a.foreach(println)
b.foreach(println)

RDD转换算子、行动算子

mapValues算子
原RDD中的Key保持不变,与新的Value一起组成新的RDD中的元素,仅适用于PairRDD

val a = sc.parallelize(List("Tiger","Cat","Eagle","Lion","Panther"))
val b = a.map(x => (x.length,x))
a.collect().foreach(println)
val c = b.mapValues(x => "_" + x + "_")
c.collect().foreach(println)

RDD转换算子、行动算子

distinct算子
用于去除重复数据

 	val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 9, 2, 6))
    val rdd2 = rdd1.distinct()
    println("rdd1分区数:" + rdd1.partitions.length)
    println("rdd2分区数:" + rdd2.partitions.length)
    val rdd3 = rdd1.distinct(2)
    println("rdd3分区数:" + rdd3.partitions.length)
    rdd2.collect().foreach(println)

RDD转换算子、行动算子

reduceByKey算子
接受一个函数,按照相同的key进行reduce操作,类似于scala的reduce操作

val rdd = sc.parallelize(List("Tiger", "Cat", "Lion", "Eagle", "Panther"))
val rdd2 = rdd.map(x => (x.length,x))
val rb = rdd2.reduceByKey((a,b) => a + b)
rb.collect().foreach(println)

RDD转换算子、行动算子

groupByKey算子
groupByKey会将RDD[key,value]按照相同的key进行分组,形成RDD[key,Iterable[value]]的形式,有点类似于sql中的groupby、mysql中的group_concat

val rdd = sc.parallelize(List("Tiger","Cat","Lion","Eagle","Panther"))
val rdd2 = rdd.map(x => (x.length,x))
val gb = rdd2.groupByKey()
gb.collect().foreach(println)

RDD转换算子、行动算子

sortByKey算子
//对pairRDD按照key进行排序,第一个参数可以设置true(升序)或false(降序),默认为true

val rdd = sc.parallelize(List("Tiger","Cat","Lion","Eagle","Panther"))
val rdd2 = rdd.map(x => (x.length,x))
val sb = rdd2.sortByKey()	//升序
val sb2 = rdd2.sortByKey(false)	//降序
sb.collect().foreach(println)
sb2.collect().foreach(println)

RDD转换算子、行动算子

union算子
两个RDD进行合并

val u1 = sc.parallelize(1 to 3)
val u2 = sc.parallelize(3 to 4)
println("------union------")
u1.union(u2).collect().foreach(println)
println("------++------")
(u1 ++ u2).collect().foreach(println)
println("------交集------")
u1.intersection(u2).collect().foreach(println)

RDD转换算子、行动算子

join算子
把RDD1和RDD2中的相同的key给连接起来,类似于sql中的join操作

val j1 = sc.parallelize(List("abe","abby","apple")).map(x => (x,1))
val j2 = sc.parallelize(List("apple","beatty","beatrace")).map(x => (x,1))
j1.join(j2).collect().foreach(println)
j1.leftOuterJoin(j2).collect().foreach(println)
j1.rightOuterJoin(j2).collect().foreach(println)

RDD转换算子、行动算子

RDD动作算子
count算子
返回数集中元素的个数

val rdd = sc.parallelize(List(1,2,3,4,5,6))
println("元素个数为:" + rdd.count)

RDD转换算子、行动算子

collect算子
以Array返回RDD的所有元素,一般在过滤或者处理足够小的结果的时候使用

val rdd = sc.parallelize(List(1,2,3,4,5,6))
rdd.collect()
take算子
返回前n个元素
val rdd = sc.parallelize(List(1,2,3,4,5,6))
rdd.take(3).foreach(println)

RDD转换算子、行动算子

first算子
返回RDD第一个元素

val rdd = sc.parallelize(List(1,2,3,4,5,6))
println(rdd.first.collect())

RDD转换算子、行动算子

reduce算子
根据指定函数,对RDD中的元素进行两两计算,返回计算结果

val rdd = sc.parallelize(1 to 100)
val sum = rdd.reduce((x,y) => {
println(x,y);
x+y
})
//val sum = rdd.reduce(_+_)	与上面等价
println("总和:" + sum)

RDD转换算子、行动算子

  	val b = sc.parallelize(Array(("A",0),("A",2),("B",1),("B",2),("C",1)))
    println(b.reduce((x,y) => {(x._1+y._1,x._2+y._2)}))

RDD转换算子、行动算子

foreach算子
对RDD中的每个元素都使用指定函数,无返回值

val rdd = sc.parallelize(1 to 100)
rdd.foreach(println)

RDD转换算子、行动算子

lookup算子
用于PairRDD,返回Key对应的所有value值

val rdd = sc.parallelize(List(('a',1),('a',2),('b',3),('c',4)))
println(rdd.lookup('a'))

RDD转换算子、行动算子

saveAsTextFile算子
保存RDD数据至文件系统

val rdd = sc.parallelize(1 to 10,2)
rdd.saveAsTextFile("hdfs://hadoop004:9000/workspace/data/")

最值:返回最大值、最小值

val a = sc.parallelize(10 to 30)
println(a.max)
println(a.min)

RDD转换算子、行动算子