欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

spark 基础开发 Tips总结

程序员文章站 2022-09-14 17:14:33
本篇博客主要是 sparksql 从初始开发注意的一些基本点以及力所能及的可优化部分的介绍: 所使用spark版本:2.0.0 scala版本:2.11.8 1. SparkSession的初始化: 注意点: a. spark.sql.warehouse.dir 需要显示设置,否则会抛出 Excep ......

本篇博客主要是 sparksql 从初始开发注意的一些基本点以及力所能及的可优化部分的介绍:  

所使用spark版本:2.0.0       scala版本:2.11.8

1. sparksession的初始化:

 

val sparksession = sparksession.builder().master("local[*]").appname("appname").config("spark.sql.warehouse.dir", "file:///d:/xxxx/xxxx/spark-warehouse").config("spark.sql.shuffle.partitions", 50).getorcreate()

  

 注意点:

             a.  spark.sql.warehouse.dir 需要显示设置,否则会抛出 exception in thread "main" java.lang.illegalargumentexception: java.net.urisyntaxexception: relative path in absolute uri: file:...   错误

             b. spark.sql.shuffle.partitions  指定 shuffle 时 partition 个数,也即 reducer 个数。根据业务数据量测试调整最佳结果

                partition 个数不宜设置过大:

               reducer(代指 spark shuffle 过程中执行 shuffle read 的 task) 个数过多,每个 reducer 处理的数据量过小。大量小 task 造成不必要的 task 调度开销与可能的资源调度开销(如果开启了 dynamic allocation)

            reducer 个数过大,如果 reducer 直接写 hdfs 会生成大量小文件,从而造成大量 addblock rpc,name node 可能成为瓶颈,并影响其它使用 hdfs 的应用

            过多 reducer 写小文件,会造成后面读取这些小文件时产生大量 getblock rpc,对 name node 产生冲击

               partition 个数不宜设置过小:

            每个 reducer 处理的数据量太大,spill 到磁盘开销增大

            reducer gc 时间增长

            reducer 如果写 hdfs,每个 reducer 写入数据量较大,无法充分发挥并行处理优势

2. 将非结构化数据转换为结构化数据dataframe(本人用的自定义模式): 

    val rdd= sparksession.sparkcontext.textfile(path, 250)  // 默认split为2

    val schemastring = "time hour lic"   //结构化数据的列名,可理解为关系型数据库的列名

    val fields = schemastring.split(" ").map(fieldname => structfield(fieldname, stringtype, nullable = true))   // 字段名  字段类型  是否可为空

    val schema = structtype(fields)      //上两步组装最终 createdataframe 时需要的 schema

    val rowrdd = citysecrdd.map(_.split(",")).filter(attributes => attributes.length >= 6 && attributes(1).equals("2")&& attributes(0).split(" ").length > 1 && attributes(0).split(" ")(1).split(":").length > 1).map(attributes => {row(attributes(0).trim,attributes(0).split(" "                   (1).split(":")(0).trim,attributes(2).trim,attributes(3).trim,attributes(4).trim,attributes(5).trim)})         //自定义一些过滤条件  以及组装最终的 row类型的rdd

    val df= sparksession.createdataframe(rowrdd, schema)       //将rdd装换成dataframe

3. 两种缓存使用方式:

    1)df.persist(storagelevel.memory_only)     //后续如果需要反复使用df[dataframe的简称],则就把此df缓存起来                            
df.unpersist() //释放缓存 常用的两种序列化方式:memory_only->不加工在内存中存储 memory_only_ser->在内存中序列化存储(占用内存空间较小) 2)df.createorreplacetempview("table") sparksession.sql("cache table table") // 以 sql 形式缓存df
sparksession.sql("uncache table table") //释放缓存

4.spark整合hbase快速批量插入

  将计算结果写入hbase:

      注意:1) 如果是带有shuffle过程的,shuffle计算之前使用select()提出只需要的字段然后再进行计算,因为shuffle特别耗费时间,写磁盘的过程,所以要能少写就少写。

