学大数据一点也不难!一文带你了解RDD与共享变量(附安装教程)
?前言
Spark是一种大规模、快速计算的集群平台,本头条号试图通过学习Spark官网的实战演练笔记提升笔者实操能力以及展现Spark的精彩之处。有关框架介绍和环境配置可以参考以下内容:
linux下Hadoop安装与环境配置(附详细步骤和安装包下载)
linux下Spark安装与环境配置(附详细步骤和安装包下载)
本文的参考配置为:Deepin 15.11、Java 1.8.0_241、Hadoop 2.10.0、Spark 2.4.4、scala 2.11.12
一、弹性分布式数据集(RDDs)
Spark 主要以弹性分布式数据集(RDD)的概念为中心,它是一个容错且可以执行并行操作的元素的集合。有两种方法可以创建 RDD:在你的 driver program(驱动程序)中 parallelizing 一个已存在的集合,或者在外部存储系统中引用一个数据集,例如,一个共享文件系统,HDFS,HBase,或者提供 Hadoop InputFormat 的任何数据源。
1.并行集合
例如在已存在的集合上通过调用 SparkContext 的 parallelize 方法来创建并行集合。
2.外部数据源
Spark 可以从 Hadoop 所支持的任何存储源中创建 distributed dataset(分布式数据集),包括本地文件系统,HDFS,Cassandra,HBase,Amazon S3 等等。Spark 支持文本文件,SequenceFiles,以及任何其它的 Hadoop InputFormat。
3.RDD操作
RDDs support 两种类型的操作:transformations(转换),它会在一个已存在的 dataset 上创建一个新的 dataset,和 actions(动作),将在 dataset 上运行的计算后返回到 driver 程序。
我们还可以传递函数给Spark。当 driver 程序在集群上运行时,Spark 的 API 在很大程度上依赖于传递函数。可以使用Anonymous function syntax(匿名函数语法),它可以用于短的代码片断.
大多数 Spark 操作工作在包含任何类型对象的 RDDs 上,只有少数特殊的操作可用于 Key-Value 对的 RDDs。最常见的是分布式 “shuffle” 操作,如通过元素的 key 来进行 grouping 或 aggregating 操作.
3.RDD转换操作map是对RDD中的每个元素都执行一个指定的函数来产生一个新的RDD;Filter是对RDD元素进行过滤;返回一个新的数据集,是经过func函数后返回值为true的原元素组成flatMap类似于map,但是每一个输入元素,会被映射为0到多个输出元素mapPartitions是map的一个变种。map的输入函数是应用于RDD中每个元素,而mapPartitions的输入函数是每个分区的数据,也就是把每个分区中的内容作为整体来处理的mapPartitionsWithSplit与mapPartitions的功能类似, 只是多传入split index而已,所有func 函数必需是 (Int, Iterator )=> Iterator 类型sample(withReplacement,fraction,seed)是根据给定的随机种子seed,随机抽样出数量为frac的数据。withReplacement:是否放回抽样;fraction:比例,0.1表示10% ;union(otherDataset)是数据合并,返回一个新的数据集,由原数据集和otherDataset联合而成intersection(otherDataset)是数据交集,返回一个新的数据集,包含两个数据集的交集数据;distinct([numTasks]))是数据去重,返回一个数据集,是对两个数据集去除重复数据,numTasks参数是设置任务并行数量groupByKey([numTasks])是数据分组操作,在一个由(K,V)对组成的数据集上调用,返回一个(K,Seq[V])对的数据集。reduceByKey(func, [numTasks])是数据分组聚合操作,在一个(K,V)对的数据集上使用,返回一个(K,V)对的数据集,key相同的值,都被使用指定的reduce函数聚合到一起。aggreateByKey(zeroValue: U)(seqOp: (U, T)=> U, combOp: (U, U)=>U) 和reduceByKey的不同在于,reduceByKey输入输出都是(K, V),而aggreateByKey输出是(K,U),aggreateByKey可以看成更高抽象的,更灵活的reduce或groupcombineByKey是对RDD中的数据集按照Key进行聚合操作。sortByKey([ascending],[numTasks])是排序操作,对(K,V)类型的数据按照K进行排序,其中K需要实现Ordered方法join(otherDataset, [numTasks])是连接操作,将输入数据集(K,V)和另外一个数据集(K,W)进行Join, 得到(K, (V,W));cogroup(otherDataset, [numTasks])是将输入数据集(K, V)和另外一个数据集(K, W)进行cogroup,得到一个格式为(K, Seq[V], Seq[W])的数据集cartesian(otherDataset)是做笛卡尔积:对于数据集T和U 进行笛卡尔积操作, 得到(T, U)格式的数据集 4.RDD行动操作reduce(func)是对数据集的所有元素执行聚集(func)函数,该函数必须是可交换的。collect是将数据集中的所有元素以一个array的形式返回count返回数据集中元素的个数。first返回数据集中的第一个元素, 类似于take(1)Take(n)返回一个包含数据集中前n个元素的数组takeSample(withReplacement,num, [seed])返回包含随机的num个元素的数组takeOrdered(n, [ordering])是返回包含随机的n个元素的数组,按照顺序输出saveAsTextFile把数据集中的元素写到一个文本文件,Spark会对每个元素调用toString方法来把每个元素存成文本文件的一行。countByKey对于(K, V)类型的RDD. 返回一个(K, Int)的map, Int为K的个数foreach(func)是对数据集中的每个元素都执行func函数 5.持久化
Spark 中一个很重要的能力是将数据 persisting 持久化(或称为 caching 缓存),在多个操作间都可以访问这些持久化的数据。RDD 可以使用 persist() 方法或 cache() 方法进行持久化。
二、共享变量
通常情况下,一个传递给 Spark 操作(例如 map 或 reduce)的函数 func 是在远程的手游拍卖集群节点上执行的。该函数 func 在多个节点执行过程中使用的变量,是同一个变量的多个副本。这些变量的以副本的方式拷贝到每个机器上,并且各个远程机器上变量的更新并不会传播回 driver program(驱动程序)。通用且支持 read-write(读-写)的共享变量在任务间是不能胜任的。所以,Spark 提供了两种特定类型的共享变量:broadcast variables(广播变量)和 accumulators(累加器)。
1.广播变量
Broadcast variables(广播变量)允许程序员将一个 read-only(只读的)变量缓存到每台机器上,而不是给任务传递一个副本。广播变量通过在一个变量 v 上调用 SparkContext.broadcast(v) 方法来进行创建。广播变量是 v 的一个 wrapper(包装器),可以通过调用 value 方法来访问它的值。
在创建广播变量之后,在集群上执行的所有的函数中,应该使用该广播变量代替原来的 v 值,所以节点上的 v 最多分发一次。另外,对象 v 在广播后不应该再被修改,以保证分发到所有的节点上的广播变量具有同样的值(例如,如果以后该变量会被运到一个新的节点)。
2.累加器
Accumulators(累加器)是一个仅可以执行 “added”(添加)的变量来通过一个关联和交换操作,因此可以高效地执行支持并行。累加器可以用于实现 counter(计数,类似在 MapReduce 中那样)或者 sums(求和)。原生 Spark 支持数值型的累加器,并且程序员可以添加新的支持类型。
可以通过调用 sc.longAccumulator() 或sc.doubleAccumulator() 方法创建数值类型的 accumulator(累加器)以分别累加 Long 或 Double 类型的值。集群上正在运行的任务就可以使用 add 方法来累计数值。然而,它们不能够读取它的值。只有 driver program(驱动程序)才可以使用 value 方法读取累加器的值。