欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

在python中使用pyspark读写Hive数据操作

程序员文章站 2022-06-15 17:40:46
1、读hive表数据pyspark读取hive数据非常简单,因为它有专门的接口来读取,完全不需要像hbase那样,需要做很多配置,pyspark提供的操作hive的接口,使得程序可以直接使用sql语句...

1、读hive表数据

pyspark读取hive数据非常简单,因为它有专门的接口来读取,完全不需要像hbase那样,需要做很多配置,pyspark提供的操作hive的接口,使得程序可以直接使用sql语句从hive里面查询需要的数据,代码如下:

from pyspark.sql import hivecontext,sparksession
 
_spark_host = "spark://spark-master:7077"
_app_name = "test"
spark_session = sparksession.builder.master(_spark_host).appname(_app_name).getorcreate()
 
hive_context= hivecontext(spark_session )
 
# 生成查询的sql语句,这个跟hive的查询语句一样,所以也可以加where等条件语句
hive_database = "database1"
hive_table = "test"
hive_read = "select * from {}.{}".format(hive_database, hive_table)
 
# 通过sql语句在hive中查询的数据直接是dataframe的形式
read_df = hive_context.sql(hive_read)

2 、将数据写入hive表

pyspark写hive表有两种方式:

(1)通过sql语句生成表

from pyspark.sql import sparksession, hivecontext
 
_spark_host = "spark://spark-master:7077"
_app_name = "test"
 
spark = sparksession.builder.master(_spark_host).appname(_app_name).getorcreate()
 
data = [
 (1,"3","145"),
 (1,"4","146"),
 (1,"5","25"),
 (1,"6","26"),
 (2,"32","32"),
 (2,"8","134"),
 (2,"8","134"),
 (2,"9","137")
]
df = spark.createdataframe(data, ['id', "test_id", 'camera_id'])
 
# method one,default是默认数据库的名字,write_test 是要写到default中数据表的名字
df.registertemptable('test_hive')
sqlcontext.sql("create table default.write_test select * from test_hive")

(2)saveastable的方式

# method two
 
# "overwrite"是重写表的模式,如果表存在,就覆盖掉原始数据,如果不存在就重新生成一张表
# mode("append")是在原有表的基础上进行添加数据
df.write.format("hive").mode("overwrite").saveastable('default.write_test')
 

tips:

spark用上面几种方式读写hive时,需要在提交任务时加上相应的配置,不然会报错:

spark-submit --conf spark.sql.catalogimplementation=hive test.py

补充知识:pyspark基于shc框架读取hbase数据并转成dataframe

一、首先需要将hbase目录lib下的jar包以及shc的jar包复制到所有节点的spark目录lib下

二、修改spark-defaults.conf 在spark.driver.extraclasspath和spark.executor.extraclasspath把上述jar包所在路径加进去

三、重启集群

四、代码

#/usr/bin/python
#-*- coding:utf-8 –*-
 
from pyspark import sparkcontext
from pyspark.sql import sqlcontext,hivecontext,sparksession
from pyspark.sql.types import row,stringtype,structfield,stringtype,integertype
from pyspark.sql.dataframe import dataframe
 
sc = sparkcontext(appname="pyspark_hbase")
sql_sc = sqlcontext(sc)
 
dep = "org.apache.spark.sql.execution.datasources.hbase"
#定义schema
catalog = """{
       "table":{"namespace":"default", "name":"teacher"},
       "rowkey":"key",
       "columns":{
            "id":{"cf":"rowkey", "col":"key", "type":"string"},
            "name":{"cf":"teacherinfo", "col":"name", "type":"string"},
            "age":{"cf":"teacherinfo", "col":"age", "type":"string"},
            "gender":{"cf":"teacherinfo", "col":"gender","type":"string"},
            "cat":{"cf":"teacherinfo", "col":"cat","type":"string"},
            "tag":{"cf":"teacherinfo", "col":"tag", "type":"string"},
            "level":{"cf":"teacherinfo", "col":"level","type":"string"} }
      }"""
 
df = sql_sc.read.options(catalog = catalog).format(dep).load()
 
print ('***************************************************************')
print ('***************************************************************')
print ('***************************************************************')
df.show()
print ('***************************************************************')
print ('***************************************************************')
print ('***************************************************************')
sc.stop()

五、解释

数据来源参考请本人之前的文章,在此不做赘述

schema定义参考如图:

在python中使用pyspark读写Hive数据操作

六、结果

在python中使用pyspark读写Hive数据操作

以上这篇在python中使用pyspark读写hive数据操作就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持。