Spark_RDD
程序员文章站
2024-01-30 15:22:22
...
RDD
弹性分布式数据集,就像Numpy array和Pandas Series,可以看作是一个有序的item集合,只不过这些item被分隔为多个partitions,分布在不同的机器上,
1.请简述RDD的含义,并写出针对RDD的两类操作(transformation与action),每类下至少三种的操作。
RDD(Resilient Distributed Datasets),弹性分布式数据集是一个容错的、可以被并行操作的元素集合弹性分布数据集。是Spark的核心,也是整个Spark的架构基础。Spark是以RDD概念为中心运行的。
RDD的一大特性是分布式存储,分布式存储在最大的好处是可以让数据在不同工作节点并行存储,以便在需要数据时并行运算。弹性指其在节点存储时,既可以使用内存,也可已使用外存,为使用者进行大数据处理提供方便
**它的特性可以总结如下:**
它是不变的数据结构存储
只读特性,维护DAG以便通过重新计算获得容错性
它是支持跨集群的分布式数据结构
可以根据数据记录的key对结构进行分区
提供了粗粒度的操作,且这些操作都支持分区
它将数据存储在内存中,从而提供了低延迟性
**常用的transformation操作:**
map(func) 对调用map的RDD数据集中的每个element都使用func,然后返回一个新的RDD,这个返回的数据集是分布式的数据集
filter(func) 对调用filter的RDD数据集中的每个元素都使用func,然后返回一个包含使func为true的元素构成的RDD
flatMap(func) 和map差不多,但是flatMap生成的是多个结果,返回值是一个Seq(一个List)
sample(withReplacement, fraction, seed) 从RDD中的item中采样一部分出来,有放回或者无放回
union(otherDataset) 返回一个新的dataset,包含源dataset和给定dataset的元素的集合
distinct([numTasks])) 对RDD中的item去重
groupByKey([numTasks]): 返回(K,Seq[V]),也就是hadoop中reduce函数接受的key-valuelist
reduceByKey(func, [numTasks]): 就是用一个给定的reduce func再作用在groupByKey产生的(K,Seq[V]),比如求和,求平均数
sortByKey([ascending], [numTasks]) 按照key来进行排序,是升序还是降序,ascending是boolean类型
join(otherDataset, [numTasks]) 当有两个KV的dataset(K,V)和(K,W),返回的是(K,(V,W))的dataset,numTasks为并发的任务数
cartesian(otherDataset) 笛卡尔积就是m*n
intersection(otherDataset): 交集
substract(otherDataset): 差集
sortBy(keyfunc, ascending=True, numPartitions=None): Sorts this RDD by the given keyfunc
**常用的action操作:**
reduce(func): 对RDD中的items做聚合
collect(): 计算所有的items并返回所有的结果到driver端,接着 collect()会以Python list的形式返回结果
count(): 返回的是dataset中的element的个数
first(): 和上面是类似的,不过只返回第1个item
take(n): 类似,但是返回n个item
top(n): 返回头n个items,按照自然结果排序
countByKey(): 返回的是key对应的个数的一个map,作用于一个RDD
foreach(): 对dataset中的每个元素都使用func
takeSample(): 指定采样个数,返回相应的数目
saveAsTextFile(path): 把dataset写到一个text file中,或者hdfs,或者hdfs支持的文件系统中,spark把每条记录都转换为一行记录,然后写到file中
特别注意:Spark的核心概念是惰性计算,当你把一个RDD转换成另一个时,这些转换不会立即生效,action就是唤醒的时候
2.初始化RDD
1)初始方法1
如果本地已经有一份序列化的数据如python的list,你可以通过sc.parallelize去初始化一个RDD,当执行完这个操作之后,list中的元素会被自动分块,并把每一块送到集群上的不同机器上。
import pyspark
from pyspark import SparkConf,SparkContext
conf = SparkConf.setAppName("miniProject").setMaster("local[*]")
sc = SparkContext.getOrCreate(conf)
my_list = [1,2,3,4,5]
rdd = sc.parallelize(my_list)
rdd
#获取分片数
rdd.getNumPartitions() #结果为本地笔记本cpu的核数
#查看分区情况。数据量太大的时候千万不能用collect,他会把rdd上的所有的数据取回本地,然后以列表的形式展示
rdd.glom().collect() #结果【[1],[2],[3],[4,5]】
使用sc.parallelize,你可以把Python list,Numpy array 或者Pandas Series,DataFrame 转换为Spark RDD
2)初始方法2
直接把文件读到RDD,此时文件中的每一行都会被当作一个item,不过需要注意的是saprk默认读取的文件路径是HDFS的,所以如果读取本地文件时,要加入**file://**开头的全局路径
import pyspark
from pyspark import SparkConf,SparkContext
conf = SparkConf.setAppName("miniProject").setMaster("local[*]")
sc = SparkContext.getOrCreate(conf)
#显示文件的前5行
!head -5 ./name/yob1880.txt
#获取当前路径
import os
cwd = os.getcwd()
#读取本地文件
rdd = sc.textFile("file://"+cwd+"/name/yob1880.txt")
rdd.first()
也可以直接读取整个文件夹的所有文件,但这种读法,RDD中的每个item实际上是一个形如(文件名,文件所有内容)的元组,如下例
#读取整个文件家中的所有内容
rdd = sc.wholeTexeFiles("file://"+cwd+"/name")
3)其他的初始化方式
- HDFS上的文件
- Hive中的数据库与表
- Spark Sql 的到的结果
上一篇: 运用jQuery写的验证表单(实例讲解)
下一篇: ai怎么设计女士高跟凉鞋图标?
推荐阅读