
Spark Learning - Spark Programming Model (Operators)



Core ideas:

  • RDDs have four types of operators:

    • Create
      • SparkContext.textFile()
      • SparkContext.parallelize()
    • Transformation
      • Operates on one or more RDDs and outputs a transformed RDD
      • Examples: map, filter, groupBy
    • Action
      • Triggers Spark to submit a job and returns the result to the driver program
      • Examples: reduce, countByKey
    • Cache
      • cache: cache the RDD
      • persist: persist the RDD (with a configurable storage level)
  • Lazy evaluation: nothing is actually computed until an Action is encountered.

  • Example: see the sketch below.
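
A minimal sketch of lazy evaluation (an assumed example): the map below only records the computation; nothing runs until the collect() Action.

val nums = sc.parallelize(1 to 3)
// Transformation only: builds the lineage, nothing is computed yet
val doubled = nums.map { x => println(s"mapping $x"); x * 2 }
// Action: triggers the job; the "mapping ..." lines print as the tasks run
doubled.collect()

res1: Array[Int] = Array(2, 4, 6)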

  • Ways to run Spark

    • Run spark-shell on a CDH cluster
      • In a shell, enter: spark-shell --master yarn-client
    • Use Zeppelin
    • Submit a job with spark-submit (see the sketch below)
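
A typical spark-submit invocation might look like the following (the class name and jar are hypothetical placeholders):

spark-submit --master yarn --deploy-mode client --class com.example.WordCount wordcount.jar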

Spark API Documentation

Visit the official documentation: https://spark.apache.org/docs/latest/

Value-Type Transformation Operator Classification

Transformation-map

  • map
    • def map[U](f: (T) ⇒ U)(implicit arg0: ClassTag[U]): RDD[U]
    • Produces a new RDD in which each element is obtained by applying the function f to the corresponding element of the parent RDD
    • The resulting RDD is a MapPartitionsRDD (called MappedRDD in older Spark versions)


  • Example
val rd1 = sc.parallelize(List(1, 2, 3, 4, 5, 6), 2)
val rd2 = rd1.map(x => x * 2)
rd2.collect()

rd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at
parallelize
rd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[3] at map
res1: Array[Int] = Array(2, 4, 6, 8, 10, 12)

Transformation-mapPartitions

  • mapPartitions
    • def mapPartitions[U](f: (Iterator[T]) ⇒ Iterator[U], preservesPartitioning: Boolean = false)(implicit arg0: ClassTag[U]): RDD[U]
    • Receives an iterator over each partition
    • Operates on the elements of each partition; per-partition setup work runs once per partition instead of once per element
  • Example
val rd1 = sc.parallelize(List("20180101", "20180102", "20180103",
                              "20180104", "20180105", "20180106"), 2)
val rd2 = rd1.mapPartitions(iter => {
  // The SimpleDateFormat is created once per partition, not once per element
  val dateFormat = new java.text.SimpleDateFormat("yyyyMMdd")
  iter.map(dateStr => dateFormat.parse(dateStr))
})
rd2.collect()

res1: Array[java.util.Date] = Array(Mon Jan 01 00:00:00 UTC 2018, Tue Jan 02 00:00:00 UTC 2018, Wed Jan 03
00:00:00 UTC 2018, Thu Jan 04 00:00:00 UTC 2018, Fri Jan 05 00:00:00 UTC 2018, Sat Jan 06 00:00:00 UTC 2018)

Transformation-flatMap

  • flatMap
    • def flatMap[U](f: (T) ⇒ TraversableOnce[U])(implicit arg0: ClassTag[U]): RDD[U]
    • Transforms each element of the RDD into a collection of new elements via f
    • Then flattens the result: all of the collections are merged into a single collection
    • The resulting RDD is a MapPartitionsRDD (called FlatMappedRDD in older Spark versions)
  • Example
val rd1 = sc.parallelize(Seq("I have a pen",
                             "I have an apple",
                             "I have a pen",
                             "I have a pineapple"), 2)
val rd2 = rd1.map(s => s.split(" "))      // map keeps the nesting: RDD[Array[String]]
rd2.collect()
val rd3 = rd1.flatMap(s => s.split(" "))  // flatMap flattens: RDD[String]
rd3.collect()
rd3.partitions

res136: Array[Array[String]] = Array(Array(I, have, a, pen), Array(I, have, an, apple), Array(I, have, a, pen), Array(I, have,
a, pineapple))
res137: Array[String] = Array(I, have, a, pen, I, have, an, apple, I, have, a, pen, I, have, a, pineapple)


Transformation-union

  • union
    • def union(other: RDD[T]): RDD[T]
    • Merges two RDDs into one
    • The element types must be the same; duplicates are not removed
  • Example
val rdd1 = sc.parallelize(Seq("Apple", "Banana", "Orange"))
val rdd2 = sc.parallelize(Seq("Banana", "Pineapple"))
val rdd3 = sc.parallelize(Seq("Durian"))
val rddUnion = rdd1.union(rdd2).union(rdd3)
rddUnion.collect.foreach(println)

res1: Array[String] = Array(Apple, Banana, Orange, Banana, Pineapple, Durian)

Transformation-distinct

  • distinct
    • def distinct(): RDD[T]
    • Removes duplicate elements from the RDD
  • Example
val rdd1 = sc.parallelize(Seq("Apple", "Banana", "Orange"))
val rdd2 = sc.parallelize(Seq("Banana", "Pineapple"))
val rdd3 = sc.parallelize(Seq("Durian"))
val rddUnion = rdd1.union(rdd2).union(rdd3)
val rddDistinct = rddUnion.distinct()
rddDistinct.collect()

res1: Array[String] = Array(Orange, Apple, Banana, Pineapple, Durian)

Transformation-filter

  • filter
    • def filter(f: (T) ⇒ Boolean): RDD[T]
    • Filters the elements of the RDD
    • An element is kept when f returns true; otherwise it is discarded
  • Example
val rdd1 = sc.parallelize(Seq("Apple", "Banana", "Orange"))
val filteredRDD = rdd1.filter(item => item.length() >= 6)
filteredRDD.collect()

res1: Array[String] = Array(Banana, Orange)

Transformation-intersection

  • intersection
    • def intersection(other: RDD[T]): RDD[T]
    • def intersection(other: RDD[T], numPartitions: Int): RDD[T]
    • def intersection(other: RDD[T], partitioner: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]
    • Returns the intersection of the elements of two RDDs
  • Example
val rdd1 = sc.parallelize(Seq("Apple", "Banana", "Orange"))
val rdd2 = sc.parallelize(Seq("Banana", "Pineapple"))
val rddIntersection = rdd1.intersection(rdd2)
rddIntersection.collect()

res1: Array[String] = Array(Banana)

Key-Value-Type Transformation Operator Classification

Transformation-groupByKey

  • groupByKey
    • def groupByKey(): RDD[(K, Iterable[V])]
    • def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
    • def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]
    • Groups the elements of an RDD[(K, V)] that share the same key
  • Example
val scoreDetail = sc.parallelize(List(("xiaoming","A"), ("xiaodong","B"),
("peter","B"), ("liuhua","C"), ("xiaofeng","A")), 3)
scoreDetail.map(score_info => (score_info._2, score_info._1))
	.groupByKey()
	.collect()
	.foreach(println(_))

scoreDetail: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[110] at parallelize
(A,CompactBuffer(xiaoming, xiaofeng))
(B,CompactBuffer(xiaodong, peter))
(C,CompactBuffer(liuhua))


Transformation-reduceByKey

  • reduceByKey
    • def reduceByKey(func: (V, V) ⇒ V): RDD[(K, V)]
    • def reduceByKey(func: (V, V) ⇒ V, numPartitions: Int): RDD[(K, V)]
    • Merges the values of each key using an associative reduce function func
    • Values are combined locally within each partition before the shuffle, which usually makes it more efficient than groupByKey for aggregation
  • Example: see the sketch below.
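
A minimal word-count sketch (an assumed example, using only the standard RDD API):

val words = sc.parallelize(Seq("pen", "apple", "pen", "pineapple"), 2)
// Map each word to a (word, 1) pair, then sum the counts for each key
val counts = words.map(w => (w, 1)).reduceByKey(_ + _)
counts.collect()

res1: Array[(String, Int)] = Array((apple,1), (pen,2), (pineapple,1))   // order may vary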

Transformation-aggregateByKey

  • How do we compute the average value for each key?
    [(A,110),(A,130),(A,120),(B,200),(B,206),(B,206),(C,150),(C,160),(C,170)]
  • aggregateByKey
    • def aggregateByKey[U](zeroValue: U)(seqOp: (U, V) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): RDD[(K, U)]
    • Aggregates the values of each key starting from zeroValue: seqOp folds each value into the accumulator within a partition, and combOp merges the accumulators across partitions
  • Example: see the sketch below.
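
A sketch of the per-key average using a (sum, count) accumulator (an assumed example):

val scores = sc.parallelize(Seq(("A",110), ("A",130), ("A",120), ("B",200),
                                ("B",206), ("B",206), ("C",150), ("C",160), ("C",170)), 3)
val sumCount = scores.aggregateByKey((0, 0))(
  (acc, v) => (acc._1 + v, acc._2 + 1),    // seqOp: fold one value into (sum, count)
  (a, b) => (a._1 + b._1, a._2 + b._2))    // combOp: merge partial (sum, count) pairs
val avg = sumCount.mapValues { case (sum, count) => sum.toDouble / count }
avg.collect()

res1: Array[(String, Double)] = Array((A,120.0), (B,204.0), (C,160.0))   // order may vary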

Transformation-join

  • join
    • def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
    • Performs an inner join of two key-value RDDs: for each key present in both, every pair of values (one from each RDD) is emitted
  • Example: see the sketch below.
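
A minimal join sketch (an assumed example):

val rdd1 = sc.parallelize(Seq(("Apple", 3), ("Banana", 5), ("Orange", 2)))
val rdd2 = sc.parallelize(Seq(("Banana", "yellow"), ("Orange", "orange"), ("Pineapple", "brown")))
// Inner join: only keys present in both RDDs survive
rdd1.join(rdd2).collect()

res1: Array[(String, (Int, String))] = Array((Banana,(5,yellow)), (Orange,(2,orange)))   // order may vary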

Action Operator Classification

  • Commonly used Action operators (a non-exhaustive list from the standard RDD API) include:
    • collect(): return all elements of the RDD to the driver as an array
    • count(): return the number of elements in the RDD
    • reduce(f): aggregate all elements with the function f
    • take(n) / first(): return the first n elements / the first element
    • countByKey(): for an RDD[(K, V)], count the elements of each key
    • saveAsTextFile(path): save the RDD as text files
    • foreach(f): apply f to every element (executed on the executors)
  • Example: see the sketch below.
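
A minimal sketch of a few Actions (an assumed example):

val nums = sc.parallelize(Seq(1, 2, 3, 4, 5, 6), 2)
nums.count()          // res1: Long = 6
nums.reduce(_ + _)    // res2: Int = 21
nums.take(3)          // res3: Array[Int] = Array(1, 2, 3)
nums.first()          // res4: Int = 1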
