Spark基本操作 RDD map filter aggregate
程序员文章站
2022-05-18 19:59:58
...
文件读取,转化成RDD,输出
from pyspark.sql import SparkSession
"""
Spark RDD
每个spark都由一个驱动器程序(driver program)来发起集群上的各种并行操作
SparkContext对象代表对计算集群的一个连接,简写为sc
"""
spark = SparkSession.builder.master('local').appName("test_script").getOrCreate()
# 读取文件
df = sc.textFile("test.txt")
# 过滤操作
df2 = df.filter(lambda x: "export" in x)
df3 = df.filter(lambda x: "alias" in x)
# 输出
print(df2.collect())
print(df3.collect())
RDD常见操作
- 操作可分为转化(transformation)和行动(action)
- 转化: 一个RDD生成一个新的RDD
- 行动: 对RDD计算出一个结果
from pyspark.sql import SparkSession
spark = SparkSession.builder.master('local').appName("test_script").getOrCreate()
sc = spark.sparkContext
# 1 转化
# map 映射操作
# nums = sc.parallelize([1,2,3,4])
# squared = nums.map(lambda x: x*x).collect()
# for num in squared:
# print("%i " %num)
# 输出:
# 1
# 4
# 9
# 16
# flatmap 输出一个返回值序列的迭代器
# lines = sc.parallelize(["hello sdjfl sdjfkldsj sfjl", "hi"])
# words = lines.flatMap(lambda line: line.split(" "))
# print(words.collect())
# 输出
# ['hello', 'sdjfl', 'sdjfkldsj', 'sfjl', 'hi']
# 2 行动
# reduce 接收的参数个数只能为2,先把sequence中第一个值和第二个值当参数传给function,再把function的返回值和第三个值当参数传给function,然后只返回一个结果。
# rdd = sc.parallelize([1,2,3,4])
# sum = rdd.reduce(lambda x,y: x+y)
# print(sum)
# 输出
# 10
# aggregate
# seqOp操作会聚合各分区中的元素,然后combOp操作把所有分区的聚合结果再次聚合,两个操作的初始值都是zeroValue.
rdd = sc.parallelize([1,2,3,4])
seqOp= (lambda x, y: (x[0]+ y, x[1]+1))
combOp = (lambda x, y: (x[0]+ y[0], x[1]+ y[1]))
res = sc.parallelize([1,2,3,4]).aggregate((0,0), seqOp, combOp)
print(res)
rdd.saveAsTextFile("tmp.txt")
# 输出
# (10, 4)
# 计算过程
# 1. 0+1, 0+1
# 2. 1+2, 1+1
# 3. 3+3, 2+1
# 4. 6+4, 3+1
上一篇: PHP callback函数使用方法和注意事项_php技巧
下一篇: 双字节字符截断