欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Spark学习笔记3——RDD(下)

程序员文章站 2024-01-19 19:16:16
Spark 的 RDD 学习第二节,笔记相关: 1.如何向 Spark 传递函数 2.Spark 常用的一些转化和行动操作 3.Spark 的持久化级别 ......

spark学习笔记3——rdd(下)

笔记摘抄自 [美] holden karau 等著的《spark快速大数据分析》

向spark传递函数

大部分 spark 的转化操作和一部分行动操作,都需要传递函数后进行计算。如何传递函数下文将用 java 展示。

java 向 spark 传递函数需要实现 spark 的 org.apache.spark.api.java.function 包中的接口。一些基本的接口如下表:

函数名 实现的方法 用途
function<t, r> r call(t) 接收一个输入值并返回一个输出值,用于类似map() 和
filter() 等操作中
function2<t1, t2, r> r call(t1, t2) 接收两个输入值并返回一个输出值,用于类似aggregate()
和fold() 等操作中
flatmapfunction<t, r> iterable call(t) 接收一个输入值并返回任意个输出,用于类似flatmap()
这样的操作中

通过匿名内部类

见上篇笔记例程。

通过具名类传递

class containserror implements function<string, boolean>() {
public boolean call(string x) { return x.contains("error"); }
}
...
rdd<string> errors = lines.filter(new containserror());
  • 使用具名类在程序组织比较庞大是显得比较清晰
  • 可以使用构造函数如“通过带参数的 java 函数类传递”中所示

通过带参数的 java 函数类传递

例程

import org.apache.spark.sparkconf;
import org.apache.spark.api.java.javardd;
import org.apache.spark.api.java.javasparkcontext;
import org.apache.spark.api.java.function.function;

import java.util.list;

public class contains implements function<string, boolean> {
    private string query;

    public contains(string query) {
        this.query = query;
    }

    public boolean call(string x) {
        return x.contains(query);
    }

    public static void main(string[] args) {
        sparkconf sc = new sparkconf().setappname("contains");
        javasparkcontext javasparkcontext = new javasparkcontext(sc);
        javardd<string> log = javasparkcontext.textfile(args[0]);
        
        javardd<string> content = log.filter(new contains(args[1]));
        
        list<string> contentlist = content.collect();
        for (string output : contentlist) {
            system.out.println(output);
        }
        javasparkcontext.stop();
    }
}

测试文本 test.txt

this is a test
this is a simple test
this is a simple test about rdd
let us check it out

测试结果

[root@server1 spark-2.4.4-bin-hadoop2.7]# bin/spark-submit --class contains ~/rddfuncnamedclass.jar ~/test.txt rdd
...
19/09/16 15:06:50 info dagscheduler: job 0 finished: collect at contains.java:24, took 0.445049 s
this is a simple test about rdd
...

通过 lambda 表达式传递(仅限于 java 8 及以上)

例程

import org.apache.spark.sparkconf;
import org.apache.spark.api.java.javardd;
import org.apache.spark.api.java.javasparkcontext;

import java.util.list;

public class lambdatest {
    public static void main(final string[] args) {
        sparkconf sc = new sparkconf().setappname("contains");
        javasparkcontext javasparkcontext = new javasparkcontext(sc);
        javardd<string> log = javasparkcontext.textfile(args[0]);

        javardd<string> content = log.filter(s -> s.contains(args[1]));

        list<string> contentlist = content.collect();
        for (string output : contentlist) {
            system.out.println(output);
        }
        javasparkcontext.stop();
    }
}

测试文本

使用上文同一个文本

运行结果

[root@server1 spark-2.4.4-bin-hadoop2.7]# bin/spark-submit --class contains ~/rddfuncnamedclass.jar ~/test.txt check
...
19/09/16 15:27:10 info dagscheduler: job 0 finished: collect at contains.java:24, took 0.440515 s
let us check it out
...

常见的转化操作和行动操作

spark 中有不同类型的 rdd,不同的 rdd 可以支持不同的操作。

除了基本的rdd外,还有数字类型的 rdd 支持统计型函数操作、键值对形式的 rdd 支持聚合数据的键值对操作等等。

基本rdd

针对各个元素的转化操作

为了方便,代码在 pyspark 中展示:

# map()
# map() 的返回值类型不需要和输入类型一样
>>> nums = sc.parallelize([1, 2, 3, 4])
>>> squared = nums.map(lambda x: x * x).collect()
>>> for num in squared:
...     print "%i " % (num)
... 
1 
4 
9 
16

# flatmap()
# 给flatmap() 的函数被分别应用到了输入rdd 的每个元素上。
# 返回的是一个返回值序列的迭代器。
# 
>>> lines = sc.parallelize(["hello world", "hi"])
>>> words = lines.flatmap(lambda line: line.split(" "))
>>> words.first()
'hello'

map() 和 flatmap() 区别如下:

Spark学习笔记3——RDD(下)

伪集合操作

rdd 不算是严格意义上的集合,但是一些类似集合的属性让它能够支持许多集合操作,下图展示了常见的集合操作:

