在python中使用pyspark读写Hive数据操作
程序员文章站
2022-06-15 17:40:46
1、读hive表数据pyspark读取hive数据非常简单,因为它有专门的接口来读取,完全不需要像hbase那样,需要做很多配置,pyspark提供的操作hive的接口,使得程序可以直接使用sql语句...
1、读hive表数据
pyspark读取hive数据非常简单,因为它有专门的接口来读取,完全不需要像hbase那样,需要做很多配置,pyspark提供的操作hive的接口,使得程序可以直接使用sql语句从hive里面查询需要的数据,代码如下:
from pyspark.sql import hivecontext,sparksession _spark_host = "spark://spark-master:7077" _app_name = "test" spark_session = sparksession.builder.master(_spark_host).appname(_app_name).getorcreate() hive_context= hivecontext(spark_session ) # 生成查询的sql语句,这个跟hive的查询语句一样,所以也可以加where等条件语句 hive_database = "database1" hive_table = "test" hive_read = "select * from {}.{}".format(hive_database, hive_table) # 通过sql语句在hive中查询的数据直接是dataframe的形式 read_df = hive_context.sql(hive_read)
2 、将数据写入hive表
pyspark写hive表有两种方式:
(1)通过sql语句生成表
from pyspark.sql import sparksession, hivecontext _spark_host = "spark://spark-master:7077" _app_name = "test" spark = sparksession.builder.master(_spark_host).appname(_app_name).getorcreate() data = [ (1,"3","145"), (1,"4","146"), (1,"5","25"), (1,"6","26"), (2,"32","32"), (2,"8","134"), (2,"8","134"), (2,"9","137") ] df = spark.createdataframe(data, ['id', "test_id", 'camera_id']) # method one,default是默认数据库的名字,write_test 是要写到default中数据表的名字 df.registertemptable('test_hive') sqlcontext.sql("create table default.write_test select * from test_hive")
(2)saveastable的方式
# method two # "overwrite"是重写表的模式,如果表存在,就覆盖掉原始数据,如果不存在就重新生成一张表 # mode("append")是在原有表的基础上进行添加数据 df.write.format("hive").mode("overwrite").saveastable('default.write_test')
tips:
spark用上面几种方式读写hive时,需要在提交任务时加上相应的配置,不然会报错:
spark-submit --conf spark.sql.catalogimplementation=hive test.py
补充知识:pyspark基于shc框架读取hbase数据并转成dataframe
一、首先需要将hbase目录lib下的jar包以及shc的jar包复制到所有节点的spark目录lib下
二、修改spark-defaults.conf 在spark.driver.extraclasspath和spark.executor.extraclasspath把上述jar包所在路径加进去
三、重启集群
四、代码
#/usr/bin/python #-*- coding:utf-8 –*- from pyspark import sparkcontext from pyspark.sql import sqlcontext,hivecontext,sparksession from pyspark.sql.types import row,stringtype,structfield,stringtype,integertype from pyspark.sql.dataframe import dataframe sc = sparkcontext(appname="pyspark_hbase") sql_sc = sqlcontext(sc) dep = "org.apache.spark.sql.execution.datasources.hbase" #定义schema catalog = """{ "table":{"namespace":"default", "name":"teacher"}, "rowkey":"key", "columns":{ "id":{"cf":"rowkey", "col":"key", "type":"string"}, "name":{"cf":"teacherinfo", "col":"name", "type":"string"}, "age":{"cf":"teacherinfo", "col":"age", "type":"string"}, "gender":{"cf":"teacherinfo", "col":"gender","type":"string"}, "cat":{"cf":"teacherinfo", "col":"cat","type":"string"}, "tag":{"cf":"teacherinfo", "col":"tag", "type":"string"}, "level":{"cf":"teacherinfo", "col":"level","type":"string"} } }""" df = sql_sc.read.options(catalog = catalog).format(dep).load() print ('***************************************************************') print ('***************************************************************') print ('***************************************************************') df.show() print ('***************************************************************') print ('***************************************************************') print ('***************************************************************') sc.stop()
五、解释
数据来源参考请本人之前的文章,在此不做赘述
schema定义参考如图:
六、结果
以上这篇在python中使用pyspark读写hive数据操作就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持。
推荐阅读