欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Spark RDD算子(一) parallelize,makeRDD,textFile,filter,map,flatMap

程序员文章站 2022-06-01 21:07:15
...

parallelize

通过调用SparkContext的parallelize方法,在一个已经存在的Scala集合上创建的(一个Seq对象)。集合的对象将会被拷贝,创建出一个可以被并行操作的分布式数据集

scala版本

def parallelize[T](seq: Seq[T],numSlices: Int)(implicit arg0: ClassTag[T]): RDD[T]

  • 第一个参数是一个seq集合
  • 第二个参数是分区数,可省略
  • 返回值是一个RDD
scala> sc.parallelize(List("hello","world","hello","scala"))
res0: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:25

java版本

def parallelize[T](list: java.util.List[T]): JavaRDD[T] =parallelize(list, sc.defaultParallelism)

  • 第一个参数为List集合
  • 第二参数为分区数,可省略
  • 返回值为JavaRDD[T]类型
public static void main(String[] args) {
    SparkConf  conf = new SparkConf().setMaster("local[*]").setAppName("parallelizeJava");
    JavaSparkContext sc = new JavaSparkContext(conf);
    // Arrays.asList该方法是将数组转化成List集合的方法
    List<String> list = Arrays.asList("hello java","hello scala","hello world");
    JavaRDD<String> rdd1 = sc.parallelize(list);
}

makeRDD

def makeRDD[T](seq: Seq[(T, Seq[String])],numSlices: Int)(implicit arg0: ClassTag[T]): RDD[T]

  • scala版本的才有makeRDD
  • 该方法的底层调用parallelize(seq, numSlices)实现
val rdd:RDD[String] = sc.makeRDD(List("hi scala", "hi java", "hi world"))

textFile

从外部存储中读取数据来创建 RDD

scala版本

def textFile(path: String,minPartitions: Int): org.apache.spark.rdd.RDD[String]

  • 第一个参数为文件路径
  • 第二个参数指定最小分区数
  • 返回值为RDD[String]
// 本地文件
val rdd1:RDD[String] = sc.textFile("file:///f:/a.txt")
// hdfs文件系统
val rdd2:RDD[String] = sc.textFile("hdfs://hadoop4:9000/user/a.txt")

java版本

// 本地文件
JavaRDD<String> rdd1 = sc.textFile("f:/a.txt");
// hdfs文件系统
JavaRDD<String> rdd2 = sc.textFile("hdfs://192.168.233.133:9000/user/a.txt");

textFile支持分区,支持模式匹配,例如把某目录下的所有txt文件转换成RDD

JavaRDD<String> rdd1 = sc.textFile("f:/*.txt");

filter

filter用于将数据按照某一规则过滤,返回满足条件的内容

scala版本

例:将List(1,2,3,4,5,6,7,8,9,10)中的偶数输出

 val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10))
 rdd1.filter(_%2==0).collect.foreach(println)

Spark RDD算子(一) parallelize,makeRDD,textFile,filter,map,flatMap

java版本

// 获取rdd
JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
// rdd的filter方法参数需要一个函数,需要重写该函数的call方法
JavaRDD<Integer> filterRDD = rdd1.filter(new Function<Integer, Boolean>() {
    @Override
    public Boolean call(Integer v1) throws Exception {
        return v1 % 2 == 0;
    }
});
List<Integer> collect = filterRDD.collect();
// 输出
for (Integer i : collect) {
    System.out.println(i);
}

Spark RDD算子(一) parallelize,makeRDD,textFile,filter,map,flatMap

map

map() 接收一个函数,把这个函数用于集合中的每个元素,将函数的返回值作为结果RDD

scala版本

例:List(“hello”,“scala”,“java”,“world”),将list中每个元素变为(k,v)形式,k为元素本身,v为该元素长度,如:(scala,5)

val rdd1 = sc.parallelize(List("hello","scala","java","world"))
rdd1.map(x=>(x,x.size)).collect.foreach(println)

Spark RDD算子(一) parallelize,makeRDD,textFile,filter,map,flatMap

java版本

JavaRDD<String> rdd2 = sc.parallelize(Arrays.asList("hello", "scala", "java", "world"));
JavaRDD<Tuple2<String, Integer>> mapRDD = rdd2.map(new Function<String, Tuple2<String, Integer>>() {
    @Override
    public Tuple2<String, Integer> call(String v1) throws Exception {
        return new Tuple2<>(v1, v1.length());
    }
});
List<Tuple2<String, Integer>> collect1 = mapRDD.collect();
for (Tuple2<String, Integer> tuple2 : collect1) {
    System.out.println(tuple2);
}

Spark RDD算子(一) parallelize,makeRDD,textFile,filter,map,flatMap

flatMap

flatMap的函数应用于每一个元素,对于每一个元素返回的是多个元素组成的迭代器

scala版本

例:将数据切分为单词

val rdd1 = sc.parallelize(List("hello scala","hello java","hello world"))
rdd1.flatMap(x=>x.split(" ")).collect.foreach(println)

Spark RDD算子(一) parallelize,makeRDD,textFile,filter,map,flatMap

java版本

spark2.0以上,对flatMap的方法有所修改,2.0版本以下重写call的返回值是Iteratable类型,2.0以上是Iterator类型

  • spark 2.0 以下
JavaRDD<String> rdd3 = sc.parallelize(Arrays.asList("hello scala", "hello java", "hello world"));
JavaRDD<String> flatMapRDD = rdd3.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public Iterable<String> call(String s) throws Exception {
        String[] strings = s.split(" ");
        return Arrays.asList(strings);
    }
});
  • spark 2.0 以上
JavaRDD<String> rdd3 = sc.parallelize(Arrays.asList("hello scala", "hello java", "hello world"));
JavaRDD<String> flatMapRDD = rdd3.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public Iterator<String> call(String s) throws Exception {
        String[] strings = s.split(" ");
        return Arrays.asList(strings).iterator();
    }
});
List<String> collect2 = flatMapRDD.collect();
for (String s : collect2) {
    System.out.println(s);
}

Spark RDD算子(一) parallelize,makeRDD,textFile,filter,map,flatMap