欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Spark基本操作 RDD map filter aggregate

程序员文章站 2022-05-18 19:59:58
...

文件读取,转化成RDD,输出

from pyspark.sql import SparkSession
"""
Spark RDD
每个spark都由一个驱动器程序(driver program)来发起集群上的各种并行操作
SparkContext对象代表对计算集群的一个连接,简写为sc
"""
spark = SparkSession.builder.master('local').appName("test_script").getOrCreate()
# 读取文件
df = sc.textFile("test.txt")
# 过滤操作
df2 = df.filter(lambda x: "export" in x)
df3 = df.filter(lambda x: "alias" in x)
# 输出
print(df2.collect())
print(df3.collect())

RDD常见操作

  • 操作可分为转化(transformation)和行动(action)
    • 转化: 一个RDD生成一个新的RDD
    • 行动: 对RDD计算出一个结果
from pyspark.sql import SparkSession

spark = SparkSession.builder.master('local').appName("test_script").getOrCreate()
sc = spark.sparkContext

# 1 转化
# map 映射操作
# nums = sc.parallelize([1,2,3,4])
# squared = nums.map(lambda x: x*x).collect()
# for num in squared:
#     print("%i " %num)
# 输出:
# 1 
# 4 
# 9 
# 16 

# flatmap 输出一个返回值序列的迭代器
# lines = sc.parallelize(["hello sdjfl sdjfkldsj sfjl", "hi"])
# words = lines.flatMap(lambda line: line.split(" "))
# print(words.collect())
# 输出
# ['hello', 'sdjfl', 'sdjfkldsj', 'sfjl', 'hi']



# 2 行动
# reduce 接收的参数个数只能为2,先把sequence中第一个值和第二个值当参数传给function,再把function的返回值和第三个值当参数传给function,然后只返回一个结果。
# rdd = sc.parallelize([1,2,3,4])
# sum = rdd.reduce(lambda x,y: x+y)
# print(sum)
# 输出
# 10


# aggregate
# seqOp操作会聚合各分区中的元素,然后combOp操作把所有分区的聚合结果再次聚合,两个操作的初始值都是zeroValue.
rdd = sc.parallelize([1,2,3,4])
seqOp= (lambda x, y: (x[0]+ y, x[1]+1))
combOp = (lambda x, y: (x[0]+ y[0], x[1]+ y[1]))
res = sc.parallelize([1,2,3,4]).aggregate((0,0), seqOp, combOp)
print(res)
rdd.saveAsTextFile("tmp.txt")
# 输出
# (10, 4)
# 计算过程
# 1.  0+1,  0+1
# 2.  1+2,  1+1
# 3.  3+3,  2+1
# 4.  6+4,  3+1

参考1
参考2

相关标签: 大数据处理