欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

学大数据一点也不难!一文带你了解RDD与共享变量(附安装教程)

程序员文章站 2022-03-27 09:45:03
...

  ?前言

  Spark是一种大规模、快速计算的集群平台,本头条号试图通过学习Spark官网的实战演练笔记提升笔者实操能力以及展现Spark的精彩之处。有关框架介绍和环境配置可以参考以下内容:

  linux下Hadoop安装与环境配置(附详细步骤和安装包下载)

  linux下Spark安装与环境配置(附详细步骤和安装包下载)

  本文的参考配置为:Deepin 15.11、Java 1.8.0_241、Hadoop 2.10.0、Spark 2.4.4、scala 2.11.12

  一、弹性分布式数据集(RDDs)

  Spark 主要以弹性分布式数据集(RDD)的概念为中心,它是一个容错且可以执行并行操作的元素的集合。有两种方法可以创建 RDD:在你的 driver program(驱动程序)中 parallelizing 一个已存在的集合,或者在外部存储系统中引用一个数据集,例如,一个共享文件系统,HDFS,HBase,或者提供 Hadoop InputFormat 的任何数据源。

  1.并行集合

  例如在已存在的集合上通过调用 SparkContext 的 parallelize 方法来创建并行集合。

  2.外部数据源

  Spark 可以从 Hadoop 所支持的任何存储源中创建 distributed dataset(分布式数据集),包括本地文件系统,HDFS,Cassandra,HBase,Amazon S3 等等。Spark 支持文本文件,SequenceFiles,以及任何其它的 Hadoop InputFormat。

  3.RDD操作

  RDDs support 两种类型的操作:transformations(转换),它会在一个已存在的 dataset 上创建一个新的 dataset,和 actions(动作),将在 dataset 上运行的计算后返回到 driver 程序。

  我们还可以传递函数给Spark。当 driver 程序在集群上运行时,Spark 的 API 在很大程度上依赖于传递函数。可以使用Anonymous function syntax(匿名函数语法),它可以用于短的代码片断.

  大多数 Spark 操作工作在包含任何类型对象的 RDDs 上,只有少数特殊的操作可用于 Key-Value 对的 RDDs。最常见的是分布式 “shuffle” 操作,如通过元素的 key 来进行 grouping 或 aggregating 操作.

  3.RDD转换操作map是对RDD中的每个元素都执行一个指定的函数来产生一个新的RDD;Filter是对RDD元素进行过滤;返回一个新的数据集,是经过func函数后返回值为true的原元素组成flatMap类似于map,但是每一个输入元素,会被映射为0到多个输出元素mapPartitions是map的一个变种。map的输入函数是应用于RDD中每个元素,而mapPartitions的输入函数是每个分区的数据,也就是把每个分区中的内容作为整体来处理的mapPartitionsWithSplit与mapPartitions的功能类似, 只是多传入split index而已,所有func 函数必需是 (Int, Iterator        )=> Iterator 类型sample(withReplacement,fraction,seed)是根据给定的随机种子seed,随机抽样出数量为frac的数据。withReplacement:是否放回抽样;fraction:比例,0.1表示10% ;union(otherDataset)是数据合并,返回一个新的数据集,由原数据集和otherDataset联合而成intersection(otherDataset)是数据交集,返回一个新的数据集,包含两个数据集的交集数据;distinct([numTasks]))是数据去重,返回一个数据集,是对两个数据集去除重复数据,numTasks参数是设置任务并行数量groupByKey([numTasks])是数据分组操作,在一个由(K,V)对组成的数据集上调用,返回一个(K,Seq[V])对的数据集。reduceByKey(func, [numTasks])是数据分组聚合操作,在一个(K,V)对的数据集上使用,返回一个(K,V)对的数据集,key相同的值,都被使用指定的reduce函数聚合到一起。aggreateByKey(zeroValue: U)(seqOp: (U, T)=> U, combOp: (U, U)=>U) 和reduceByKey的不同在于,reduceByKey输入输出都是(K, V),而aggreateByKey输出是(K,U),aggreateByKey可以看成更高抽象的,更灵活的reduce或groupcombineByKey是对RDD中的数据集按照Key进行聚合操作。sortByKey([ascending],[numTasks])是排序操作,对(K,V)类型的数据按照K进行排序,其中K需要实现Ordered方法join(otherDataset, [numTasks])是连接操作,将输入数据集(K,V)和另外一个数据集(K,W)进行Join, 得到(K, (V,W));cogroup(otherDataset, [numTasks])是将输入数据集(K, V)和另外一个数据集(K, W)进行cogroup,得到一个格式为(K, Seq[V], Seq[W])的数据集cartesian(otherDataset)是做笛卡尔积:对于数据集T和U 进行笛卡尔积操作, 得到(T, U)格式的数据集   4.RDD行动操作reduce(func)是对数据集的所有元素执行聚集(func)函数,该函数必须是可交换的。collect是将数据集中的所有元素以一个array的形式返回count返回数据集中元素的个数。first返回数据集中的第一个元素, 类似于take(1)Take(n)返回一个包含数据集中前n个元素的数组takeSample(withReplacement,num, [seed])返回包含随机的num个元素的数组takeOrdered(n, [ordering])是返回包含随机的n个元素的数组,按照顺序输出saveAsTextFile把数据集中的元素写到一个文本文件,Spark会对每个元素调用toString方法来把每个元素存成文本文件的一行。countByKey对于(K, V)类型的RDD. 返回一个(K, Int)的map, Int为K的个数foreach(func)是对数据集中的每个元素都执行func函数   5.持久化

  Spark 中一个很重要的能力是将数据 persisting 持久化(或称为 caching 缓存),在多个操作间都可以访问这些持久化的数据。RDD 可以使用 persist() 方法或 cache() 方法进行持久化。

  二、共享变量

  通常情况下,一个传递给 Spark 操作(例如 map 或 reduce)的函数 func 是在远程的手游拍卖集群节点上执行的。该函数 func 在多个节点执行过程中使用的变量,是同一个变量的多个副本。这些变量的以副本的方式拷贝到每个机器上,并且各个远程机器上变量的更新并不会传播回 driver program(驱动程序)。通用且支持 read-write(读-写)的共享变量在任务间是不能胜任的。所以,Spark 提供了两种特定类型的共享变量:broadcast variables(广播变量)和 accumulators(累加器)。

  1.广播变量

  Broadcast variables(广播变量)允许程序员将一个 read-only(只读的)变量缓存到每台机器上,而不是给任务传递一个副本。广播变量通过在一个变量 v 上调用 SparkContext.broadcast(v) 方法来进行创建。广播变量是 v 的一个 wrapper(包装器),可以通过调用 value 方法来访问它的值。

  在创建广播变量之后,在集群上执行的所有的函数中,应该使用该广播变量代替原来的 v 值,所以节点上的 v 最多分发一次。另外,对象 v 在广播后不应该再被修改,以保证分发到所有的节点上的广播变量具有同样的值(例如,如果以后该变量会被运到一个新的节点)。

  2.累加器

  Accumulators(累加器)是一个仅可以执行 “added”(添加)的变量来通过一个关联和交换操作,因此可以高效地执行支持并行。累加器可以用于实现 counter(计数,类似在 MapReduce 中那样)或者 sums(求和)。原生 Spark 支持数值型的累加器,并且程序员可以添加新的支持类型。

  可以通过调用 sc.longAccumulator() 或sc.doubleAccumulator() 方法创建数值类型的 accumulator(累加器)以分别累加 Long 或 Double 类型的值。集群上正在运行的任务就可以使用 add 方法来累计数值。然而,它们不能够读取它的值。只有 driver program(驱动程序)才可以使用 value 方法读取累加器的值。