SparkCore知识汇总
SparkCore
RDD 概述
RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是 Spark 中最基本的数据抽象。代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。
RDD 的属性
- 一组分区(Partition),即数据集的基本组成单位;
- 一个计算每个分区的函数;
- RDD 之间的依赖关系;
- 一个 Partitioner,即 RDD 的分片函数;
- 一个列表,存储存取每个 Partition 的优先位置(preferred location)
RDD 特点
RDD 表示只读的分区的数据集,对 RDD 进行改动,只能通过 RDD 的转换操作,由一个RDD 得到一个新的 RDD,新的 RDD 包含了从其他 RDD 衍生所必需的信息。RDDs 之间存在依赖,RDD 的执行是按照血缘关系延时计算的。如果血缘关系较长,可以通过持久化RDD 来切断血缘关系。
弹性
- 存储的弹性:内存与磁盘的自动切换;
- 容错的弹性:数据丢失可以自动恢复;
- 计算的弹性:计算出错重试机制;
- 分片的弹性:可根据需要重新分片。
分区
RDD 逻辑上是分区的,每个分区的数据是抽象存在的,计算的时候会通过一个 compute函数得到每个分区的数据。如果 RDD 是通过已有的文件系统构建,则 compute 函数是读取指定文件系统中的数据,如果 RDD 是通过其他 RDD 转换而来,则 compute 函数是执行转换逻辑将其他 RDD 的数据进行转换
只读
-
RDD 是只读的,要想改变 RDD 中的数据,只能在现有的 RDD 基础上创建新的 RDD。
-
由一个 RDD 转换到另一个 RDD,可以通过丰富的操作算子实现,不再像 MapReduce那样只能写 map 和 reduce 了。
-
RDD 的操作算子包括两类,一类叫做 transformations,它是用来将 RDD 进行转化,构建 RDD 的血缘关系;另一类叫做 actions,它是用来触发 RDD 的计算,得到 RDD 的相关计算结果或者将 RDD 保存的文件系统中。下图是 RDD 所支持的操作算子列表
依赖
- RDDs 通过操作算子进行转换,转换得到的新 RDD 包含了从其他 RDDs 衍生所必需的信息,RDDs 之间维护着这种血缘关系,也称之为依赖。
如下图所示,依赖包括两种:- 一种是窄依赖,RDDs 之间分区是一一对应的。
- 另一种是宽依赖,下游 RDD 的每个分区与上游RDD(也称之为父 RDD)的每个分区都有关,是多对多的关系。
缓存
一个常用的RDD可以通过cache到内存中,增加访问效率,如果在后续还有需要这个RDD的时候,会直接从缓存中取,而不会根据亲缘关系再计算一遍。
CheckPoint
RDD 支持 checkpoint 将数据保存到持久化的存储中,这样就可以切断之前的血缘关系,因为 checkpoint后的RDD 不需要知道它的父RDDs 了,它可以从 checkpoint处拿到数据。
RDD 编程
RDD的创建
三种:
从集合中创建 RDD:
- 使用 parallelize()从集合创建
- makeRDD()从集合创建
从外部存储创建 RDD:
- sc.textFile(“hdfs://hadoop102:9000/RELEASE”)
从其他 RDD 创建:
- RDD的转换操作
RDD 的转化
Value 类型
- map(func): 输入一个,输出一个
- mapPartitions(func): 独立运行在每个分片上的RDD的map操作
- mapPartitionsWithIndex(func): 类似于 mapPartitions,但 func 带有一个整数参数表示分片的索引值
- flatMap(func): 每一个输入元素可以被映射为 0 或多个输出元素,有点像hive的explode。
- glom: 将每一个分区形成一个数组,形成新的 RDD 类型时 RDD[Array[T]]
- groupBy(func): 分组,按照传入函数的返回值进行分组。将相同的 key 对应的值放入一个迭代器
- filter(func): 过滤。返回一个新的 RDD,该 RDD 由经过 func 函数计算后返回值为 true 的输入元素组成。
- sample(withReplacement, fraction, seed): 以指定的随机种子随机抽样出数量为 fraction的数据,withReplacement 表示是抽出的数据是否放回,true 为有放回的抽样,false 为无放回的抽样,seed 用于指定随机数生成器种子。
- distinct([numTasks])): 对源 RDD 进行去重后返回一个新的 RDD。默认情况下,只有 8 个并行任务来操作,但是可以传入一个可选的 numTasks 参数改变它。
- coalesce(numPartitions): 缩减分区数,用于大数据集过滤后,提高小数据集的执行效率。
- repartition(numPartitions): 根据分区数,重新通过网络随机洗牌所有数据。
- sortBy(func,[ascending], [numTasks]): 使用 func 先对数据进行处理,按照处理后的数据比较结果排序,默认为正序。
- pipe(command, [envVars]): 管道,针对每个分区,都执行一个 shell 脚本,返回输出的 RDD
map()和 mapPartition()的区别:
- map():每次处理一条数据。
- mapPartition():每次处理一个分区的数据,这个分区的数据处理完后,原 RDD 中分区的数据才能释放,可能导致 OOM。
- 开发指导:当内存空间较大的时候建议使用mapPartition(),以提高处理效率。
coalesce 和 repartition 的区别:
- coalesce 重新分区,可以选择是否进行 shuffle 过程。由参数 shuffle: Boolean =false/true决定。
- repartition 实际上是调用的 coalesce,进行 shuffle。源码如下
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
coalesce(numPartitions, shuffle = true)
}
双 Value 类型交互
- union(otherDataset): 对源 RDD 和参数 RDD 求并集后返回一个新的 RDD
- subtract (otherDataset): 计算差的一种函数,去除两个 RDD 中相同的元素,不同的 RDD 将保留下来。
- intersection(otherDataset): 对源 RDD 和参数 RDD 求交集后返回一个新的 RDD
- cartesian(otherDataset): 笛卡尔积
- zip(otherDataset): 将两个 RDD 组合成 Key/Value 形式的 RDD,这里默认两个 RDD 的 partition 数量以及元素数量都相同,否则会抛出异常。
Key-Value 类型
-
**partitionBy:**对 pairRDD 进行分区操作,如果原有的 partionRDD 和现有的 partionRDD 是一致的话就不进行分区, 否则会生成 ShuffleRDD,即会产生 shuffle 过程。
-
reduceByKey(func, [numTasks]): 在一个(K,V)的 RDD 上调用,返回一个(K,V)的 RDD,使用指定的 reduce 函数,将相同key 的值聚合到一起,reduce 任务的个数可以通过第二个可选的参数来设置。
-
groupByKey: groupByKey 也是对每个 key 进行操作,但只生成一个 seq
-
aggregateByKey(zeroValue:U,[partitioner: Partitioner]) (seqOp: (U, V) => U,combOp: (U, U) => U):
-
在 kv 对的 RDD 中,按 key 将 value 进行分组合并,合并时,将每个 value 和初始值作为 seq 函数的参数,进行计算,返回的结果作为一个新的 kv 对。
-
然后再将结果按照key 进行合并,最后将每个分组的 value 传递给 combine 函数进行计算
-
(先将前两个value进行计算,将返回结果和下一个 value 传给 combine 函数,以此类推),将 key 与计算结果作为一个新的 kv 对输出。
-
(1)zeroValue:给每一个分区中的每一个 key 一个初始值;
-
(2)seqOp:函数用于在每一个分区中用初始值逐步迭代 value;
-
(3)combOp:函数用于合并每个分区中的结果。
-
-
foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]: aggregateByKey 的简化操作,seqop 和 combop 相同
-
combineByKey[C]: 针对相同 K,将 V 合并成一个集合。
- 2.参数描述:
(1)createCombiner: combineByKey() 会遍历分区中的所有元素,因此每个元素的键要么还没有遇到过,要么就和之前的某个元素的键相同。如果这是一个新的素,combineByKey()会使用一个叫作createCombiner()的函数来创建那个键对应的累加器的初始值
(2)mergeValue: 如果这是一个在处理当前分区之前已经遇到的键,它会使用mergeValue()方法将该键的累加器对应的当前值与这个新的值进行合并
(3)mergeCombiners: 由于每个分区都是独立处理的, 因此对于同一个键可以有多个累加器。如果有两个或者更多的分区都有对应同一个键的累加器, 就需要使用用户提供的 mergeCombiners() 方法将各个分区的结果进行合并。
- 2.参数描述:
- sortByKey([ascending], [numTasks]): 在一个(K,V)的 RDD 上调用,K 必须实现 Ordered 接口,返回一个按照 key 进行排序的(K,V)的 RDD
- mapValues: 针对于(K,V)形式的类型只对 V 进行操作
- join(otherDataset, [numTasks]): 在类型为(K,V)和(K,W)的 RDD 上调用,返回一个相同 key 对应的所有元素对在一起的(K,(V,W))的 RDD。
- cogroup(otherDataset, [numTasks]): 在类型为(K,V)和(K,W)的 RDD 上调用,返回一个(K,(Iterable,Iterable))类型的 RDD。
reduceByKey 和 groupByKey 的区别
- reduceByKey:按照 key 进行聚合,在 shuffle 之前有 combine(预聚合)操作,返回结果是 RDD[k,v]。
- groupByKey:按照 key 进行分组,直接进行 shuffle。
- 开发指导:reduceByKey 比 groupByKey,建议使用。但是需要注意是否会影响业务逻辑。
Action
-
reduce(func): 通过 func 函数聚集 RDD 中的所有元素,先聚合分区内数据,再聚合分区间数据。
-
collect(): 在驱动程序中,以数组的形式返回数据集的所有元素。
-
count(): 返回 RDD 中元素的个数。
-
first(): 返回 RDD 中的第一个元素。
-
take(n): 返回一个由 RDD 的前 n 个元素组成的数组。
-
takeOrdered(n): 返回该 RDD 排序后的前 n 个元素组成的数组。
-
aggregate: :
- 参数:(zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U)
- 作用:aggregate 函数将每个分区里面的元素通过 seqOp 和初始值进行聚合,然后用combine 函数将每个分区的结果和初始值(zeroValue)进行 combine 操作。这个函数最终返回的类型不需要和 RDD 中元素类型一致。
-
fold(num)(func): 折叠操作,aggregate 的简化操作,seqop 和 combop 一样。
-
saveAsTextFile(path): 将数据集的元素以 textfile 的形式保存到 HDFS 文件系统或者其他支持的文件系统,对于每个元素,Spark 将会调用 toString 方法,将它装换为文件中的文本
-
saveAsSequenceFile(path): 将数据集中的元素以 Hadoop sequencefile 的格式保存到指定的目录下,可以使HDFS 或者其他 Hadoop 支持的文件系统。
-
saveAsObjectFile(path): 用于将 RDD 中的元素序列化成对象,存储到文件中。
-
countByKey(): 针对(K,V)类型的 RDD,返回一个(K,Int)的 map,表示每一个 key 对应的元素个数。
-
foreach(func): 在数据集的每一个元素上,运行函数 func 进行更新。
RDD 中的函数传递
在实际开发中我们往往需要自己定义一些对于 RDD 的操作,那么此时需要主要的是,初始化工作是在 Driver 端进行的,而实际运行程序是在 Executor 端进行的,这就涉及到了跨进程通信,是需要序列化的。下面我们看几个例子:
传递一个方法:
- 报错:: java.io.NotSerializableException: com.atguigu.Search
- 原因:在这个方法中所调用的方法 isMatch()是定义在 Search 这个类中的,实际上调用的是this. isMatch(),this 表示 Search 这个类的对象,程序在运行过程中需要将 Search 对象序列化以后传递到 Executor 端。
- 解决办法:使类继承 scala.Serializable 即可。
传递一个属性:
- 报错:java.io.NotSerializableException: com.atguigu.Search
- 原因:在这个方法中所调用的方法 query 是定义在 Search 这个类中的字段,实际上调用的是this. query,this 表示 Search 这个类的对象,程序在运行过程中需要将 Search 对象序列化以后传递到 Executor 端。
- 解决办法:
- 1)使类继承 scala.Serializable 即可
- 2)将类变量 query 赋值给局部变量
RDD 依赖关系
Lineage:
RDD 只支持粗粒度转换,即在大量记录上执行的单个操作。将创建 RDD 的一系列Lineage(血统)记录下来,以便恢复丢失的分区。RDD 的 Lineage 会记录 RDD 的元数据
信息和转换行为,当该 RDD 的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区。
RDD 和它依赖的父 RDD(s)的关系有两种不同的类型,即窄依赖(narrow dependency)和宽依赖(wide dependency)。
窄依赖
窄依赖指的是每一个父 RDD 的 Partition 最多被子 RDD 的一个 Partition 使用。
窄依赖我们形象的比喻为独生子女
宽依赖
宽依赖指的是多个子 RDD 的 Partition 会依赖同一个父 RDD 的 Partition,会引起shuffle。
宽依赖我们形象的比喻为超生
DAG
- DAG(Directed Acyclic Graph)叫做有向无环图,原始的 RDD 通过一系列的转换就就形成了 DAG。
- 根据 RDD 之间的依赖关系的不同将 DAG 划分成不同的 Stage,对于窄依赖,partition 的转换处理在 Stage 中完成计算。
- 对于宽依赖,由于有 Shuffle 的存在,只能在 parent RDD 处理完成后,才能开始接下来的计算,因此宽依赖是划分 Stage 的依据。
任务划分
RDD 任务切分中间分为:Application、Job、Stage 和 Task
1)Application:初始化一个 SparkContext 即生成一个 Application
2)Job:一个 Action 算子就会生成一个 Job
3)Stage:根据 RDD 之间的依赖关系的不同将 Job 划分成不同的 Stage,遇到一个宽依赖则划分一个 Stage。
4)Task:Stage 是一个 TaskSet,将 Stage 划分的结果发送到不同的 Executor 执行即为一个Task。
注意:Application->Job->Stage->Task 每一层都是 1 对 n 的关系。
RDD 缓存
- RDD 通过 persist 方法或 cache 方法可以将前面的计算结果缓存,默认情况下 persist() 会把数据以序列化的形式缓存在 JVM 的堆空间中。
- 但是并不是这两个方法被调用时立即缓存,而是触发后面的 action 时,该 RDD 将会被缓存在计算节点的内存中,并供后面重用。
-储存级别:
RDD CheckPoint
checkPoint从做检查点的 RDD 开始重做 Lineage,就会减少开销。检查点通过将数据写入到 HDFS 文件系统实现了 RDD 的检查点功能
为当前 RDD 设置检查点。该函数将会创建一个二进制的文件,并存储到 checkpoint 目录中,该目录是用 SparkContext.setCheckpointDir()设置的。在 checkpoint 的过程中,该RDD 的所有依赖于父 RDD 中的信息将全部被移除。对 RDD 进行 checkpoint 操作并不会马上被执行,必须执行 Action 操作才能触发。
键值对 RDD 数据分区
Spark 目前支持 Hash 分区和 Range 分区,用户也可以自定义分区,Hash 分区为当前的默认分区,Spark 中分区器直接决定了 RDD 中分区的个数、RDD 中每条数据经过Shuffle 过程属于哪个分区和 Reduce 的个数。
注意:
(1)只有 Key-Value 类型的 RDD 才有分区的,非 Key-Value 类型的 RDD 分区的值是 None
(2)每个 RDD 的分区 ID 范围:0~numPartitions-1,决定这个值是属于那个分区的。
获取 RDD 分区
可以通过使用 RDD 的 partitioner 属性来获取 RDD 的分区方式。它会返回一个scala.Option 对象, 通过 get 方法获取其中的值。
Hash 分区
HashPartitioner 分区的原理:对于给定的 key,计算其 hashCode,并除以分区的个数取余,如果余数小于 0,则用余数+分区的个数(否则加 0),最后返回的值就是这个 key 所属的分区 ID。
Ranger 分区
HashPartitioner 分区弊端: 可能导致每个分区中数据量的不均匀,极端情况下会导致某些分区拥有 RDD 的全部数据。
RangePartitioner 作用: 将一定范围内的数映射到某一个分区内,尽量保证每个分区中数据量的均匀,而且分区与分区之间是有序的,一个分区中的元素肯定都是比另一个分区内的元素小或者大,但是分区内的元素是不能保证顺序的。简单的说就是将一定范围内的数映射到某一个分区内。
- 第一步:先重整个 RDD 中抽取出样本数据,将样本数据排序,计算出每个分区的最大 key 值,形成一个 Array[KEY]类型的数组变量 rangeBounds;
- 第二步:判断 key 在 rangeBounds 中所处的范围,给出该 key 值在下一个 RDD 中的分区 id 下标;该分区器要求 RDD 中的 KEY 类型必须是可以排序的
自定义分区
要实现自定义的分区器,你需要继承 org.apache.spark.Partitioner 类并实现下面三个方法。
(1)numPartitions: Int:返回创建出来的分区数。
(2)getPartition(key: Any): Int:返回给定键的分区编号(0 到 numPartitions-1)。 (3)equals():Java 判断相等性的标准方法。这个方法的实现非常重要,Spark 需要用这个方法来检查你的分区器对象是否和其他分区器实例相同,这样 Spark 才可以判断两个RDD 的分区方式是否相同。
需求:将相同后缀的数据写入相同的文件,通过将相同后缀的数据分区到相同的分区并保存输出来实现
数据读取与保存
Spark 的数据读取及数据保存可以从两个维度来作区分:文件格式以及文件系统。
文件格式分为:Text 文件、Json 文件、Csv 文件、Sequence 文件以及 Object 文件;
文件系统分为:本地文件系统、HDFS、HBASE 以及数据库。
Text 文件:
1)数据读取:textFile(String)
2)数据保存: saveAsTextFile(String)
Json 文件
- 如果 JSON 文件中每一行就是一个 JSON 记录,那么可以通过将 JSON 文件当做文本文件来读取,然后利用相关的 JSON 库对每一条数据进行 JSON 解析。
- 注意:使用 RDD 读取 JSON 文件处理很复杂,同时 SparkSQL 集成了很好的处理JSON 文件的方式,所以应用中多是采用 SparkSQL 处理 JSON 文件。
Sequence 文件
SequenceFile 文件是 Hadoop 用来存储二进制形式的 key-value 对而设计的一种平面文件(Flat File)。Spark 有专门用来读取 SequenceFile 的接口。在 SparkContext 中,可以调用 sequenceFile[ keyClass, valueClass](path)
SequenceFile 文件只针对 PairRDD
对象文件
对象文件是将对象序列化后保存的文件,采用 Java 的序列化机制。可以通过objectFile[k,v](path) 函数接收一个路径,读取对象文件,返回对应的 RDD,也可以通过调用 saveAsObjectFile() 实现对对象文件的输出。因为是序列化所以要指定类型。
文件系统类数据读取与保存
HDFS
-
Spark 的整个生态系统与 Hadoop 是完全兼容的,所以对于 Hadoop 所支持的文件类型或者数据库类型,Spark 也同样支持
-
另外,由于 Hadoop 的 API 有新旧两个版本,所以 Spark 为了能够兼容 Hadoop 所有的版本,也提供了两套创建操作接口.
-
对于外部存储创建操作而言,hadoopRDD 和 newHadoopRDD 是最为抽象的两个函数接口,主要包含以下四个参数.
-1)输入格式(InputFormat): 制定数据输入的类型,如 TextInputFormat 等,新旧两个版本所引用的版本分别是:
org.apache.hadoop.mapred.InputFormat 和
org.apache.hadoop.mapreduce.InputFormat(NewInputFormat)- 2)键类型: 指定[K,V]键值对中 K 的类型
- 3)值类型: 指定[K,V]键值对中 V 的类型
- 4)分区值: 指定由外部存储生成的 RDD 的 partition 数量的最小值,如果没有指定,系统会使用默认值 defaultMinSplits
-
其他创建操作的 API 接口都是为了方便最终的 Spark 程序开发者而设置的,是这两个接口的高效实现版本.
-
例如,对于 textFile 而言,只有 path 这个指定文件路径的参数,其他参数在系统内部指定了默认值。
此外:
1.在 Hadoop 中以压缩形式存储的数据,不需要指定解压方式就能够进行读取,因为Hadoop 本身有一个解压器会根据压缩文件的后缀推断解压算法进行解压.
2.如果用 Spark 从 Hadoop 中读取某种类型的数据不知道怎么读取的时候,上网查找一个使用 map-reduce 的时候是怎么读取这种这种数据的,然后再将对应的读取方式改写成上面的hadoopRDD 和 newAPIHadoopRDD 两个类就行了
MySQL 数据库连接
(1)添加依赖
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.27</version>
</dependency>
Mysql 读取
package com.atguigu
import java.sql.DriverManager
import org.apache.spark.rdd.JdbcRDD
import org.apache.spark.{SparkConf, SparkContext}
object MysqlRDD {
def main(args: Array[String]): Unit = {
//1.创建 spark 配置信息
val sparkConf: SparkConf = new
SparkConf().setMaster("local[*]").setAppName("JdbcRDD")
//2.创建 SparkContext
val sc = new SparkContext(sparkConf)
//3.定义连接 mysql 的参数
val driver = "com.mysql.jdbc.Driver"
val url = "jdbc:mysql://hadoop102:3306/rdd"
val userName = "root"
val passWd = "000000"
//创建 JdbcRDD
val rdd = new JdbcRDD(sc, () => {
Class.forName(driver)
DriverManager.getConnection(url, userName, passWd)
},
"select * from `rddtable` where `id`>=?;",
1,
10,
1,
r => (r.getInt(1), r.getString(2))
)
//打印最后结果
println(rdd.count())
rdd.foreach(println)
sc.stop()
}
}
Mysql 写入:
def main(args: Array[String]) {
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("HBaseApp")
val sc = new SparkContext(sparkConf)
val data = sc.parallelize(List("Female", "Male","Female"))
data.foreachPartition(insertData)
}
def insertData(iterator: Iterator[String]): Unit = {
Class.forName ("com.mysql.jdbc.Driver").newInstance()
val conn = java.sql.DriverManager.getConnection("jdbc:mysql://master01:3306/rdd", "root",
"hive")
iterator.foreach(data => {
val ps = conn.prepareStatement("insert into rddtable(name) values (?)")
ps.setString(1, data)
ps.executeUpdate()
})
}
HBase 数据库
Spark 可以通过Hadoop 输入格式访问 HBase。
输入格式会返回键值对数据:
其中键的类型为 org. apache.hadoop.hbase.io.ImmutableBytesWritable。
而值的类型为org.apache.hadoop.hbase.client.Result。
(1)添加依赖
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>1.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.3.1</version>
</dependency>
(2)
//构建sparkConf
var sparkConf = new SparkConf.setMaster("Local[*]").setAppName("HbaseRDD")
//得到sparkContext
var sparkContext = new SparkContext(sparkConf)
//因为要连接Hbase,因此需要对Hbase进行操作,需要Hbase的配置
val conf:Configuration = HbaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "hadoop102,hadoop103,hadoop104")
conf.set(TableInputFormat.INPUT_TABLE, "rddtable")
//从Hbase读取数据得到RDD
val hbaseRDD =
sc.newAipHadoopRDD(conf, //配置文件
classOf[TableInputFormat], // RDD类型
classOf[ImmutableBytesWritable], //RDD:key
classOf[Result]) //rdd :value
hbaseRDD.foreach{
case(_,result)=>{
val key:String = Bytes.toString(result.getRow)
val name: String =
Bytes.toString(
result.getValue(Bytes.toBytes("info"),
Bytes.toBytes("name")))
val color: String =
Bytes.toString(
result.getValue(Bytes.toBytes("info"),
Bytes.toBytes("color")))
println("RowKey:" + key + ",Name:" + name + ",Color:" + color)
}
sparkContext.stop()
}
RDD 编程进阶
累加器
系统累加器:
- var accumulator = sc.accumulator(0)
自定义累加器
package cc11001100.spark.sharedVariables.accumulators;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.AccumulatorV2;
import java.math.BigInteger;
import java.util.Arrays;
import java.util.List;
/**
* 自定义累加器
*
* @author
*/
public class CustomAccumulatorDemo {
// 需要注意的是累加操作不能依赖顺序,比如类似于StringAccumulator这种则会得到错误的结果
public static class BigIntegerAccumulator extends AccumulatorV2<BigInteger, BigInteger> {
private BigInteger num = BigInteger.ZERO;
public BigIntegerAccumulator() {
}
public BigIntegerAccumulator(BigInteger num) {
this.num = new BigInteger(num.toString());
}
@Override
public boolean isZero() {
return num.compareTo(BigInteger.ZERO) == 0;
}
@Override
public AccumulatorV2<BigInteger, BigInteger> copy() {
return new BigIntegerAccumulator(num);
}
@Override
public void reset() {
num = BigInteger.ZERO;
}
@Override
public void add(BigInteger num) {
this.num = this.num.add(num);
}
@Override
public void merge(AccumulatorV2<BigInteger, BigInteger> other) {
num = num.add(other.value());
}
@Override
public BigInteger value() {
return num;
}
}
public static void main(String[] args) {
SparkSession spark =
SparkSession.builder().master("local[*]").getOrCreate();
SparkContext sc = spark.sparkContext();
// 直接new累加器
BigIntegerAccumulator bigIntegerAccumulator = new BigIntegerAccumulator();
// 然后在SparkContext上注册一下
sc.register(bigIntegerAccumulator, "bigIntegerAccumulator");
List<BigInteger> numList = Arrays.asList(new BigInteger("9999999999999999999999"), new BigInteger("9999999999999999999999"), new BigInteger("9999999999999999999999"));
Dataset<BigInteger> num = spark.createDataset(numList, Encoders.kryo(BigInteger.class));
Dataset<BigInteger> num2 = num.map((MapFunction<BigInteger, BigInteger>) x -> {
bigIntegerAccumulator.add(x);
return x;
}, Encoders.kryo(BigInteger.class));
num2.count();
System.out.println("bigIntegerAccumulator: " + bigIntegerAccumulator.value());
}
}
广播变量
- 广播变量用来高效分发较大的对象。向所有工作节点发送一个较大的只读值,以供一个或多个 Spark 操作使用。
- 比如,如果你的应用需要向所有节点发送一个较大的只读查询表,甚至是机器学习算法中的一个很大的特征向量,广播变量用起来都很顺手。
- 在多个并行操作中使用同一个变量,但是 Spark 会为每个任务分别发送
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(35)
scala> broadcastVar.value
res33: Array[Int] = Array(1, 2, 3)
使用广播变量的过程如下:
(1) 通过对一个类型 T 的对象调用 SparkContext.broadcast 创建出一个 Broadcast[T]对象。任何可序列化的类型都可以这么实现。
(2) 通过 value 属性访问该对象的值(在 Java 中为 value()方法)。
(3) 变量只会被发到各个节点一次,应作为只读值处理(修改这个值不会影响到别的节点)。