Spark RDD算子(一) parallelize,makeRDD,textFile,filter,map,flatMap
Spark RDD算子(一)
parallelize
通过调用SparkContext的parallelize方法,在一个已经存在的Scala集合上创建的(一个Seq对象)。集合的对象将会被拷贝,创建出一个可以被并行操作的分布式数据集
scala版本
def parallelize[T](seq: Seq[T],numSlices: Int)(implicit arg0: ClassTag[T]): RDD[T]
- 第一个参数是一个seq集合
- 第二个参数是分区数,可省略
- 返回值是一个RDD
scala> sc.parallelize(List("hello","world","hello","scala"))
res0: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:25
java版本
def parallelize[T](list: java.util.List[T]): JavaRDD[T] =parallelize(list, sc.defaultParallelism)
- 第一个参数为List集合
- 第二参数为分区数,可省略
- 返回值为JavaRDD[T]类型
public static void main(String[] args) {
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("parallelizeJava");
JavaSparkContext sc = new JavaSparkContext(conf);
// Arrays.asList该方法是将数组转化成List集合的方法
List<String> list = Arrays.asList("hello java","hello scala","hello world");
JavaRDD<String> rdd1 = sc.parallelize(list);
}
makeRDD
def makeRDD[T](seq: Seq[(T, Seq[String])],numSlices: Int)(implicit arg0: ClassTag[T]): RDD[T]
- scala版本的才有makeRDD
- 该方法的底层调用parallelize(seq, numSlices)实现
val rdd:RDD[String] = sc.makeRDD(List("hi scala", "hi java", "hi world"))
textFile
从外部存储中读取数据来创建 RDD
scala版本
def textFile(path: String,minPartitions: Int): org.apache.spark.rdd.RDD[String]
- 第一个参数为文件路径
- 第二个参数指定最小分区数
- 返回值为RDD[String]
// 本地文件
val rdd1:RDD[String] = sc.textFile("file:///f:/a.txt")
// hdfs文件系统
val rdd2:RDD[String] = sc.textFile("hdfs://hadoop4:9000/user/a.txt")
java版本
// 本地文件
JavaRDD<String> rdd1 = sc.textFile("f:/a.txt");
// hdfs文件系统
JavaRDD<String> rdd2 = sc.textFile("hdfs://192.168.233.133:9000/user/a.txt");
textFile支持分区,支持模式匹配,例如把某目录下的所有txt文件转换成RDD
JavaRDD<String> rdd1 = sc.textFile("f:/*.txt");
filter
filter用于将数据按照某一规则过滤,返回满足条件的内容
scala版本
例:将List(1,2,3,4,5,6,7,8,9,10)中的偶数输出
val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10))
rdd1.filter(_%2==0).collect.foreach(println)
java版本
// 获取rdd
JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
// rdd的filter方法参数需要一个函数,需要重写该函数的call方法
JavaRDD<Integer> filterRDD = rdd1.filter(new Function<Integer, Boolean>() {
@Override
public Boolean call(Integer v1) throws Exception {
return v1 % 2 == 0;
}
});
List<Integer> collect = filterRDD.collect();
// 输出
for (Integer i : collect) {
System.out.println(i);
}
map
map() 接收一个函数,把这个函数用于集合中的每个元素,将函数的返回值作为结果RDD
scala版本
例:List(“hello”,“scala”,“java”,“world”),将list中每个元素变为(k,v)形式,k为元素本身,v为该元素长度,如:(scala,5)
val rdd1 = sc.parallelize(List("hello","scala","java","world"))
rdd1.map(x=>(x,x.size)).collect.foreach(println)
java版本
JavaRDD<String> rdd2 = sc.parallelize(Arrays.asList("hello", "scala", "java", "world"));
JavaRDD<Tuple2<String, Integer>> mapRDD = rdd2.map(new Function<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> call(String v1) throws Exception {
return new Tuple2<>(v1, v1.length());
}
});
List<Tuple2<String, Integer>> collect1 = mapRDD.collect();
for (Tuple2<String, Integer> tuple2 : collect1) {
System.out.println(tuple2);
}
flatMap
flatMap的函数应用于每一个元素,对于每一个元素返回的是多个元素组成的迭代器
scala版本
例:将数据切分为单词
val rdd1 = sc.parallelize(List("hello scala","hello java","hello world"))
rdd1.flatMap(x=>x.split(" ")).collect.foreach(println)
java版本
spark2.0以上,对flatMap的方法有所修改,2.0版本以下重写call的返回值是Iteratable类型,2.0以上是Iterator类型
- spark 2.0 以下
JavaRDD<String> rdd3 = sc.parallelize(Arrays.asList("hello scala", "hello java", "hello world"));
JavaRDD<String> flatMapRDD = rdd3.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterable<String> call(String s) throws Exception {
String[] strings = s.split(" ");
return Arrays.asList(strings);
}
});
- spark 2.0 以上
JavaRDD<String> rdd3 = sc.parallelize(Arrays.asList("hello scala", "hello java", "hello world"));
JavaRDD<String> flatMapRDD = rdd3.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String s) throws Exception {
String[] strings = s.split(" ");
return Arrays.asList(strings).iterator();
}
});
List<String> collect2 = flatMapRDD.collect();
for (String s : collect2) {
System.out.println(s);
}
上一篇: CSS里transform
下一篇: Android—LitePal操作数据库