Spark学习笔记3——RDD(下)

此外,rdd 还支持笛卡尔积的操作:

Spark学习笔记3——RDD(下)

以下对基本 rdd 的转化操作进行梳理:

  • 单个 rdd {1,2,3,3} 的转化操作
函数名 目的 示例 结果
map() 将函数应用于rdd 中的每个元
素,将返回值构成新的rdd
rdd.map(x => x + 1) {2, 3, 4, 4}
flatmap() 将函数应用于rdd 中的每个元
素,将返回的迭代器的所有内
容构成新的rdd。通常用来切
分单词
rdd.flatmap(x => x.to(3)) {1, 2, 3, 2, 3, 3, 3}
filter() 返回一个由通过传给filter()
的函数的元素组成的rdd
rdd.filter(x => x != 1) {2, 3, 3}
distinct() 去重 rdd.distinct() {1, 2, 3}
sample(withreplacement, fraction, [seed]) 对rdd 采样,以及是否替换 rdd.sample(false, 0.5) 非确定的
  • 两个 rdd {1,2,3},{3,4,5}的 rdd 的转化操作
函数名 目的 示例 结果
union() 生成一个包含两个rdd 中所有元
素的rdd
rdd.union(other) {1, 2, 3, 3, 4, 5}
intersection() 求两个rdd 共同的元素的rdd rdd.intersection(other) {3}
subtract() 移除一个rdd 中的内容(例如移
除训练数据)
rdd.subtract(other) {1, 2}
cartesian() 与另一个rdd 的笛卡儿积 rdd.cartesian(other) {(1, 3), (1, 4), ...
(3, 5)}

行动操作

reduce() 与 reducebykey()

例程

import org.apache.spark.sparkconf;
import org.apache.spark.api.java.javapairrdd;
import org.apache.spark.api.java.javardd;
import org.apache.spark.api.java.javasparkcontext;

import scala.tuple2;

import java.util.arrays;
import java.util.list;

public class simplereduce {
    public static void main(string[] args) {
        sparkconf sc = new sparkconf().setappname("contains");
        javasparkcontext javasparkcontext = new javasparkcontext(sc);
        list<integer> data = arrays.aslist(1, 2, 3, 4, 5);
        javardd<integer> originrdd = javasparkcontext.parallelize(data);

        integer sum = originrdd.reduce((a, b) -> a + b);
        system.out.println(sum);

        //reducebykey,按照相同的key进行reduce操作
        list<string> list = arrays.aslist("key1", "key1", "key2", "key2", "key3");
        javardd<string> stringrdd = javasparkcontext.parallelize(list);
        //转为key-value形式
        javapairrdd<string, integer> pairrdd = stringrdd.maptopair(k -> new tuple2<>(k, 1));
        list list1 = pairrdd.reducebykey((x, y) -> x + y).collect();
        system.out.println(list1);
    }

}

运行结果

...
19/09/17 17:08:37 info dagscheduler: job 0 finished: reduce at simplereduce.java:21, took 0.480038 s
15
...
19/09/17 17:08:38 info dagscheduler: job 1 finished: collect at simplereduce.java:29, took 0.237601 s
[(key3,1), (key1,2), (key2,2)]
...

aggregate()

例程

import org.apache.spark.sparkconf;
import org.apache.spark.api.java.javardd;
import org.apache.spark.api.java.javasparkcontext;
import org.apache.spark.api.java.function.function2;

import java.io.serializable;
import java.util.arrays;
import java.util.list;

public class avgcount implements serializable {
    private avgcount(int total, int num) {
        this.total = total;
        this.num = num;
    }
    private int total;
    private int num;
    private double avg() {
        return total / (double) num;
    }

    public static void main(string[] args) {
        sparkconf sc = new sparkconf().setappname("contains");
        javasparkcontext javasparkcontext = new javasparkcontext(sc);
        list<integer> data = arrays.aslist(1, 2, 3, 4, 5);
        javardd<integer> rdd = javasparkcontext.parallelize(data);
        avgcount initial = new avgcount(0, 0);
        function2<avgcount, integer, avgcount> addandcount =
                new function2<avgcount, integer, avgcount>() {
                    public avgcount call(avgcount a, integer x) {
                        a.total += x;
                        a.num += 1;
                        return a;
                    }
                };
        function2<avgcount, avgcount, avgcount> combine =
                new function2<avgcount, avgcount, avgcount>() {
                    public avgcount call(avgcount a, avgcount b) {
                        a.total += b.total;
                        a.num += b.num;
                        return a;
                    }
                };
        avgcount result = rdd.aggregate(initial, addandcount, combine);
        system.out.println(result.avg());
    }
}

运行结果

[root@server1 spark-2.4.4-bin-hadoop2.7]# bin/spark-submit --class avgcount ~/spark_rdd_aggregate.jar
...
19/09/18 15:28:19 info dagscheduler: job 0 finished: aggregate at avgcount.java:43, took 0.517385 s
3.0
...

