Spark Learning: The Spark Programming Model (Operators)
Table of Contents
- Spark API Documentation
- Value-type Transformation Operators
- Transformation-map
- Transformation-mapPartitions
- Transformation-flatMap
- Transformation-union
- Transformation-distinct
- Transformation-filter
- Transformation-intersection
- Key-Value-type Transformation Operators
- Transformation-groupByKey
- Transformation-reduceByKey
- Transformation-aggregateByKey
- Transformation-join
- Action Operators
Core Ideas:
RDDs support four categories of operators (tied together in the sketch after this list):
- Create
  - SparkContext.textFile()
  - SparkContext.parallelize()
- Transformation
  - Operates on one or more RDDs and produces a transformed RDD
  - Examples: map, filter, groupBy
- Action
  - Triggers Spark to submit a job and returns the result to the Driver Program
  - Examples: reduce, countByKey
- Cache
  - cache: cache in memory
  - persist: persist at a chosen storage level
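A short sketch tying the four categories together (the commented-out file path is a placeholder):
// Create: build an RDD from a local collection (or from a file)
val nums = sc.parallelize(List(1, 2, 3, 4))
// val lines = sc.textFile("hdfs:///path/to/input.txt")  // hypothetical path
// Transformation: only describes a new RDD; nothing runs yet
val doubled = nums.map(_ * 2).filter(_ > 4)
// Cache: mark the RDD to be kept in memory once it has been computed
doubled.cache()
// Action: triggers the actual job and returns the result to the driver
doubled.reduce(_ + _)
res1: Int = 14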
Lazy evaluation: transformations are only actually executed when an Action is encountered. The snippet below makes this visible.
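A minimal sketch of the laziness just described: the println side effect fires only when the Action runs (in local spark-shell the output appears in the shell; on a cluster it goes to executor stdout):
val rdd = sc.parallelize(List(1, 2, 3))
val mapped = rdd.map { x => println(s"mapping $x"); x * 2 }  // nothing printed yet
mapped.collect()  // the "mapping ..." lines appear only now
res1: Array[Int] = Array(2, 4, 6)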
Ways to Run Spark
- Run Spark-Shell on a CDH cluster
  - In a shell, enter spark-shell --master yarn-client
- Use Zeppelin
  - sudo docker run -p 8080:8080 --rm --name zeppelin apache/zeppelin:0.7.3
  - https://zeppelin.apache.org
- Submit jobs with Spark-Submit (a sketch follows below)
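The Spark-Submit route is not shown above; a minimal sketch, where the main class, jar name, and resource flags are placeholder values to adapt:
spark-submit \
  --master yarn \
  --deploy-mode client \
  --class com.example.MyApp \
  --num-executors 4 \
  --executor-memory 2g \
  my-app.jar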
Spark API Documentation
See the official documentation: https://spark.apache.org/docs/latest/
Value-type Transformation Operators
Transformation-map
- map
  - def map[U](f: (T) ⇒ U)(implicit arg0: ClassTag[U]): RDD[U]
  - Produces a new RDD in which each element is derived from the parent RDD by applying the function f
  - The new RDD was called MappedRDD in older Spark versions; current versions produce a MapPartitionsRDD, as the output below shows
- Example
val rd1 = sc.parallelize(List(1, 2, 3, 4, 5, 6), 2)
val rd2 = rd1.map(x => x * 2)
rd2.collect()
rd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at
parallelize
rd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[3] at map
res1: Array[Int] = Array(2, 4, 6, 8, 10, 12)
Transformation-mapPartitions
- mapPartitions
  - def mapPartitions[U](f: (Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false)(implicit arg0: ClassTag[U]): RDD[U]
  - Receives an iterator over each partition
  - Applies the function to every element of each partition; useful when per-partition setup (such as creating a parser or a connection) should happen once per partition rather than once per element
- Example
val rd1 = sc.parallelize(List("20180101", "20180102", "20180103", "20180104", "20180105",
"20180106"), 2)
val rd2 = rd1.mapPartitions(iter => {
  // one SimpleDateFormat per partition, reused for every element in it
  val dateFormat = new java.text.SimpleDateFormat("yyyyMMdd")
  iter.map(dateStr => dateFormat.parse(dateStr))
})
rd2.collect()
res1: Array[java.util.Date] = Array(Mon Jan 01 00:00:00 UTC 2018, Tue Jan 02 00:00:00 UTC 2018, Wed Jan 03
00:00:00 UTC 2018, Thu Jan 04 00:00:00 UTC 2018, Fri Jan 05 00:00:00 UTC 2018, Sat Jan 06 00:00:00 UTC 2018)
Transformation-flatMap
- flatMap
  - def flatMap[U](f: (T) ⇒ TraversableOnce[U])(implicit arg0: ClassTag[U]): RDD[U]
  - Transforms each element of the RDD into new elements via f
  - Flattens the results: all of the resulting collections are merged into a single one
  - The new RDD was called FlatMappedRDD in older Spark versions
- Example
val rd1 = sc.parallelize(Seq("I have a pen",
"I have an apple",
"I have a pen",
"I have a pineapple"), 2)
val rd2 = rd1.map(s => s.split(" "))
rd2.collect()
val rd3 = rd1.flatMap(s => s.split(" "))
rd3.collect()
rd3.partitions
res136: Array[Array[String]] = Array(Array(I, have, a, pen), Array(I, have, an, apple), Array(I, have, a, pen), Array(I, have,
a, pineapple))
res137: Array[String] = Array(I, have, a, pen, I, have, an, apple, I, have, a, pen, I, have, a, pineapple)
Transformation-union
- union
  - def union(other: RDD[T]): RDD[T]
  - Merges two RDDs
  - The element types must be the same; duplicates are not removed
- Example
val rdd1 = sc.parallelize(Seq("Apple", "Banana", "Orange"))
val rdd2 = sc.parallelize(Seq("Banana", "Pineapple"))
val rdd3 = sc.parallelize(Seq("Durian"))
val rddUnion = rdd1.union(rdd2).union(rdd3)
rddUnion.collect()
res1: Array[String] = Array(Apple, Banana, Orange, Banana, Pineapple, Durian)
Transformation-distinct
- distinct
  - def distinct(): RDD[T]
  - Removes duplicate elements from the RDD
- Example
val rdd1 = sc.parallelize(Seq("Apple", "Banana", "Orange"))
val rdd2 = sc.parallelize(Seq("Banana", "Pineapple"))
val rdd3 = sc.parallelize(Seq("Durian"))
val rddUnion = rdd1.union(rdd2).union(rdd3)
val rddDistinct = rddUnion.distinct()
rddDistinct.collect()
res1: Array[String] = Array(Orange, Apple, Banana, Pineapple, Durian)
Transformation-filter
- filter
  - def filter(f: (T) ⇒ Boolean): RDD[T]
  - Filters the elements of the RDD
  - An element is kept when f returns true and discarded otherwise
- Example
val rdd1 = sc.parallelize(Seq("Apple", "Banana", "Orange"))
val filteredRDD = rdd1.filter(item => item.length() >= 6)
filteredRDD.collect()
res1: Array[String] = Array(Banana, Orange)
Transformation-intersection
- intersection
  - def intersection(other: RDD[T]): RDD[T]
  - def intersection(other: RDD[T], numPartitions: Int): RDD[T]
  - def intersection(other: RDD[T], partitioner: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]
  - Returns the intersection of the elements of the two RDDs
- Example
val rdd1 = sc.parallelize(Seq("Apple", "Banana", "Orange"))
val rdd2 = sc.parallelize(Seq("Banana", "Pineapple"))
val rddIntersection = rdd1.intersection(rdd2)
rddIntersection.collect()
res1: Array[String] = Array(Banana)
Key-Value-type Transformation Operators
Transformation-groupByKey
- groupByKey
  - def groupByKey(): RDD[(K, Iterable[V])]
  - def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
  - def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]
  - Groups the entries of an RDD[(K, V)] that share the same key
- Example
val scoreDetail = sc.parallelize(List(("xiaoming","A"), ("xiaodong","B"),
("peter","B"), ("liuhua","C"), ("xiaofeng","A")), 3)
scoreDetail.map(score_info => (score_info._2, score_info._1))  // swap to (grade, name) so we group by grade
.groupByKey()
.collect()
.foreach(println(_))
scoreDetail: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[110] at parallelize
(A,CompactBuffer(xiaoming, xiaofeng))
(B,CompactBuffer(xiaodong, peter))
(C,CompactBuffer(liuhua))
Transformation-reduceByKey
- reduceByKey
  - def reduceByKey(func: (V, V) => V): RDD[(K, V)]
  - def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
  - def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]
  - Merges the values of each key with the given associative function; partial merging happens on the map side before the shuffle, so it usually shuffles far less data than groupByKey
- Example (the original leaves this blank; a sketch follows)
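A minimal sketch for the missing example, summing made-up scores per key (output order may vary):
val pairs = sc.parallelize(List(("A", 110), ("A", 130), ("B", 200), ("B", 206)))
pairs.reduceByKey(_ + _).collect()
res1: Array[(String, Int)] = Array((A,240), (B,406))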
Transformation-aggregateByKey
- aggregateByKey
  - def aggregateByKey[U](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U)(implicit arg0: ClassTag[U]): RDD[(K, U)]
  - Aggregates the values of each key with a zero value, a within-partition function seqOp, and a cross-partition merge function combOp; the result type U may differ from the value type V
- How do we compute the average per group for the following data? (see the sketch below)
  [(A,110),(A,130),(A,120),(B,200),(B,206),(B,206),(C,150),(C,160),(C,170)]
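One way to answer the question above: accumulate a (sum, count) pair per key with aggregateByKey, then divide (output order may vary):
val scores = sc.parallelize(List(("A",110),("A",130),("A",120),("B",200),("B",206),("B",206),("C",150),("C",160),("C",170)))
val sumCount = scores.aggregateByKey((0, 0))(
  (acc, v) => (acc._1 + v, acc._2 + 1),    // seqOp: fold one value into the partition-local (sum, count)
  (a, b) => (a._1 + b._1, a._2 + b._2))    // combOp: merge (sum, count) pairs across partitions
sumCount.mapValues { case (sum, count) => sum.toDouble / count }.collect()
res1: Array[(String, Double)] = Array((A,120.0), (B,204.0), (C,160.0))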
Transformation-join
- join
  - def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
  - For two pair RDDs, returns an RDD containing all pairs of values whose keys match (an inner join)
- Example (the original leaves this blank; a sketch follows)
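A minimal sketch for the missing example, with made-up ids; only keys present in both RDDs (1 and 2) survive:
val names = sc.parallelize(List((1, "Apple"), (2, "Banana"), (3, "Orange")))
val prices = sc.parallelize(List((1, 3.5), (2, 2.0), (4, 8.0)))
names.join(prices).collect()
res1: Array[(Int, (String, Double))] = Array((1,(Apple,3.5)), (2,(Banana,2.0)))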
Action Operators
- Actions trigger the actual execution of a job and return results to the Driver Program (or write them to storage)
- Common actions include collect, count, take, reduce, countByKey, saveAsTextFile
- Example (the original leaves this blank; a sketch follows)
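A minimal sketch of a few common actions:
val nums = sc.parallelize(List(1, 2, 3, 4, 5, 6), 2)
nums.count()                            // res: Long = 6
nums.reduce(_ + _)                      // res: Int = 21
nums.take(3)                            // res: Array[Int] = Array(1, 2, 3)
nums.map(x => (x % 2, x)).countByKey()  // res: Map(0 -> 3, 1 -> 3): three even and three odd values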