欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

PySpark DataFrame

程序员文章站 2022-06-03 18:58:40
...

在python中 RDD需要从Py4j 启动一个 JVM, 所有的RDD转换最开始都要映射到 JAVA中的pythonRDD对象,

所以 Python和JVM之间的频繁切换,通信会有很多的而外开销。

 

Dataframe 则提高了PySpark的查询性能


Table of Contents

创建DataFrame

JSON转DataFrame

Pandas与DataFrame的转换

DataFrame SQL查询

DataFrame的schema(模式)


创建DataFrame

Dataframe 需要首先建立SparkSession

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

 

JSON转DataFrame

这里df 是JSON格式的RDD

df = spark.read.json(your_JSON_file)

 

创建临时表

df.createOrReplaceTempView('df')

Spark 是惰性的,只有创建临时表之后,dataframe才会被执行

 

Pandas与DataFrame的转换

#Spark to pandas
df.toPandas()

#pandas to Spark
spark_dataframe = spark.createDataFrame(df)

 

但是在转换时要确定pandas的数据类型是不是spark所支持的,如果spark 推断不了数据类型, 需要手动编辑schema

 

DataFrame SQL查询

spark.sql('your sql query').collect()

这里collect()会返回所有的结果, take(n),和show(n)应该会有更好的性能

 

DataFrame的schema(模式)

DataFrame的schema 可以通过refection 推断出数据的schema,通过printSchema() 可以查看你的shcama

df.printSchema()

 有的时候,我们也可以手动设定schema 以保证数据有正确的格式

from pyspark.sql.types import *

schema = StructType(
[
StructField('id',LongType(),True,
StructField('name',StringType(),True
]
)

StructType将schema组成一个list, 然后StrucField()来定义每一列的格式, 里面包含三个参数

Creates a StructField
:param name: the name of this field. 名子
:param dataType: the data type of this field.  格式
:param nullable: indicates whether values of this field  是否可为空值
                 can be null.

然后利用我们编辑好的Schema来生成DataFrame

df = spark.createDataFrame(yourfile,schema)

 

相关标签: dataframe