
Converting list, dict, tuple, and RowProxy to a DataFrame; converting between pandas and Spark DataFrames

import pandas as pd

# Example invocations (three different DB-access styles, each yielding a different row shape):
#res = DbService.getTrainData("F20000", 3)    # SQLAlchemy query result -> RowProxy
#res = DbService.getTestMysqlDb("F20000", 3)  # MySQLdb query result -> tuple
#res = MetaDataDao.findByDrgId2("F20000")     # PymysqlPool-wrapped query result -> dict
#row2Dataframe(res)
#convert2Dataframe(res)
#dict2Dataframe(res)

# tuple -> DataFrame (MySQLdb query results arrive as a tuple of tuples)
def convert2Dataframe(data):
    df = pd.DataFrame(list(data), columns=["akc190", "yka055", "yp_sum", "ypzf_yp", "hl_sum"])
    ss = df[["yka055", "akc190"]]  # column selection on the DataFrame
    print(df)
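For context, a minimal sketch of how a tuple-shaped result like this is produced with raw MySQLdb; the connection parameters and table name below are placeholders, not from the original post:

import MySQLdb

conn = MySQLdb.connect(host="localhost", user="root", passwd="***", db="test")  # placeholder credentials
cur = conn.cursor()
cur.execute("SELECT akc190, yka055, yp_sum, ypzf_yp, hl_sum FROM some_table")   # placeholder table
res = cur.fetchall()        # tuple of tuples, one inner tuple per row
convert2Dataframe(res)
cur.close()
conn.close()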
# dict -> DataFrame
def dict2Dataframe(data):
    df = pd.DataFrame(data)             # accepts a list of dicts directly
    df2 = pd.DataFrame.from_dict(data)  # equivalent constructor for dict-shaped input
    print(df)
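Similarly, a sketch of how the dict-shaped input is typically produced, assuming PyMySQL with a DictCursor (which is what pooled PyMySQL wrappers usually return under the hood); credentials and table name are placeholders:

import pymysql

conn = pymysql.connect(host="localhost", user="root", password="***", db="test",
                       cursorclass=pymysql.cursors.DictCursor)  # placeholder credentials
with conn.cursor() as cur:
    cur.execute("SELECT akc190, yka055 FROM some_table")        # placeholder table
    res = cur.fetchall()    # list of dicts, one dict per row
dict2Dataframe(res)
conn.close()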
# RowProxy -> DataFrame, first approach: go through a list of tuples
def row2Dataframe(data):
    arrs = []
    for d in data:
        res = tuple(d)  # RowProxy is iterable; tuple() avoids relying on the private _row attribute
        arrs.append(res)
    # list[tuple] -> DataFrame
    convert2Dataframe(arrs)

# Second approach: pandas consumes the RowProxy sequence directly
# (renamed so it does not shadow the first definition above)
def row2Dataframe2(data):
    df = pd.DataFrame(data, columns=["akc190", "yka055", "yp_sum", "ypzf_yp", "hl_sum"])
    ss = df[["yka055", "akc190"]]  # column selection on the DataFrame
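A usage sketch for both RowProxy converters, written against the SQLAlchemy 1.x-style engine.execute API (which returns RowProxy objects; it was removed in SQLAlchemy 2.0); the connection string is a placeholder:

from sqlalchemy import create_engine

engine = create_engine("mysql+pymysql://root:***@localhost/test")  # placeholder connection string
res = engine.execute("SELECT akc190, yka055, yp_sum, ypzf_yp, hl_sum FROM some_table").fetchall()
row2Dataframe(res)   # first approach: convert rows to tuples, then build the DataFrame
row2Dataframe2(res)  # second approach: hand the row sequence to pandas directly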

Converting between pandas and Spark DataFrames

import pandas as pd
from pyspark import SQLContext
from pyspark.sql import SparkSession
import pyspark.sql.types as typ

'''Convert a pandas DataFrame to a Spark DataFrame'''

## Convert by calling the official API directly
def createSpark():
    spark = SparkSession.builder.getOrCreate()
    df = pd.read_excel('C:\\Users\\shea\\Desktop\\测试.xlsx')
    print(df)
    # convert via SQLContext
    sqlContext = SQLContext(spark.sparkContext)
    spark_df = sqlContext.createDataFrame(df)
    spark_df.select("bah").show()
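Note that on Spark 2.x and later the SQLContext detour is unnecessary: SparkSession.createDataFrame accepts a pandas DataFrame directly. A minimal sketch, reusing the same Excel file:

def createSpark2():
    spark = SparkSession.builder.getOrCreate()
    df = pd.read_excel('C:\\Users\\shea\\Desktop\\测试.xlsx')
    spark_df = spark.createDataFrame(df)  # SparkSession infers the schema from the pandas DataFrame
    spark_df.show()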
# Convert with an explicit schema - for columns whose special values defeat automatic type inference
def convert2Spark():
    columns = [
        ('bah', typ.StringType()),
        ('name', typ.StringType()),
        ('sex', typ.IntegerType()),
        ('akc194', typ.StringType()),
        ('zzfs', typ.StringType())
    ]
    schema = typ.StructType([
        typ.StructField(name, dtype, False) for name, dtype in columns
    ])

    spark = SparkSession.builder.getOrCreate()
    df = pd.read_excel('C:\\Users\\shea\\Desktop\\测试.xlsx')

    df['akc194'] = df['akc194'].map(lambda x: str(x))  # transform one column: cast Timestamp values to string

    # DataFrame -> list of tuples, then build the Spark DataFrame against the schema
    ss = df.values.tolist()
    ss_tuple = [tuple(x) for x in ss]
    spark_df = spark.createDataFrame(ss_tuple, schema=schema)
    spark_df.show()
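The reverse direction, Spark DataFrame to pandas DataFrame, is a single call: toPandas() collects the distributed data onto the driver, so it is only safe for results that fit in driver memory. A minimal sketch:

def spark2Pandas(spark_df):
    pandas_df = spark_df.toPandas()  # collects all partitions to the driver
    print(pandas_df.head())
    return pandas_df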


if __name__ == '__main__':
    #createSpark()
    convert2Spark()

 

Reposted from: https://my.oschina.net/shea1992/blog/3029197