常用的行动操作整理

函数名 目的 示例 结果
collect() 返回rdd 中的所有元素 rdd.collect() {1, 2, 3, 3}
count() rdd 中的元素个数 rdd.count() 4
countbyvalue() 各元素在rdd 中出现的次数 rdd.countbyvalue() {(1, 1),
(2, 1),
(3, 2)}
take(num) 从rdd 中返回num 个元素 rdd.take(2) {1, 2}
top(num) 从rdd 中返回最前面的num
个元素
rdd.top(2) {3, 3}
takeordered(num)
(ordering)
从rdd 中按照提供的顺序返
回最前面的num 个元素
rdd.takeordered(2)(myordering) {3, 3}
takesample(withreplace
ment, num, [seed])
从rdd 中返回任意一些元素 rdd.takesample(false, 1) 非确定的
reduce(func) 并行整合rdd 中所有数据
(例如sum)
rdd.reduce((x, y) => x + y) 9
fold(zero)(func) 和reduce() 一样, 但是需要
提供初始值
rdd.fold(0)((x, y) => x + y) 9
aggregate(zerovalue)
(seqop, combop)
和reduce() 相似, 但是通常
返回不同类型的函数
rdd.aggregate((0, 0))
((x, y) =>
(x._1 + y, x._2 + 1),
(x, y) =>
(x._1 + y._1, x._2 + y._2))
(9,4)
foreach(func) 对rdd 中的每个元素使用给
定的函数
rdd.foreach(func)

不同 rdd 的类型转换

spark 中有些函数只能作用于特定类型的 rdd。例如 mean() 和 variance() 只能处理数值 rdd,join() 只能用于处理键值对 rdd。在 scala 和 java 中都没有与之对应的标准 rdd 类,故使用这些函数时必须要确保获得了正确的专用 rdd 类。(scala 为隐式转换)

下表为 java 中针对专门类型的函数接口:

函数名 等价函数 用途
doubleflatmapfunction function<t, iterable> 用于flatmaptodouble,以
生成doublerdd
doublefunction function<t, double> 用于maptodouble,以生成
doublerdd
pairflatmapfunction<t, k, v> function<t, iterable<tuple2<k, v>>> 用于flatmaptopair,以生
成pairrdd<k, v>
pairfunction<t, k, v> function<t, tuple2<k, v>> 用于maptopair, 以生成
pairrdd<k, v>

例程

以 doublefunction 为例:

import org.apache.spark.sparkconf;
import org.apache.spark.api.java.javadoublerdd;
import org.apache.spark.api.java.javardd;
import org.apache.spark.api.java.javasparkcontext;
import org.apache.spark.api.java.function.doublefunction;

import java.util.arrays;

public class doublerdd {
    public static void main(string[] args) {
        sparkconf sparkconf=new sparkconf().setappname("doublerdd");
        javasparkcontext javasparkcontext=new javasparkcontext(sparkconf);
        javardd<integer> rdd = javasparkcontext.parallelize(arrays.aslist(1, 2, 3, 4));
        javadoublerdd result = rdd.maptodouble(
                new doublefunction<integer>() {
                    public double call(integer x) {
                        return (double) x * x;
                    }
                });
        system.out.println(result.mean());
    }
}

运行结果

[root@server1 spark-2.4.4-bin-hadoop2.7]# bin/spark-submit --class doublerdd ~/spark_rdd_doublerdd.jar
...
19/09/18 16:09:38 info dagscheduler: job 0 finished: mean at doublerdd.java:20, took 0.500705 s
7.5
...

持久化

为了避免多次计算同一个 rdd,我们常常对数据进行持久化处理。具体操作可以参见上一节例程。

tips:

  • 在scala 和java 中,默认情况下 persist() 会把数据以序列化的形式缓存在jvm 的堆空间中
  • 在python 中,我们会始终序列化要持久化存储的数据,所以持久化级别默认值就是以序列化后的对象存储在jvm 堆空间中
  • 当我们把数据写到磁盘或者堆外存储上时,也总是使用序列化后的数据
  • 缓存的数据太多,内存中放不下,spark 会自动利用最近最少使用(lru)的缓存策略把最老的分区从内存中移除
  • unpersist() 可以手动把持久化的rdd 从缓存中移除

持久化级别

级  别 使用的
空间
cpu
时间
是否在
内存中
是否在
磁盘上
备注
memory_only
memory_only_ser
memory_and_disk 中等 部分 部分 如果数据在内存中放不下,则溢写到磁盘上
memory_and_disk_ser 部分 部分 如果数据在内存中放不下,则溢写到磁盘上。在内存中存放序列化后的数据
disk_only

p.s.

可以通过在存储级别的末尾加上“_2”来把持久化数据存为两份



  1. 摘自

  2. fold() 和 reduce() 不同的是,需要再加上一个“初始值”来作为每个分区第一次调用时的结果;aggregate() 和 前两者不同的是,返回值类型可以和 rdd 的类型不一致