欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

pyspark RDD 的介绍和基本操作

程序员文章站 2022-05-18 20:26:03
...

pyspark RDD 的介绍和基本操作

RDD介绍

虽然现在pyspark已经支持了DataFrame 但是有的时候不得不用一下RDD
但是 官方文档很多地方说的不明不白 所以自己做了实验在这里总结一下。

RDD是用位置来做映射的 可以看做是一个大号的python list 区别在于他是被分布式存储 不是python中的list 是单机存储的 里面装的全是 单个元素或者元组(元素大于等于2的元组)。每个元素可以单个元素或者元组
如果是元组那么这个元组的第一个元素叫做key 元组的第二个元素叫做value 其余的被忽视了。。。

pyspark官方文档

创建RDD

sc 是sparkcontext
下面的方式只是为了做实验 使用parallelize 实际上rdd是从hdfs中读取创建的 可以读取parquet文件或者 textfile之类的 具体参考pyspark官方文档

>>> rdd1 = sc.parallelize(['a', 'b', 'a', 'c', 'c', 'a', 'c'])
ParallelCollectionRDD[1] at parallelize at PythonRDD.scala:489

从RDD 恢复到本地

就是将分布式版本的RDD 转化为单机版本的python list
collect操作会把存储在分布式系统上的数据拉到 单机上 所以要保证rdd1数据量足够小 不然会炸内存

>>> rdd1.collect()
['a', 'b', 'a', 'c', 'c', 'a', 'c']

repartition

将RDD按照指定的分区 重新分区 可以根据集群的情况来进行 直接关系到数据处理的性能
简单来说 就是如果集群中有三台机器 将数据分为三个分区 那么可以充分发挥每台机器的性能 分布式的对数据进行处理。
sc.parallelize中的第二个参数 就是在创建RDD 时指定分区

>>> partitionRdd = sc.parallelize(list(range(1000)), 4)
>>> partitionRdd.repartition(10).saveAsTextFile("s3o://ASH-PROFILE/zi/tmp/part1.txt")

可以看到分区后的数据在hdfs下被按照分区的数量存储 比如part1.txt 10个分区 在hdfs下可以看到part1.txt 被存储为10个part文件 如下

-rw-rw-rw-   1 sdev sdev          0 2019-12-20 06:52 s3o://ASH-PROFILE/zi/tmp/part1.txt/_SUCCESS
-rw-rw-rw-   1 sdev sdev        390 2019-12-20 06:52 s3o://ASH-PROFILE/zi/tmp/part1.txt/part-00000
-rw-rw-rw-   1 sdev sdev        380 2019-12-20 06:52 s3o://ASH-PROFILE/zi/tmp/part1.txt/part-00001
-rw-rw-rw-   1 sdev sdev        390 2019-12-20 06:52 s3o://ASH-PROFILE/zi/tmp/part1.txt/part-00002
-rw-rw-rw-   1 sdev sdev        390 2019-12-20 06:52 s3o://ASH-PROFILE/zi/tmp/part1.txt/part-00003
-rw-rw-rw-   1 sdev sdev        350 2019-12-20 06:52 s3o://ASH-PROFILE/zi/tmp/part1.txt/part-00004
-rw-rw-rw-   1 sdev sdev        390 2019-12-20 06:52 s3o://ASH-PROFILE/zi/tmp/part1.txt/part-00005
-rw-rw-rw-   1 sdev sdev        390 2019-12-20 06:52 s3o://ASH-PROFILE/zi/tmp/part1.txt/part-00006
-rw-rw-rw-   1 sdev sdev        390 2019-12-20 06:52 s3o://ASH-PROFILE/zi/tmp/part1.txt/part-00007
-rw-rw-rw-   1 sdev sdev        390 2019-12-20 06:52 s3o://ASH-PROFILE/zi/tmp/part1.txt/part-00008
-rw-rw-rw-   1 sdev sdev        430 2019-12-20 06:52 s3o://ASH-PROFILE/zi/tmp/part1.txt/part-00009

map 逐个元素映射

将每个元素 映射为另一个元素 相当于分布式版本的 python map 函数
例子中将一个char 映射为 一个元组 元组的第二个元素是字母对应的ascii

