欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Spark快速上手(一)

程序员文章站 2024-03-13 10:30:57
...

Spark中的概念

RDD

RDD:弹性数据集,特点是可以并行计算 ,并且是容错的,有两种方式可以创建RDD。

  1. 执行Transform操作(变换操作)
  2. 读取外部存储系统的数据集,如hdfs,hbase,或者任何与hadoop有关的数据源。
    可以理解为它是spark提供的特殊的集合类。如传统的Array:(1,2,3,4,5)是一个整体,但是转换成RDD后,我们可以对数据进行Partition(分区处理),这样做的目的就是为了分布式。
    可以让这个RDD有两个分区,那么有可能是RDD(1,2) RDD(3,4).
    这样设计的目的在于可以进行分布式运算。
创建RDD

scala示例1:
基于一个基本的集合类型转换而来

val data = Array(1,2,3,4,5)
val r1 = sc.sparallelize(data)
val r2 = sc.sparallelize(data,2)

scala示例2:
从外部文件转换而来

val distFile = sc.textFile("data.txt")
查看RDD

手机rdd中的数据组成Array返回,此方法将会把分布式存储的rdd中的数据集中到一台机器中组建Array。此方法在生产环境中一定要慎用,容易造成内存溢出。

scala>rdd.collect
scala>rdd.partitions

查看RDD分区数量

scala>rdd.partitions.size

查看RDD每个分区的元素,此方法会将每个分区的元素以Array形式返回

scala>rdd.glom.collect 

分区的概念

Spark快速上手(一)
如上图中,一个RDD有item1~item25,共5个分区,分别在3台机器上进行处理。此外,spark并没有原生的提供rdd的分区查看工具,我们可以自己写一个。

import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag

object su{
def debug[T:ClassTag](rdd:RDD[T]) = {
rdd.mapPartitionsWithIndex((i:Int,inter:Iterator[T])=>{
val m = scala.collection.mutable.Map[Int,List[T]]()
while (iter:+iter.next)
}
m(i) = list
m.iterator
}).collect().foreach((x:Tuple2[Int,List[T]])=>{
val i = x._1
println(s"partition:[$i]")
s._2.foreache{println}
})
}
}

RDD操作

针对RDD的操作,分两种,一种是Transformation(变换),一种是Actions(执行)。
Transformation(变换)操作属于懒操作(算子),不会真正触发RDD的处理计算。
Actions(执行)操作才会真正触发。

map(func)

参数是函数,函数应用于RDD每一个元素,返回是新的RDD 。

val rdd = sc.makeRDD(List(1,3,5,7,9))       				  
rdd.map(_*10)
flatMap(func)

扁平化map,对RDD每个元素的转换,然后再扁平化处理

val = sc.makeRDD(List("hello world","hello scala","hello spark"))
#Array(Array(hello, world), Array(hello, count), Array(world, spark))
rdd.map(_.split{" "})
#Array[String] = Array(hello, world, hello, count, world, spark) 
rdd.flatMap(_.split{" "})
filter(func)

参数是函数,函数会过滤掉不符合条件的元素,返回的是新的RDD

val rdd = sc.makeRDD(List(1,3,5,7,9))
rdd.filter(_<5)
mapPartitions(func)

和map函数类似,只不过映射函数的参数由RDD的每一个元素变成了RDD中每一个分区的迭代器。

val rdd3 = rdd1.mapPartitions{
x=>{
val result = List[Int]()
var i = 0
while(x.hasNext){
i+= x.next()
}
result.::(i).iterator
}
}

补充:此方法可以用于某些场景的调优,比如将数据存储数据库, 如果用map方法来存,有一条数据就会建立和销毁一次连接,性能较低所以此时可以用mapPartitions代替map

mapPartitionsWithIndex(func)

作用与mapPartitions相同,不过提供了两个参数,第一个参数为分区的索引。
案例:每个分区数字进行累加并在前加上分区号

