欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

SparkRDD算子--mapPartitionsWithIndex算子

程序员文章站 2024-01-19 17:25:04
...

语法

val newRdd = oldRdd.mapPartitionsWithIndex{case (num, datas) => {func}}

源码

def mapPartitionsWithIndex[U](f : scala.Function2[scala.Int, scala.Iterator[T], scala.Iterator[U]], preservesPartitioning : scala.Boolean = { /* compiled code */ })(implicit evidence$9 : scala.reflect.ClassTag[U]) : org.apache.spark.rdd.RDD[U] = { /* compiled code */ }

作用

类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为T的RDD上运行时,func的函数类型必须是(Int, Interator[T]) => Iterator[U]

例子

package com.day1

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object oper {
    def main(args: Array[String]): Unit = {
        val config:SparkConf = new SparkConf().setMaster("local[*]").setAppName("wordCount")

        // 创建上下文对象
        val sc = new SparkContext(config)

        // mapPartitionsWithIndex算子
        val listRdd:RDD[Int] = sc.makeRDD(1 to 10,2)

        val tupleRdd: RDD[(Int, String)] = listRdd.mapPartitionsWithIndex {
            case (num, datas) => {
                datas.map((_, "分区号为:" + num))
            }
        }

        tupleRdd.collect().foreach(println)
    }
}

输入:
1 2 3 4 5 6 7 8 9 10
输出:
(1,分区号为:0)
(2,分区号为:0)
(3,分区号为:0)
(4,分区号为:0)
(5,分区号为:0)
(6,分区号为:1)
(7,分区号为:1)
(8,分区号为:1)
(9,分区号为:1)
(10,分区号为:1)

示意图

SparkRDD算子--mapPartitionsWithIndex算子