Spark快速上手(一)
Spark中的概念
RDD
RDD:弹性数据集,特点是可以并行计算 ,并且是容错的,有两种方式可以创建RDD。
- 执行Transform操作(变换操作)
- 读取外部存储系统的数据集,如hdfs,hbase,或者任何与hadoop有关的数据源。
可以理解为它是spark提供的特殊的集合类。如传统的Array:(1,2,3,4,5)是一个整体,但是转换成RDD后,我们可以对数据进行Partition(分区处理),这样做的目的就是为了分布式。
可以让这个RDD有两个分区,那么有可能是RDD(1,2) RDD(3,4).
这样设计的目的在于可以进行分布式运算。
创建RDD
scala示例1:
基于一个基本的集合类型转换而来
val data = Array(1,2,3,4,5)
val r1 = sc.sparallelize(data)
val r2 = sc.sparallelize(data,2)
scala示例2:
从外部文件转换而来
val distFile = sc.textFile("data.txt")
查看RDD
手机rdd中的数据组成Array返回,此方法将会把分布式存储的rdd中的数据集中到一台机器中组建Array。此方法在生产环境中一定要慎用,容易造成内存溢出。
scala>rdd.collect
scala>rdd.partitions
查看RDD分区数量
scala>rdd.partitions.size
查看RDD每个分区的元素,此方法会将每个分区的元素以Array形式返回
scala>rdd.glom.collect
分区的概念
如上图中,一个RDD有item1~item25,共5个分区,分别在3台机器上进行处理。此外,spark并没有原生的提供rdd的分区查看工具,我们可以自己写一个。
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag
object su{
def debug[T:ClassTag](rdd:RDD[T]) = {
rdd.mapPartitionsWithIndex((i:Int,inter:Iterator[T])=>{
val m = scala.collection.mutable.Map[Int,List[T]]()
while (iter:+iter.next)
}
m(i) = list
m.iterator
}).collect().foreach((x:Tuple2[Int,List[T]])=>{
val i = x._1
println(s"partition:[$i]")
s._2.foreache{println}
})
}
}
RDD操作
针对RDD的操作,分两种,一种是Transformation(变换),一种是Actions(执行)。
Transformation(变换)操作属于懒操作(算子),不会真正触发RDD的处理计算。
Actions(执行)操作才会真正触发。
map(func)
参数是函数,函数应用于RDD每一个元素,返回是新的RDD 。
val rdd = sc.makeRDD(List(1,3,5,7,9))
rdd.map(_*10)
flatMap(func)
扁平化map,对RDD每个元素的转换,然后再扁平化处理
val = sc.makeRDD(List("hello world","hello scala","hello spark"))
#Array(Array(hello, world), Array(hello, count), Array(world, spark))
rdd.map(_.split{" "})
#Array[String] = Array(hello, world, hello, count, world, spark)
rdd.flatMap(_.split{" "})
filter(func)
参数是函数,函数会过滤掉不符合条件的元素,返回的是新的RDD
val rdd = sc.makeRDD(List(1,3,5,7,9))
rdd.filter(_<5)
mapPartitions(func)
和map函数类似,只不过映射函数的参数由RDD的每一个元素变成了RDD中每一个分区的迭代器。
val rdd3 = rdd1.mapPartitions{
x=>{
val result = List[Int]()
var i = 0
while(x.hasNext){
i+= x.next()
}
result.::(i).iterator
}
}
补充:此方法可以用于某些场景的调优,比如将数据存储数据库, 如果用map方法来存,有一条数据就会建立和销毁一次连接,性能较低所以此时可以用mapPartitions代替map
mapPartitionsWithIndex(func)
作用与mapPartitions相同,不过提供了两个参数,第一个参数为分区的索引。
案例:每个分区数字进行累加并在前加上分区号
var rdd1 = sc.makeRDD(1 to 5,2)
var rdd2 = rdd1.mapPartitionsWithIndex{
(index,iter)=>{
var result = List[String]()
var i = 0
while(iter.hashNext){
i +=iter.next()}
}
result.::(index+"|"+i).iterator
}
}
根据分区返回数字+分区对应字母
val rdd = sc.makeRDD(List(1,2,3,4,5),2);
rdd.mapPartitionsWithIndex((index,iter)=>{ var list = List[String]()
while(iter.hasNext){
if(index==0) list = list :+ (iter.next + "a")
else {
list = list :+ (iter.next + "b")
}
}
list.iterator
});
union
并集,也可用++实现
val rdd1 = sc.makeRDD(List(1,3,5));
val rdd2 = sc.makeRDD(List(2,4,6,8));
val rdd = rdd1.union(rdd2);
val rdd = rdd1 ++ rdd2;
intersection
交集
val rdd1 = sc.makeRDD(List(1,3,5,7));
val rdd2 = sc.makeRDD(List(5,7,9,11));
val rdd = rdd1.intersection(rdd2);
subtract
差集
val rdd1 = sc.makeRDD(List(1,3,5,7,9));
val rdd2 = sc.makeRDD(List(5,7,9,11,13));
val rdd = rdd1.subtract(rdd2);
distinct[numTasks]
去重
val rdd = sc.makeRDD(List(1,3,5,7,9,3,7,10,23,7));
rdd.distinct
groupByKey([ numTasks ])
scala>val rdd = sc.parallelize(List(("cat",2), ("dog",5),("cat",4),("dog",3),("cat",6),("dog",3),("cat",9),("dog",1)),2);
scala>rdd.groupByKey()
注:groupByKey对于数据格式是有要求的,即操作的元素必须是一个二元tuple,tuple._1 是key,tuple._2是value
reduceByKey( func,[ numTasks ])
var rdd = sc.makeRDD( List( ("hello",1),("spark",1),("hello",1),("world",1) ) )
rdd.reduceByKey(_+_);
reduceByKey操作的数据格式必须是一个二元tuple
aggregateByKey( zeroValue )( seqOp ,combOp , [ numTasks ])
使用方法及案例展示:
aggregateByKey(zeroValue)(func1,func2)
zeroValue表示初始值,初始值会参与func1的计算
在分区内,按key分组,把每组的值进行fun1的计算
再将每个分区每组的计算结果按fun2进行计算
val rdd = sc.parallelize(List(("cat",2),("dog",5),("cat",4),("dog",3),("cat",6),("dog",3),("cat",9),("dog",1)),2);
分区查询结果:
partition:[0]
(cat,2)
(dog,5)
(cat,4)
(dog,3)
partition:[1]
(cat,6)
(dog,3)
(cat,9)
(dog,1)
rdd.aggregateByKey(0)( _+_ , _*_);
sortByKey([ ascending ], [ numTasks ])
val d2 =
sc.parallelize(Array(("cc",32),("bb",32),("cc",22),("aa",18),("bb",6),("dd",16),("ee",1
04),("cc",1),("ff",13),("gg",68),("bb",44)))
join( otherDataset , [ numTasks ])
cartesian( otherDataset )
求笛卡儿积
val rdd1 = sc.makeRDD(List(1,2,3))
val rdd2 = sc.makeRDD(List("a","b"))
rdd1.cartesian(rdd2);
coalesce( numPartitions )
coalesce(n,true/false) 扩大或缩小分区
val rdd = sc.makeRDD(List(1,2,3,4,5),2)
rdd.coalesce(3,true);//如果是扩大分区 需要传入一个true 表示要重新shuffle
rdd.coalesce(2);//如果是缩小分区 默认就是false 不需要明确的传入
上一篇: qt初学者 第一个小程序 小界面
下一篇: 圆形缓冲区-MapReduce中的