>>> rdd2 = rdd1.map(lambda r: (r, ord(r)))
>>> rdd2.collect()
[('a', 97), ('b', 98), ('a', 97), ('c', 99), ('c', 99), ('a', 97), ('c', 99)]

reduce 元素聚合

reduce 函数和python的reduce功能一样 只不过是作用在分布式的数据集 rdd上
函数 需要注意的是 reduce函数传入的参数和传出的参数应该是同一种类型 比如传入的两个参数是整数那么返回的聚合结果也必须是整数 如果变成字符串了那么这个字符串之后还可能和其他的整数元素做聚合 因为没有定义 所以就会出异常。

>>> rdd1.reduce(lambda x, y: x + y)
'abaccac'

keys 获取keyRDD

类似于python字典中的keys()方法 也就是吧rdd中所有的key作为一个新的rdd返回

>>> rdd2.collect()
[('a', 97), ('b', 98), ('a', 97), ('c', 99), ('c', 99), ('a', 97), ('c', 99)]
>>> keys = rdd2.keys()
>>> keys.collect()
['a', 'b', 'a', 'c', 'c', 'a', 'c']

values 获取valueRDD

类似于python字典中的values()方法 也就是吧rdd中所有的value作为一个新的rdd返回

>>> rdd2.collect()
[('a', 97), ('b', 98), ('a', 97), ('c', 99), ('c', 99), ('a', 97), ('c', 99)]
>>> values = rdd2.values()
>>> values.collect()
[97, 98, 97, 99, 99, 97, 99]

注意 只有元组中的第二个value才被认为是value 第三个及以后的不是value 不被承认
比如下面的例子 可以发现使用values()返回的RDD 没有包含第三列 被忽略了

>>> rdd1 = sc.parallelize([(i, chr(ord('a') + i), "A") for i in range(10)], 4)
>>> rdd1.collect()
[(0, 'a', 'A'), (1, 'b', 'A'), (2, 'c', 'A'), (3, 'd', 'A'), (4, 'e', 'A'), (5, 'f', 'A'), (6, 'g', 'A'), (7, 'h', 'A'), (8, 'i', 'A'), (9, 'j', 'A')]
>>> rdd1.values().collect()
['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j']

mapValues 对ValuesRDD进行映射

只对value做映射 并且在结果中保留原有的key

>>> rdd2.collect()
[('a', 97), ('b', 98), ('a', 97), ('c', 99), ('c', 99), ('a', 97), ('c', 99)]
>>> mapedValues = rdd2.mapValues(lambda r: r + 100)
>>> mapedValues.collect()
[('a', 197), ('b', 198), ('a', 197), ('c', 199), ('c', 199), ('a', 197), ('c', 199)]

groupByKey

按照RDD中每个元素的Key对数据进行分区。所有key相同的元素的value被放到一个叫做 pyspark.resultiterable.ResultIterable object的可迭代对象中
下面的例子是根据key对数据分组 并求一个key对应的所有value的和

>>> rdd2.collect()
[('a', 97), ('b', 98), ('a', 97), ('c', 99), ('c', 99), ('a', 97), ('c', 99)]
>>> groupedRdd = rdd2.groupByKey()
>>> groupedRdd.collect()
[('b', <pyspark.resultiterable.ResultIterable object at 0x7fea71782e10>), ('c', <pyspark.resultiterable.ResultIterable object at 0x7fea71782e90>), ('a', <pyspark.resultiterable.ResultIterable object at 0x7fea71785410>)]
>>> groupedRdd.mapValues(sum).collect()
[('b', 98), ('c', 297), ('a', 291)]

recuceByKey

先按照key对数据进行分组 然后对一个分组内的value分别对value进行聚合
聚合函数的要求 和reduce中是一样的 即一次聚合后的结果必须和输入的两个参数是一样的类型

>>> rdd2.collect()
[('a', 97), ('b', 98), ('a', 97), ('c', 99), ('c', 99), ('a', 97), ('c', 99)]
>>> reducedByKeyRDD = rdd2.reduceByKey(lambda x, y: x + y)
>>> reducedByKeyRDD.collect()
[('b', 98), ('c', 297), ('a', 291)]

