欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

CDH的 hive on spark(spark on yarn)

程序员文章站 2022-04-29 10:54:08
...

 

总帖:CDH 6系列(CDH 6.0、CHD6.1等)安装和使用

 

CDH的 hive on spark(spark on yarn)

CDH的 hive on spark(spark on yarn)

CDH的 hive on spark(spark on yarn)

1.所有版本:https://www.scala-lang.org/download/all.html
  2.11.8版本:https://www.scala-lang.org/download/2.11.8.html 
  2.12.8版本:
    tar -zxvf scala-2.12.8.tgz
    mv scala-2.12.8 scala
    scp -r /root/scala aaa@qq.com:/root
    scp -r /root/scala aaa@qq.com:/root
2.配置环境变量,将scala加入到PATH中:
        vim /etc/profile
        主要添加蓝色字体处 
            export PATH=$PATH:$JAVA_HOME/bin:/root/scala/bin
            或者
            PATH=$JAVA_HOME/bin:$PATH:/root/scala/bin
            export JAVA_HOME CLASSPATH PATH
        source /etc/profile
3.scala的命令行模式:
    输入 scala ,执行1+1,输出结果2

================================CDH 安装 spark===============================================

CDH的 hive on spark(spark on yarn)

CDH的 hive on spark(spark on yarn)

CDH的 hive on spark(spark on yarn)

CDH的 hive on spark(spark on yarn)

------------------------------------------------------------------------------------------------------------

1.spark的命令行模式
    1.第一种进入方式:执行 pyspark进入,执行exit()退出
        1.注意报错信息:java.lang.IllegalArgumentException: 
            Required executor memory (1024+384 MB) is above the (最大阈值)max threshold (1024 MB) of this cluster! 
            表示 执行器的内存(1024+384 MB) 大于 最大阈值(1024 MB)
            Please check the values of 'yarn.scheduler.maximum-allocation-mb' and/or 'yarn.nodemanager.resource.memory-mb'

CDH的 hive on spark(spark on yarn)