df.foreachpartition(partition => {

      val hconf = hbaseconfiguration.create();

      hconf.set(zkclientport, zkclientportvalue) //zk 端口

      hconf.set(zkquorum, zkquorumvalue) //zk 地址
      hconf.set(hbasemaster, hbasemastervalue) //hbase master
       val mytable = new htable(hconf, tablename.valueof(tablename))
       mytable.setautoflush(false, false) //关键点1
      mytable.setwritebuffersize(5 * 1024 * 1024) //关键点2
      partition.foreach(x => {

      val column1 = x.getas[string]("column1") //列1
      val column2 = x.getas[string]("column2") //列2
      val column3 = x.getas[double]("column3") //列3
      val date = datestr.replace("-", "") // 格式化后的日期

    val rowkey = md5hash.getmd5ashex(bytes.tobytes(column1+ date)) + bytes.tobytes(hour)
    val put = new put(bytes.tobytes(rowkey))
    put.add("c1".getbytes(), "column1".getbytes(), licplatenum.getbytes()) //第一列族 第一列 
    put.add("c1".getbytes(), "column2".getbytes(), hour.getbytes()) //第一列族 第二列
    put.add("c1".getbytes(), "column3".getbytes(), interval.tostring.getbytes()) //第一列族 第三列
    put.add("c1".getbytes(), "date".getbytes(), date.getbytes()) //第一列族 第四列
    mytable.put(put)
     })
     mytable.flushcommits() //关键点3
    /*
    *关键点1_:将自动提交关闭,如果不关闭,每写一条数据都会进行提交,是导入数据较慢的做主要因素。
     关键点2:设置缓存大小,当缓存大于设置值时,hbase会自动提交。此处可自己尝试大小,一般对大数据量,设置为5m即可,本文设置为3m。
     关键点3:每一个分片结束后都进行flushcommits(),如果不执行,当hbase最后缓存小于上面设定值时,不会进行提交,导致数据丢失。
     注:此外如果想提高spark写数据如hbase速度,可以增加spark可用核数量。
    */

5. spark任务提交shell脚本:

spark-submit --jars /xxx/xxx/hbase/latest/lib/hbase-protocol-0.96.1.1-cdh5.0.2.jar \
         --master yarn\
         --num-executors 200 \
         --conf "spark.driver.extraclasspath=/share/apps/hbase/latest/lib/hbase-protocol-0.96.1.1-cdh5.0.2.jar" \
         --conf "spark.executor.extraclasspath=/share/apps/hbase/latest/lib/hbase-protocol-0.96.1.1-cdh5.0.2.jar" \ 
         --conf spark.driver.cores=2 \
         --conf spark.driver.memory=10g \
         --conf spark.driver.maxresultsize=2g \
         --conf spark.executor.cores=6 \
         --conf spark.executor.memory=10g \
         --conf spark.shuffle.blocktransferservice=nio \
         --conf spark.memory.fraction=0.8 \
         --conf spark.shuffle.memoryfraction=0.4 \               
         --conf spark.default.parallelism=1000 \
         --conf spark.sql.shuffle.partitions=400 \                     默认200,如果项目中代码设置了此选项,则代码设置级别优先,会覆盖此处设置
         --conf spark.shuffle.consolidatefiles=true \
         --conf spark.shuffle.io.maxretries=10 \
         --conf spark.scheduler.listenerbus.eventqueue.size=1000000 \
         --class xxxxx\                                                                项目启动主类引用
         --name zzzz \
         /data/xxx/xxx-jar-with-dependencies.jar \                       项目jar包
        "参数1" "参数2" 

  

注: 红色部分是hbase需要的配置,同时需要在spark集群的spark-defaults.conf 里面配置

        spark.driver.extraclasspath  和  spark.executor.extraclasspath   直指 hbase-protocol-0.96.1.1-cdh5.0.2.jar 路径

先写到这里吧,后续会继续完善通过sparkui 优化细节以及提交spark任务的时候 如何分配 executor.cores 和 executor.memory。