flatMap

先对每个元素做map 但是这个元素做map后生成的是一个可迭代的对象 比如一个python的list
使用flat操作 将多个list 合并成一个大的list 有点类似于使用python list 中的extend方法
将多个pythonlist合并成一个大的 python list

>>> rdd2.collect()
[('a', 97), ('b', 98), ('a', 97), ('c', 99), ('c', 99), ('a', 97), ('c', 99)]
# 根据输入的key是第几个小写字母[a是第0个] 就将key 重复几次 
>>> rdd2.flatMap(lambda r: [r[0], ] * (ord(r[0]) - ord('a'))).collect()
['b', 'c', 'c', 'c', 'c', 'c', 'c']

mapPartition

mapPartitons 是以每个分区为单位进行映射 也就是我们在上面看到的分区中的每个part文件是一个基本的映射单位。

  1. 为什么要有这个操作
    因为有一些整块数据处理开销更小的场景。比如要分布式的把一些数据写入到一个mysql
    首先要创建连接 肯定不能直接使用map给每条数据都创建一个链接吧 费了半天劲捡了一个链接 就写一条数据??
    所以肯定是给整块的数据创建一个连接 然后用这个连接吧数据写到数据库.

  2. mapPartition接受的函数时什么样子的?
    需要被映射的每个mapPartition 被作为一个迭代器输入到映射函数中。mapPartition接受的函数的要求是 这个函数接受一个迭代器参数。
    最终这个函数返回的结果也应该是一个迭代器 哪怕我们把一个partition中的所有元素做了聚合 生成的结果是一个整数 也要把他当做迭代器中的一个元素来处理。可以考虑直接使用yeild 这些使用迭代器返回的元素会被全部放到最终输出的RDD中 有点类似于flatten 操作将多个list 通过extend 合并为一个大的list
    当然你也可以返回一个空的迭代器 用函数执行的副作用来完成你的功能 比如写入数据库之类的

#eg1: 将一个partition的数据打包成一个list 然后用yield返回
import random
>>> partitionRdd = sc.parallelize(list(range(10)), 4)
>>> def partition_func(iter_obj):
...     """
...      将一个partition的数据打包成一个list 然后用yield返回
...     """
...     partition_magic_number = random.randint(0, 1000) % 100
...     yield (partition_magic_number, list(iter_obj))
>>> partitionRdd.mapPartitions(partition_func).collect()
[(89, [0, 1]), (79, [2, 3]), (45, [4, 5]), (48, [6, 7, 8, 9])]

#eg2 将元素在partition内部map后打包 然后再合并为一个大的RDD
>>> partitionRdd = sc.parallelize(list(range(10)), 4)
>>> partitionRdd.mapPartitions(lambda iterobj: [100 + i for i in iterobj]).collect()
[100, 101, 102, 103, 104, 105, 106, 107, 108, 109]

join

这里的join操作本身和sql中的类似 根据相同的key 把两部分数据组织起来
也就是根据两个RDD中的key 进行关联
rdd1 = [(k1, v1),...]
rdd2 = [(k1, v2),...]
关联后 变成
rdd_joined = [(k1, (v1, v2)),...]

下面是一个具体的例子

>>> rdd1 =  sc.parallelize([(i, chr(ord('a') + i)) for i in range(10)], 4)
>>> rdd1.collect()
[(0, 'a'), (1, 'b'), (2, 'c'), (3, 'd'), (4, 'e'), (5, 'f'), (6, 'g'), (7, 'h'), (8, 'i'), (9, 'j')]

>>> rdd2 =  sc.parallelize([(i, chr(ord('A') + i)) for i in range(0, 10, 2)], 4)
>>> rdd2.collect()
[(0, 'A'), (2, 'C'), (4, 'E'), (6, 'G'), (8, 'I')]

>>> joinedRdd = rdd1.join(rdd2)
>>> joinedRdd.collect()
[(8, ('i', 'I')), (0, ('a', 'A')), (2, ('c', 'C')), (4, ('e', 'E')), (6, ('g', 'G'))]

注意 join只会关联key 和value 元组中第三个位置的元素在关联时的将被忽略掉