Spark的算子:方法、函数
程序员文章站
2022-05-09 23:31:27
1、什么是RDD? 最核心
*)弹性分布式数据集,Resilent distributed DataSet
(*)Spark中数据的基本抽象
(*)结...
1、什么是RDD? 最核心
*)弹性分布式数据集,Resilent distributed DataSet (*)Spark中数据的基本抽象 (*)结合源码,查看RDD的概念 * Internally, each RDD is characterized by five main properties: * * - A list of partitions 一组分区,把数据分成了的不同的分区,每个分区可能运行在不同的worker * - A function for computing each split 一个函数,用于计算每个分区中的数据 RDD的函数(算子) (1)Transformation(延时加载) (2)Action(会触发计算) * - A list of dependencies on other RDDs RDD之间存在依赖关系:(1)窄依赖 (2)宽依赖 根据依赖的关系,来划分任务的Stage(阶段) * - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned) * - Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file) 如何创建一个RDD?有两种方式 (1)使用sc.parallelize方法 val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8),3) (2)通过使用外部的数据源创建RDD:比如:HDFS val rdd2 = sc.textFile("hdfs://bigdata11:9000/input/data.txt") val rdd2 = sc.textFile("/root/temp/input/data.txt")
RDD的分区
2、Transformation算子:不会触发计算、延时加载(lazy值) map(func):该操作是对原来的RDD进行操作后,返回一个新的RDD filter: 过滤操作、返回一个新的RDD flatMap:类似map mapPartitions:对每个分区进行操作 mapPartitionsWithIndex: 对每个分区进行操作,带分区的下标 union 并集 intersection 交集 distinct 去重 groupByKey: 都是按照Key进行分组 reduceByKey: 都是按照Key进行分组、会有一个本地操作(相当于:Combiner操作) 3、Action算子:会触发计算 collect: 触发计算、打印屏幕上。以数组形式返回 count: 求个数 first: 第一个元素(take(1)) take(n) saveAsTextFile: 会转成String的形式,会调用toString()方法 foreach: 在RDD的每个元素上进行某个操作
7、RDD算子的基础例子 1、创建一个RDD(数字) val rdd1 = sc.parallelize(List(5,6,1,2,10,4,12,20,100,30)) 每个元素*2,然后排序 val rdd2 = rdd1.map(_*2).sortBy(x=>x,true) 完整 val rdd2 = rdd1.map((x:Int)=>x*2) 过滤出大于10的元素 val rdd3 = rdd2.filter(_>10) rdd3.collect 2、创建一个RDD(字符) val rdd1 = sc.parallelize(Array("a b c","d e f","h i j")) val rdd2 = rdd1.flatMap(_.split(' ')) rdd2.collect 3、集合运算、去重 val rdd1 = sc.parallelize(List(5,6,7,8,1,2)) val rdd2 = sc.parallelize(List(1,2,3,4)) val rdd3 = rdd1.union(rdd2) rdd3.distinct.collect val rdd4 = rdd1.intersection(rdd2) 4、分组 val rdd1 = sc.parallelize(List(("Tom",1000),("Jerry",3000),("Mary",2000))) val rdd2 = sc.parallelize(List(("Jerry",500),("Tom",3000),("Mike",2000))) 并集 val rdd3 = rdd1 union rdd2 scala> val rdd4 = rdd3.groupByKey rdd4: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[27] at groupByKey at :30 scala> rdd4.collect res8: Array[(String, Iterable[Int])] = Array((Tom,CompactBuffer(1000, 3000)), (Jerry,CompactBuffer(3000, 500)), (Mike,CompactBuffer(2000)), (Mary,CompactBuffer(2000))) 按照Value排序 提示:交换了key-value的位置(两次) sortByKey: 按照key进行排序 val rdd5 = rdd4.map(t => (t._2, t._1)).sortByKey(false).map(t => (t._2, t._1))
4、RDD的缓存机制:默认在内存中 (*)提高效率 (*)默认:缓存在Memory中 (*)调用:方法:persist或者cache def persist(): this.type = persist(StorageLevel.MEMORY_ONLY) def cache(): this.type = persist() (*)缓存的位置:StorageLevel定义的 val NONE = new StorageLevel(false, false, false, false) val DISK_ONLY = new StorageLevel(true, false, false, false) val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2) val MEMORY_ONLY = new StorageLevel(false, true, false, true) val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2) val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false) val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2) val MEMORY_AND_DISK = new StorageLevel(true, true, false, true) val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2) val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false) val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2) val OFF_HEAP = new StorageLevel(true, true, true, false, 1) (*)示例: 测试数据:Oracle数据库的订单变 sales表(大概92万) 步骤 (1)读入数据 val rdd1 = sc.textFile("hdfs://bigdata11:9000/input/sales") (2)计算 rdd1.count ---> Action,这一次没有缓存 rdd1.cache ---> 缓存数据,但是不会触发计算,cache是一个Transformation rdd1.count ----> 触发计算,将结果缓存 rdd1.count ----> ???会从哪里得到数据,从缓存中得到
5、RDD的容错机制:checkpoint检查点:两种类型 (1)本地目录 (2)HDFS目录 (1)复习检查点:HDFS中,合并元信息 Oracle中,会以最高优先级唤醒数据库写进程(DBWn),来写内存中的脏数据---> 数据文件 (2)RDD的检查点:容错机制,辅助Lineage(血统)---> 整个计算的过程 如果lineage越长,出错的概率就越大(生成检查点,如果出错,就从之前的检查点开始计算) 两种类型 (1)本地目录 : 需要将spark-shell运行在本地模式上 (2)HDFS目录: 需要将spark-shell运行在集群模式上 scala> sc.setCheckpointDir("hdfs://bigdata11:9000/spark/checkpoint") scala> val rdd1 = sc.textFile("hdfs://bigdata11:9000/input/sales") rdd1: org.apache.spark.rdd.RDD[String] = hdfs://bigdata11:9000/input/sales MapPartitionsRDD[41] at textFile at :24 scala> rdd1.checkpoint scala> rdd1.count 源码中的说明: /** * Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint * directory set with `SparkContext#setCheckpointDir` and all references to its parent * RDDs will be removed. This function must be called before any job has been * executed on this RDD. It is strongly recommended that this RDD is persisted in * memory, otherwise saving it on a file will require recomputation. */
6、RDD的依赖关系、划分Spark任务的Stage(阶段) (*)窄依赖:每一个父RDD的分区最多被子RDD的一个分区使用 比方:独生子女 (*)宽依赖:多个子RDD的分区会依赖同一个父RDD的分区 比方:超生