CDH的 hive on spark(spark on yarn)
 

        2.初始化RDD的方法
            本地内存中已经有一份序列数据(比如python的list),可以通过sc.parallelize去初始化一个RDD。
            当执行这个操作以后,list中的元素将被自动分块(partitioned),并且把每一块送到集群上的不同机器上。
            import pyspark
            from pyspark import SparkContext as sc
            from pyspark import SparkConf
            conf=SparkConf().setAppName("miniProject").setMaster("local[*]")
            #任何Spark程序都是SparkContext开始的,SparkContext的初始化需要一个SparkConf对象,SparkConf包含了Spark集群配置的各种参数(比如主节点的URL)。
            #初始化后,就可以使用SparkContext对象所包含的各种方法来创建和操作RDD和共享变量。
            #Spark shell会自动初始化一个SparkContext(在Scala和Python下可以,但不支持Java)。
            #getOrCreate表明可以视情况新建session或利用已有的session
            sc=SparkContext.getOrCreate(conf) 
 
            # 利用list创建一个RDD;使用sc.parallelize可以把Python list,NumPy array或者Pandas Series,Pandas DataFrame转成Spark RDD。
            rdd = sc.parallelize([1,2,3,4,5])
            rdd  打印 ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:195

            # getNumPartitions() 方法查看list被分成了几部分
            rdd.getNumPartitions()  打印结果:2
             # glom().collect()查看分区状况
            rdd.glom().collect() 打印结果: [[1, 2], [3, 4, 5]] 
            
    2.第二种进入方式:可直接执行 spark-shell,也可以执行 spark-shell --master local[2]
        多线程方式:运行 spark-shell --master local[N] 读取 linux本地文件数据 
        通过本地 N 个线程跑任务,只运行一个 SparkSubmit 进程
            利用 spark-shell --master local[N] 读取本地数据文件实现单词计数
          master local[N]:采用本地单机版的来进行任务的计算,N是一个正整数,它表示本地采用N个线程来进行任务的计算,会生成一个SparkSubmit进程
    3.需求:
        读取本地文件,实现文件内的单词计数。
        本地文件 /root/scala/words.txt 内容如下:
            hello me
            hello you
            hello her
    4.编写 scala 代码:此处应使用spark-shell --master local[2]进行操作,如果使用spark-shell会报错
        sc.textFile("file:///root///scala///words.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
        输出 res0: Array[(String, Int)] = Array((hello,3), (me,1), (you,1), (her,1))
===========================================================================

在hive中执行 select * from 表名;即能运行 hive on spark引擎进行计算,yarn的web UI页面中,点击对应运行的spark程序查看运行信息和报错信息

如果再执行 hive on spark任务时出现以下错误信息(要看该yarn任务程序对应的日志信息):
    ERROR client.RemoteDriver: Failed to start SparkContext: java.lang.IllegalArgumentException: Executor memory 456340275 must be at least 471859200. 
    Please increase executor memory using the --executor-memory option or spark.executor.memory in Spark configuration
解决:在 Hive中 搜索 spark.executor.memory 进行配置到可使用的范围大小


CDH的 hive on spark(spark on yarn)

 

1.运行hive on spark的sql语句进行计算时,报错信息可在yarn的web UI页面中,点击对应运行的spark程序查看运行信息和报错信息

CDH的 hive on spark(spark on yarn)

    报错信息:Caused by: java.lang.IllegalArgumentException: 
         Executor memory 456340275 must be at least 471859200. 
         Please increase executor memory using the --executor-memory option or spark.executor.memory in Spark configuration
    问题:表示 spark.executor.memory(Spark 执行程序最大 Java 堆栈大小)的值过小
    解决:那么把 spark.executor.memory 设置大于 报错信息中规定的 at least 471859200


2.如果spark程序没有成功运行结束而导致永远卡在运行任务中而不结束时,可以使用 yarn application -kill 命令 加上 程序ID 进行结束某程序     

CDH的 hive on spark(spark on yarn)

     杀死程序的命令:yarn application -kill 程序的ID
    因为/usr/bin/yarn 已经存在,所以不需要执行下面的创建软连接的操作
        cd /opt/cloudera/parcels/CDH-6.0.0-1.cdh6.0.0.p0.537114/lib/hadoop-yarn/bin
        ln -s /opt/cloudera/parcels/CDH-6.0.0-1.cdh6.0.0.p0.537114/lib/hadoop-yarn/bin/yarn /usr/bin/yarn 

CDH的 hive on spark(spark on yarn)

3.在使用 yarn HA时,运行 hive on yarn 的任务无法得出结果时,并且出现以下错误 
    Caused by:javax.servlet.ServletException: Could not determine the proxy server for redirection
    问题:无法确定用于重定向的代理服务器
    解决:禁用 YARN HA,即ResourceManager只使用一个主节点,其实一般yarn HA仍然能运行 hive on yarn 的任务并且能得出正常结果,但是还是会报出同样错误


CDH的 hive on spark(spark on yarn)

CDH的 hive on spark(spark on yarn)


 

4.当前运行的环境是 YARN HA(node1、node2均部署了ResourceManager)的情况下,执行 hive on spark 的程序,虽然能得出正常执行成功得出结果,
  但是对应该程序的日志信息仍然报错:无法确定用于重定向的代理服务器

Could not determine the proxy server for redirection。
    select * from test_tb;
    select count(*) from test_tb;
    insert into test_tb values(2,'ushionagisa');


CDH的 hive on spark(spark on yarn)

---------------------------------------------------------------------------------------------------------------------------

spark-sql命令操作的数据库存储在hdfs文件系统中

CDH的 hive on spark(spark on yarn)

CDH的 hive on spark(spark on yarn)

脚本中定义任务提交的命令:
    Default Hive database:hdfs://nameservice1/user/hive/warehouse
    spark.master:spark://master:7077
    /root/spark/bin/spark-sql --master spark://node1:7077 --executor-memory 1g --total-executor-cores 2 --conf spark.sql.warehouse.dir=hdfs://nameservice1/user/hive/warehouse

===================== 优化配置信息================================ 

1.Hive默认使用的计算框架是MapReduce,在我们使用Hive的时候通过写SQL语句,Hive会自动将SQL语句转化成MapReduce作业去执行,
  但是MapReduce的执行速度远差与Spark。通过搭建一个Hive On Spark可以修改Hive底层的计算引擎,将MapReduce替换成Spark,从而大幅度提升计算速度。
  接下来就如何搭建Hive On Spark展开描述。
 
2.配置Yarn
    1.Yarn需要配置两个参数:
        1.yarn.nodemanager.resource.cpu-vcores:可以为container分配的CPU 内核的数量
            为每个服务分配一个core,为操作系统预留2个core,剩余的可用的core分配给yarn。
            我使用的伪集群(3个node,每个node8个核core)一共有24个core,留出3个给其他任务使用,剩余的21个核core分配给yarn,每个节点提供7个核core。

CDH的 hive on spark(spark on yarn)

        2.yarn.nodemanager.resource.memory-mb:可分配给容器的物理内存大小
             设置Yarn内存一共为3G,每个节点提供1G,根据自己的电脑性能分配多少,应大于1G

CDH的 hive on spark(spark on yarn)

        3.yarn.scheduler.maximum-allocation-mb:scheduler调度程序所能申请的最大内存,根据自己的电脑性能分配多少,应大于1G

CDH的 hive on spark(spark on yarn)

3.配置Spark
         参数项            默认值    参数解释
        spark.executor.instances    无    一个Application拥有的Executor数量。取决于spark.executor.memory + spark.yarn.executor.memoryOverhead
        spark.executor.cores    1    单个Executor可用核心数
        spark.executor.memory    512m    单个Executor最大内存。
                        计算大小的公式 yarn.nodemanager.resource.memory-mb *(spark.executor.cores / yarn.nodemanager.resource.cpu-vcores)

        spark.executor.memory          每个执行程序进程使用的内存量 
        spark.executor.cores         每个执行程序的核心数 
        spark.yarn.executor.memoryOverhead  在Yarn上运行Spark时,每个执行程序要分配的堆外内存量(以兆字节为单位)。
                        这是内存,可以解决诸如VM开销,插入字符串,其他本机开销等问题。
                        除了执行程序的内存之外,启动执行程序的容器还需要一些额外的内存用于系统进程。
                        计算大小的公式:spark.executor.memory的15-20%
        spark.executor.instances         分配给每个应用程序的执行程序数 
        spark.driver.memory         分配给远程Spark上下文(RSC)的内存量。我们建议4GB 
        spark.yarn.driver.memoryOverhead     我们建议400(MB) 
 
    1.spark.executor.cores 单个Executor可用核心数
        1.在某些情况下,HDFS客户端没有并行处理多个写请求,在有多个请求竞争资源的时候会出现一个执行程序executor使用过多的core。
          尽可能的减少空闲的core的个数,cloudera推荐设置spark.executor.cores为4、5、6,这取决于给yarn分配的资源。
            比如说,因为我们把21个核core分配给yarn,所以有21个核core可用,那么我们可以设置为3,这样21/3余数为0,设置为4的话会剩余1个空闲。
          设置3个可使得空闲的core尽可能的少。这样配置之后我们可以最多同时运行7个执行程序executor,每个执行程序executor最多可以运行3个任务(每个核core为1个任务)。

        2.在YARN模式下,工作站上的所有可用内核都是独立模式和Mesos粗粒度模式。每个执行程序使用的核心数。 
        3.Executors Scheduling 执行程序调度
            分配给每个执行程序的核心数是可配置的。当spark.executor.cores显式设置时,如果worker具有足够的内核和内存,
            则可以在同一工作程序上启动来自同一应用程序的多个执行程序executor。否则,每个执行程序默认获取worker上可用的所有核心,
            在这种情况下,每个应用程序 在一次调度迭代期间 只能启动一个执行器executor 。
        4.Executor和分区
            Executor是一个独立的JVM进程,每个任务会有独立的线程来执行,Executor最大可并发任务数量与其拥有的核心数量相同,
            执行过程中的数据缓存放在Executor的全局空间中。
            根据以上我们可以得出:
                同一个Executor中执行的任务,可以共享同一个数据缓存。这也是Spark称之为Process local级别的数据本地性。
                Executor可并发执行的任务数量,与其所拥有的核心数相同。
                并发任务之间可能会产生相互干扰,如有些任务占用内存较大会导致其他并发任务失败。
                Executor都需要注册到Driver上并与其通信,过多的Executor数量会增加Driver负担。
                在阶段划分为任务时,会得到与分区数相同的任务数量。减少分区的数量将减少任务数,同时每个任务所处理的计算量会增大。
                考虑到任务本身的序列化,发送,运行环境准备,结果收集都需要占用Driver资源和Executor资源,减少任务数能够减少此类开销。
            在实践中,每个Executor可以配置多个核心,从而降低Executor数量,还可以得到更好的数据本地性。
            根据所配置的核心数量与分区数据量,可以估计出Executor所需最小内存 = 并发任务数 单分区大小 + 内存缓存分区数 单分区大小。
            分区数的配置与具体业务逻辑相关,为了将计算资源充分利用,可以参考:分区数 并发Job数 >= Executor数 Executor核心数。
            其中并发Job数是RDD在调用动作(action)类型的操作时产生的Job,Job之间的阶段是没有依赖关系的因此可并发执行。


    2.spark.executor.memory 单个Executor最大内存
          在配置executor的内存大小的时候,需要考虑以下因素:
            1.增加executor的内存可以优化map join。但是会增加GC的时间。
            2.还有一点是要求 spark.executor.memory 不能超过 yarn.scheduler.maximum-allocation-mb(scheduler调度程序所能申请的最大内存) 设置的值。

     3.配置Driver内存
        JVM申请的memory不够会导致无法启动SparkContext
        1.spark.driver.memory 当hive运行在spark上时,driver端可用的最大Java堆内存。
        2.spark.yarn.driver.memoryOverhead 每个driver可以额外从yarn请求的堆内存大小。
            spark.yarn.driver.memoryOverhead 加上 spark.driver.memory 就是yarn为driver端的JVM分配的总内存。
              Spark在Driver端的内存不会直接影响性能,但是在没有足够内存的情况下在driver端强制运行Spark任务需要调整。
        3.SparkContext的重用
            1.有些场景需要一个SparkContext持续接收计算任务,这种场景往往对计算任务的时效性要求较高(秒级别),
              并且可能会有并发的计算任务(如多用户提交任务)。这种场景适合采用yarn-client模式,让Driver位于应用内部,
              应用可以不断向Driver提交计算任务,并处理返回结果。这种模式的潜在风险在于Driver和Executor都会长时间持续运行,可能会有内存泄露的问题。
            2.在实践中,在RDD被persist缓存到内存后,调用unpersist并不能立即释放内存,而是会等待垃圾回收器对其进行回收。
              在垃圾回收器的选择上,建议使用CMS类型的垃圾回收器,用于避免垃圾回收过程中的顿卡现象。
            3.在Driver和Executor的垃圾回收不出问题的情况下,还是可以得到稳定的计算任务性能的。但如果某些情况下计算性能还是随时间推移而下降,
              则可以重启SparkContext以解决问题。因为重启SparkContext后Driver和Executor都会全新创建,因此能回到最初的性能。
              重启的方法是在当前所有任务都完成后,在应用中调用SparkContext.stop()方法,并移除SparkContext引用,然后创建新的SparkContext。
            4.Driver在启动时需要将Spark的Jar包上传到集群,用于启动每个Executor。这个jar包的大小约130M。
              Executor在接收任务时,会将任务所依赖的文件、Jar包传输到本地,这里的jar包是应用包,一般包含了应用的各类依赖一般也得100M,
              Jar包分发的耗时在10秒左右。在对计算任务时效性要求较高的场景,Jar包分发的10秒将是无法接受的。
              在这里可以采用预先分发的方式解决此问题。我们首先将Spark Jar和应用Jar上传到各个节点的某个相同位置,例如/root/sparkjar。
            5.避免Driver启动时分发Jar包:
                将Driver机上的SPARK_JAR环境变量设置为空,避免Jar包上传动作。
                在yarn-site.xml配置文件中,设置yarn.application.classpath为spark jar的位置与此项默认值。
            6.避免Task启动时分发依赖和Jar包:
                将spark.files和spark.jars中的路径配置为local:/root/sparkjar的模式,从而让Executor从本地复制。

    4.设置executor个数
          1.集群的executor个数设置由集群中每个节点的executor个数和集群的worker个数决定,
          如果集群中有3个worker,每个worker有8个核心,则Hive On Spark可以使用的executor最大个数是24个(3 * 8)。
          Hive的性能受可用的executor的个数影响很明显,一般情况下,性能和executor的个数成正比,4个executor的性能大约是2个executor性能的一倍,
          但是性能在executor设置为一定数量的时候会达到极值,达到这个极值之后再增加executor的个数不会增加性能,反而有可能会为集群增加负担。

        2.动态分配executor:
            spark.executor.instances 一个Application拥有的Executor数量,默认值为无
                  设置spark.executor.instances到最大值可以使得Spark集群发挥最大性能。但是这样有个问题是当集群有多个用户运行Hive查询时会有问题,
                应避免为每个用户的会话分配固定数量的executor,因为executor分配后不能回其他用户的查询使用,
                如果有空闲的executor,在生产环境中,计划分配好executor可以更充分的利用Spark集群资源。
                Spark允许动态的给Spark作业分配集群资源,cloudera推荐开启动态分配。

        3.设置并行度
              为了更加充分的利用executor,必须同时允许足够多的并行任务。在大多数情况下,hive会自动决定并行度,但是有时候我们可能会手动的调整并行度。
            在输入端,map task的个数等于输入端按照一定格式切分的生成的数目,Hive On Spark的输入格式是CombineHiveInputFormat,
            可以根据需要切分底层输入格式。调整hive.exec.reducers.bytes.per.reducer控制每个reducer处理多少数据。
            但是实际情况下,Spark相比于MapReduce,对于指定的hive.exec.reducers.bytes.per.reducer不敏感。
            我们需要足够的任务让可用的executor保持工作不空闲,当Hive能够生成足够多的任务,尽可能的利用空闲的executor。

4.配置Hive
    1.Hive on Spark的配置大部分即使不使用Hive,也可以对这些参数调优。
      但是hive.auto.convert.join.noconditionaltask.size这个参数是将普通的join转化成map join的阈值,这个参数调优对于性能有很大影响。
      MapReduce和Spark都可以通过这个参数进行调优,但是这个参数在Hive On MR上的含义不同于Hive On Spark。
    2.数据的大小由两个统计量标识:
        totalSize 磁盘上数据的大小
        rawDataSize 内存中数据的大小
    3.Hive On MapReduce使用的是totalSize,Spark使用rawDataSize。
        数据由于经过一系列压缩、序列化等操作,即使是相同的数据集,也会有很大的不同,对于Hive On Spark,
        需要设置 hive.auto.convert.join.noconditionaltask.size,将普通的join操作转化成map join来提升性能,
        集群资源充足的情况下可以把这个参数的值适当调大,来更多的触发map join。
        但是设置太高的话,小表的数据会占用过多的内存导致整个任务因为内存耗尽而失败,所有这个参数需要根据集群的资源来进行调整。
      4.Cloudera推荐配置两个额外的配置项:
        hive.stats.fetch.column.stats=true
        hive.optimize.index.filter=true

    5.以下还整理了一些配置项用于hive调优:
        hive.merge.mapfiles=true
        hive.merge.mapredfiles=false
        hive.merge.smallfiles.avgsize=16000000
        hive.merge.size.per.task=256000000
        hive.merge.sparkfiles=true
        hive.auto.convert.join=true
        hive.auto.convert.join.noconditionaltask=true
        hive.auto.convert.join.noconditionaltask.size=20M(might need to increase for Spark, 200M)
        hive.optimize.bucketmapjoin.sortedmerge=false
        hive.map.aggr.hash.percentmemory=0.5
        hive.map.aggr=true
        hive.optimize.sort.dynamic.partition=false
        hive.stats.autogather=true
        hive.stats.fetch.column.stats=true
        hive.compute.query.using.stats=true
        hive.limit.pushdown.memory.usage=0.4 (MR and Spark)
        hive.optimize.index.filter=true
        hive.exec.reducers.bytes.per.reducer=67108864
        hive.smbjoin.cache.rows=10000
        hive.fetch.task.conversion=more
        hive.fetch.task.conversion.threshold=1073741824
        hive.optimize.ppd=true

    7.官方的推荐配置 https://cwiki.apache.org/confluence/display/Hive/Hive+on+Spark%3A+Getting+Started
        mapreduce.input.fileinputformat.split.maxsize=750000000
        hive.vectorized.execution.enabled=true

        hive.cbo.enable=true
        hive.optimize.reducededuplication.min.reducer=4
        hive.optimize.reducededuplication=true
        hive.orc.splits.include.file.footer=false
        hive.merge.mapfiles=true
        hive.merge.sparkfiles=false
        hive.merge.smallfiles.avgsize=16000000
        hive.merge.size.per.task=256000000
        hive.merge.orcfile.stripe.level=true
        hive.auto.convert.join=true
        hive.auto.convert.join.noconditionaltask=true
        hive.auto.convert.join.noconditionaltask.size=894435328
        hive.optimize.bucketmapjoin.sortedmerge=false
        hive.map.aggr.hash.percentmemory=0.5
        hive.map.aggr=true
        hive.optimize.sort.dynamic.partition=false
        hive.stats.autogather=true
        hive.stats.fetch.column.stats=true
        hive.vectorized.execution.reduce.enabled=false
        hive.vectorized.groupby.checkinterval=4096
        hive.vectorized.groupby.flush.percent=0.1
        hive.compute.query.using.stats=true
        hive.limit.pushdown.memory.usage=0.4
        hive.optimize.index.filter=true
        hive.exec.reducers.bytes.per.reducer=67108864
        hive.smbjoin.cache.rows=10000
        hive.exec.orc.default.stripe.size=67108864
        hive.fetch.task.conversion=more
        hive.fetch.task.conversion.threshold=1073741824
        hive.fetch.task.aggr=false
        mapreduce.input.fileinputformat.list-status.num-threads=5
        spark.kryo.referenceTracking=false
        spark.kryo.classesToRegister=org.apache.hadoop.hive.ql.io.HiveKey,org.apache.hadoop.io.BytesWritable,org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch

    6.设置Pre-warming Yarn Container
          我们使用Hive On Spark的时候,提交第一个查询时,看到查询结果可能会有比较长的延迟,但是再次运行相同的SQL查询,完成速度要比第一个查询快得多。
        当Spark使用yarn管理资源调度时,Spark executor需要额外的时间来启动和初始化,在程序运行之前,Spark不会等待所有的executor准备好之后运行,
        所有在任务提交到集群之后,仍有一些executor处于启动状态。在Spark上运行的作业运行速度与executor个数相关,
        当可用的executor的个数没有达到最大值的时候,作业达不到最大的并行性,所有Hive上提交的第一个SQL查询会慢。
        如果是在长时间会话这个应该问题影响很小,因为只有执行第一个SQL的时候会慢,问题不大,但是很多时候我们写的Hive脚本,
        需要用一些调度框架去启动(如Oozie)。这时候我们需要考虑进行优化。
        为了减少启动时间,我们可以开启container pre-warming机制,开启后只有当任务请求的所有executor准备就绪,作业才会开始运行。
        这样会提升Spark作业的并行度。