欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Spark_RDD

程序员文章站 2024-01-30 15:22:22
...

RDD

弹性分布式数据集,就像Numpy array和Pandas Series,可以看作是一个有序的item集合,只不过这些item被分隔为多个partitions,分布在不同的机器上,

1.请简述RDD的含义,并写出针对RDD的两类操作(transformation与action),每类下至少三种的操作。

RDD(Resilient Distributed Datasets),弹性分布式数据集是一个容错的、可以被并行操作的元素集合弹性分布数据集。是Spark的核心,也是整个Spark的架构基础。Spark是以RDD概念为中心运行的。

RDD的一大特性是分布式存储,分布式存储在最大的好处是可以让数据在不同工作节点并行存储,以便在需要数据时并行运算。弹性指其在节点存储时,既可以使用内存,也可已使用外存,为使用者进行大数据处理提供方便

**它的特性可以总结如下:**

它是不变的数据结构存储
只读特性,维护DAG以便通过重新计算获得容错性
它是支持跨集群的分布式数据结构
可以根据数据记录的key对结构进行分区
提供了粗粒度的操作,且这些操作都支持分区
它将数据存储在内存中,从而提供了低延迟性


**常用的transformation操作:**

	map(func) 对调用map的RDD数据集中的每个element都使用func,然后返回一个新的RDD,这个返回的数据集是分布式的数据集
	filter(func) 对调用filter的RDD数据集中的每个元素都使用func,然后返回一个包含使func为true的元素构成的RDD
	flatMap(func) 和map差不多,但是flatMap生成的是多个结果,返回值是一个Seq(一个List)
	sample(withReplacement, fraction, seed) 从RDD中的item中采样一部分出来,有放回或者无放回
	union(otherDataset) 返回一个新的dataset,包含源dataset和给定dataset的元素的集合
	distinct([numTasks])) 对RDD中的item去重
	groupByKey([numTasks]): 返回(K,Seq[V]),也就是hadoop中reduce函数接受的key-valuelist
	reduceByKey(func, [numTasks]): 就是用一个给定的reduce func再作用在groupByKey产生的(K,Seq[V]),比如求和,求平均数
	sortByKey([ascending], [numTasks]) 按照key来进行排序,是升序还是降序,ascending是boolean类型
	join(otherDataset, [numTasks]) 当有两个KV的dataset(K,V)(K,W),返回的是(K,(V,W))的dataset,numTasks为并发的任务数
	cartesian(otherDataset) 笛卡尔积就是m*n
	intersection(otherDataset): 交集
	substract(otherDataset): 差集
	sortBy(keyfunc, ascending=True, numPartitions=None): Sorts this RDD by the given keyfunc

**常用的action操作:**

	reduce(func): 对RDD中的items做聚合
	collect(): 计算所有的items并返回所有的结果到driver端,接着 collect()会以Python list的形式返回结果
	count(): 返回的是dataset中的element的个数
	first(): 和上面是类似的,不过只返回第1个item
	take(n): 类似,但是返回n个item
	top(n): 返回头n个items,按照自然结果排序
	countByKey(): 返回的是key对应的个数的一个map,作用于一个RDD
	foreach(): 对dataset中的每个元素都使用func
	takeSample(): 指定采样个数,返回相应的数目
	saveAsTextFile(path): 把dataset写到一个text file中,或者hdfs,或者hdfs支持的文件系统中,spark把每条记录都转换为一行记录,然后写到file中

特别注意:Spark的核心概念是惰性计算,当你把一个RDD转换成另一个时,这些转换不会立即生效,action就是唤醒的时候

2.初始化RDD

1)初始方法1

如果本地已经有一份序列化的数据如python的list,你可以通过sc.parallelize去初始化一个RDD,当执行完这个操作之后,list中的元素会被自动分块,并把每一块送到集群上的不同机器上。

import pyspark
from pyspark import SparkConf,SparkContext
conf = SparkConf.setAppName("miniProject").setMaster("local[*]")
sc = SparkContext.getOrCreate(conf)

my_list = [1,2,3,4,5]
rdd = sc.parallelize(my_list)
rdd

#获取分片数
rdd.getNumPartitions()  #结果为本地笔记本cpu的核数
#查看分区情况。数据量太大的时候千万不能用collect,他会把rdd上的所有的数据取回本地,然后以列表的形式展示
rdd.glom().collect()  #结果【[1],[2],[3],[4,5]】

使用sc.parallelize,你可以把Python list,Numpy array 或者Pandas Series,DataFrame 转换为Spark RDD

2)初始方法2

直接把文件读到RDD,此时文件中的每一行都会被当作一个item,不过需要注意的是saprk默认读取的文件路径是HDFS的,所以如果读取本地文件时,要加入**file://**开头的全局路径

import pyspark
from pyspark import SparkConf,SparkContext
conf = SparkConf.setAppName("miniProject").setMaster("local[*]")
sc = SparkContext.getOrCreate(conf)
#显示文件的前5行
!head -5 ./name/yob1880.txt
#获取当前路径
import os
cwd = os.getcwd()
#读取本地文件
rdd = sc.textFile("file://"+cwd+"/name/yob1880.txt")
rdd.first()

也可以直接读取整个文件夹的所有文件,但这种读法,RDD中的每个item实际上是一个形如(文件名,文件所有内容)的元组,如下例

#读取整个文件家中的所有内容
rdd = sc.wholeTexeFiles("file://"+cwd+"/name")
3)其他的初始化方式
  1. HDFS上的文件
  2. Hive中的数据库与表
  3. Spark Sql 的到的结果

推荐阅读