var rdd1   = sc.makeRDD(1 to 52)
var rdd2 = rdd1.mapPartitionsWithIndex{
(index,iter)=>{
var result = List[String]()
var i = 0
while(iter.hashNext){
i +=iter.next()}
}
result.::(index+"|"+i).iterator
}
}

Spark快速上手(一)
根据分区返回数字+分区对应字母

val rdd = sc.makeRDD(List(1,2,3,4,5),2); 
rdd.mapPartitionsWithIndex((index,iter)=>{ var list = List[String]()
while(iter.hasNext){
if(index==0) list = list :+ (iter.next + "a")
else {
list = list :+ (iter.next + "b")
}
}
list.iterator
});

Spark快速上手(一)

union

并集,也可用++实现

val rdd1 = sc.makeRDD(List(1,3,5));
val rdd2 = sc.makeRDD(List(2,4,6,8));
val rdd = rdd1.union(rdd2);
val rdd = rdd1 ++ rdd2;

Spark快速上手(一)

intersection

交集

val rdd1 = sc.makeRDD(List(1,3,5,7));
val rdd2 = sc.makeRDD(List(5,7,9,11));
val rdd = rdd1.intersection(rdd2);

Spark快速上手(一)

subtract

差集

val rdd1 = sc.makeRDD(List(1,3,5,7,9));
val rdd2 = sc.makeRDD(List(5,7,9,11,13));
val rdd =  rdd1.subtract(rdd2);

Spark快速上手(一)

distinct[numTasks]

去重

val rdd = sc.makeRDD(List(1,3,5,7,9,3,7,10,23,7));
rdd.distinct
groupByKey([ numTasks ])
scala>val rdd = sc.parallelize(List(("cat",2), ("dog",5),("cat",4),("dog",3),("cat",6),("dog",3),("cat",9),("dog",1)),2); 
scala>rdd.groupByKey()

注:groupByKey对于数据格式是有要求的,即操作的元素必须是一个二元tuple,tuple._1 是key,tuple._2是value

reduceByKey( func,[ numTasks ])
var rdd = sc.makeRDD( List( ("hello",1),("spark",1),("hello",1),("world",1) ) )
rdd.reduceByKey(_+_);

reduceByKey操作的数据格式必须是一个二元tuple

aggregateByKey( zeroValue )( seqOp ,combOp , [ numTasks ])

使用方法及案例展示:
aggregateByKey(zeroValue)(func1,func2)
zeroValue表示初始值,初始值会参与func1的计算
在分区内,按key分组,把每组的值进行fun1的计算
再将每个分区每组的计算结果按fun2进行计算

val rdd = sc.parallelize(List(("cat",2),("dog",5),("cat",4),("dog",3),("cat",6),("dog",3),("cat",9),("dog",1)),2);

分区查询结果:
partition:[0]
(cat,2)
(dog,5)
(cat,4)
(dog,3)

partition:[1]
(cat,6)
(dog,3)
(cat,9)
(dog,1)

rdd.aggregateByKey(0)( _+_  ,  _*_);

Spark快速上手(一)

sortByKey([ ascending ], [ numTasks ])
val d2 = 
sc.parallelize(Array(("cc",32),("bb",32),("cc",22),("aa",18),("bb",6),("dd",16),("ee",1
04),("cc",1),("ff",13),("gg",68),("bb",44))) 

Spark快速上手(一)

join( otherDataset , [ numTasks ])
cartesian( otherDataset )

求笛卡儿积

val rdd1 = sc.makeRDD(List(1,2,3))
val rdd2 = sc.makeRDD(List("a","b"))
rdd1.cartesian(rdd2);
coalesce( numPartitions )

coalesce(n,true/false) 扩大或缩小分区

val rdd = sc.makeRDD(List(1,2,3,4,5),2) 
rdd.coalesce(3,true);//如果是扩大分区 需要传入一个true 表示要重新shuffle 
rdd.coalesce(2);//如果是缩小分区 默认就是false 不需要明确的传入
相关标签: Spark