欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

【PySpark】Spark 2.0系列SparkSession与Spark 2.0之前版本中的SQLContext和HiveContext的联系与区别

程序员文章站 2022-06-12 19:49:49
...

目录

1.Spark在2.0版本和之前版本的入口

2.SQLContext的使用

3.HiveContext 的使用

4.SparkSession的三种创建方式

4.1SparkSession直接builder方式

4.2SparkConf的builder方式

4.3SparkContext方式

 

1.Spark在2.0版本和之前版本的入口

        在Spark2.0之前,sparkContext是进入Spark的切入点。众所周知的RDD的创建和操作就需要使用sparkContext提供的API。对于RDD之外的其他东西,我们需要使用其他的Context。

  • 流处理,使用StreamingContext
  • SQL:使用sqlContext
  • hive:使用HiveContext

spark 有三大引擎,spark core、sparkSQL、sparkStreaming:

  • spark core 的关键抽象是 SparkContext、RDD;
  • SparkSQL 的关键抽象是 SparkSession、DataFrame;
  • sparkStreaming 的关键抽象是 StreamingContext、DStream。

 

Spark 2.0引入了一个新的切入点,SparkSession实质上是SQLContext和HiveContext的组合(未来可能还会加上StreamingContext),所以在SQLContext和HiveContext上可用的API在SparkSession上同样是可以使用的。SparkSession内部封装了sparkContext,所以计算实际上是由sparkContext完成的。

2.SQLContext的使用

sparkSQL 的应用必须创建一个 SQLContext 或者 HiveContext 的类。这里我使用的是local方式来运行:

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, SQLContext

conf = SparkConf().setAppName('test').setMaster('local')
sc = SparkContext(conf=conf)

sqlc = SQLContext(sc)
print(dir(sqlc))

# sqlcontext 读取数据也自动生成 df
data = sqlc.read.text('.../dict.txt')
print(type(data))

结果:

>print(dir(sqlc))
['__class__', '__delattr__', '__dict__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_conf', '_inferSchema', '_instantiatedContext', '_jsc', '_jsqlContext', '_jvm', '_sc', '_ssql_ctx', 'cacheTable', 'clearCache', 'createDataFrame', 'dropTempTable', 'getConf', 'getOrCreate', 'newSession', 'range', 'read', 'readStream', 'registerDataFrameAsTable', 'registerFunction', 'registerJavaFunction', 'setConf', 'sparkSession', 'sql', 'streams', 'table', 'tableNames', 'tables', 'udf', 'uncacheTable']

>print(type(data))
<class 'pyspark.sql.dataframe.DataFrame'>

>data.show(5)
                                                                                +--------+
|   value|
+--------+
|    疫情|
|    不含|
|  不安排|
|  有暖气|
|公用wifi|
+--------+
only showing top 5 rows

3.HiveContext 的使用

HiveContext 是sparkSQL 的另一个入口点,继承了SQLContext,用于处理 hive 中的数据。HiveContext 对 SQLContext 进行了扩展,功能要强大的多

  • 它可以执行 HiveSQL 和 SQL 查询
  • 它可以操作 hive 数据,并且可以访问 HiveUDF
  • 它不一定需要 hive,在没有 hive 环境时也可以使用 HiveContext

这里举例是以集群方式调用:

  • 首先是配置文件(hello.sh文件):

 

#!/bin/bash

export PYSPARK_PYTHON=/usr/bin/python3.6 #python 的服务器路径


spark-submit \
    --master yarn-client --queue upd_security \
    --executor-memory 4G --num-executors 10 --executor-cores 3 \
    --driver-memory 16G \
    --conf "spark.pyspark.driver.python=/usr/bin/python3.6" \
    --conf "spark.pyspark.python=/usr/bin/python3.6" \
    --conf spark.rpc.message.maxSize=100 --conf spark.shuffle.manager=SORT \
    --conf spark.yarn.executor.memoryOverhead=2048 \
  • HiveContext.py文件:
from pyspark import SparkContext, SparkConf
from pyspark.sql import HiveContext

conf = SparkConf().setAppName('test').setMaster('yarn')
sc = SparkContext(conf=conf)

hivec = HiveContext(sc)
print(dir(hivec))

data = hivec.sql('''select internal_minute,starttime1,endtime1,cast(label as float) as label 
         from table
        where d in (select DATE_SUB(current_date(),1) as d)
          and internal_minute > 0
          limit 5''')
print(type(data))
print(data.show(5))
  • 然后在服务器上,使用下面命令调用:
bash hello.sh SparkSession.py

结果:

【PySpark】Spark 2.0系列SparkSession与Spark 2.0之前版本中的SQLContext和HiveContext的联系与区别

【PySpark】Spark 2.0系列SparkSession与Spark 2.0之前版本中的SQLContext和HiveContext的联系与区别

【PySpark】Spark 2.0系列SparkSession与Spark 2.0之前版本中的SQLContext和HiveContext的联系与区别

4.SparkSession的三种创建方式

4.1SparkSession直接builder方式

同样在集群上部署调度,使用yarn方式:

配置文件和调用命令和3一样,只需要改HiveContext.py为SparkSession.py即可。

SparkSession.py文件:这种方式是用的比较多的。

from pyspark.sql import SparkSession
# method 1
# sparkSQL 连接 hive 时需要enableHiveSupport()
# builder 方式必须getOrCreate
spark = SparkSession \
    .builder \
    .appName("m1") \
    .master("yarn") \
    .enableHiveSupport() \
    .getOrCreate()

trainData = spark.sql(
    '''select internal_minute,starttime1,endtime1,cast(label as float) as label 
         from table
        where d in (select DATE_SUB(current_date(),1) as d)
          and internal_minute > 0
          limit 5''')

print(trainData.show(5))

结果:

【PySpark】Spark 2.0系列SparkSession与Spark 2.0之前版本中的SQLContext和HiveContext的联系与区别

4.2SparkConf的builder方式

SparkSession.py文件:

# method 2
from pyspark.sql import SparkSession
from pyspark import SparkConf
conf = SparkConf().setAppName('m2').setMaster('yarn')  # 设定 appname 和 master
spark = SparkSession.builder.config(conf=conf).enableHiveSupport().getOrCreate()  # builder 方式必须有这句

trainData = spark.sql(
    '''select internal_minute,starttime1,endtime1,cast(label as float) as label 
         from table
        where d in (select DATE_SUB(current_date(),1) as d)
          and internal_minute > 0
          limit 5''')


print(trainData.show(5))

结果:

【PySpark】Spark 2.0系列SparkSession与Spark 2.0之前版本中的SQLContext和HiveContext的联系与区别

4.3SparkContext方式

这种是从本地读取文件的方式,这里在本地运行:

from pyspark.sql import SparkSession

# method 3
from pyspark import SparkContext, SparkConf

conf = SparkConf().setAppName('m3').setMaster('local')  # 设定 appname 和 master
sc = SparkContext(conf=conf)
spark = SparkSession(sc)

data = spark.read.text('.../dict.txt')
print(data.show(5))

结果:

>print(data.show(5))
+--------+
|   value|
+--------+
|    疫情|
|    不含|
|  不安排|
|  有暖气|
|公用wifi|
+--------+
only showing top 5 rows
None