欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Spark外部数据源部分总结

程序员文章站 2024-01-30 15:30:10
...

写Hbase

  //构建Hbase的配置对象
    val configuration = HBaseConfiguration.create()
    // 设置连接Zookeeper属性
    configuration.set("hbase.zookeeper.quorum", "node1")
    configuration.set("hbase.zookeeper.property.clientPort", "2181")
    configuration.set("zookeeper.znode.parent", "/hbase")
    // 设置将数据保存的HBase表的名称
    configuration.set(TableOutputFormat.OUTPUT_TABLE,"htb_wordcount")
==============================================================
    //将rsRDD转换为需要的KV类型对象
    val putRdd: RDD[(ImmutableBytesWritable, Put)] = rsRdd
        .map(tuple => {
          val word = tuple._1
          //基于单词构建rowkey对象
          val key = new ImmutableBytesWritable(Bytes.toBytes(word))
          //构建V的Put类型对象
          val value = new Put(Bytes.toBytes(word))
          //添加列
          value.addColumn(
            Bytes.toBytes("info"),
            Bytes.toBytes("cnt"),
            Bytes.toBytes(tuple._2.toString)
          )
          //返回KV
          (key,value)
        })
    //写入Hbase
    putRdd.saveAsNewAPIHadoopFile(
      "datas/output/hbase",
      classOf[ImmutableBytesWritable],
      classOf[Put],
      classOf[TableOutputFormat[ImmutableBytesWritable]],
      configuration)
 
===================================
// 设置读取哪张Hbase的表
    configuration.set(TableInputFormat.INPUT_TABLE, "htb_wordcount")
val hbaseRdd: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(
      configuration,
      classOf[TableInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result]
    )
    //todo:2-处理数据
    //取出Value
    val dataRdd = hbaseRdd.map(tuple => tuple._2)
    //迭代取出每个rowkey
    dataRdd
      .take(3)
      .foreach(rs => {
      //将每个result中rowkey的所有列进行打印
      val cells: Array[Cell] = rs.rawCells()
      //取出每一列
      cells.foreach(cell => {
        //rowkey
        val rowkey = Bytes.toString(CellUtil.cloneRow(cell))
        val family = Bytes.toString(CellUtil.cloneFamily(cell))
        val column = Bytes.toString(CellUtil.cloneQualifier(cell))
        val value = Bytes.toString(CellUtil.cloneValue(cell))
        //println
        println(rowkey+"\t"+family+"\t"+column+"\t"+value)
        

写mysql

import java.sql.{Connection, DriverManager, PreparedStatement}

/**
  * @ClassName WordCount
  * @Description TODO 实现自定义开发Wordcount程序
  * */
object WordCountWriteToMySQL {

  /**
    * 用于写入mySQL
    * @param part:每个分区对象
    */
  def saveToMySQL(part: Iterator[(String, Int)]): Unit = {
    //申明驱动
    Class.forName("com.mysql.jdbc.Driver")
    //申明连接和语句
    var conn:Connection = null
    var pstm:PreparedStatement = null
    try{
      //构建连接
      conn = DriverManager.getConnection("jdbc:mysql://node1.itcast.cn:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true",
      "root","123456")
      //构建SQL
      val sql = "insert into db_test.tb_wordcount values(?,?)"
      //构建语句对象
      pstm = conn.prepareStatement(sql)
      //赋值
      part.foreach{
        case (word,numb) => {
          pstm.setString(1,word)
          pstm.setString(2,numb.toString)
          //添加到batch中
          pstm.addBatch()
        }
      }
      //批量提交给MySQL执行
      pstm.executeBatch()
    }catch {
      case e:Exception => e.printStackTrace()
    }finally {
      if (pstm != null) pstm.close()
      if (conn != null) conn.close()
    }
  }

Spark core 命令行提交任务

PARK_HOME=/export/server/spark
${SPARK_HOME}/bin/spark-submit \
--master local[2] \
--class bigdata.itcast.cn.spark.core.wordcount.WordCount \
--driver-memory 512M \
--executor-memory 512M \
--executor-cores 1 \
--num-executors 1 \
--deploy-mode client \
hdfs://node1:8020/spark/apps/spark-wc.jar \   jar包
/example/wc.data \    输入
/output/wc              输出
=============================

--master:指定运行的模式
local[K]:使用本地模式,使用K个线程运行
spark://host:7077:使用Standalone模式
spark://host1:7077,host2:7077……:使用Standalone的HA模式yarn:提交到yarn上运行
===============================
--deploy-mode        Driver的运行部署模式
driver:这个选项决定了driver运行的位置
executor:运行在从节点上
--class:指定运行哪个类
--jars:添加额外需要用到的jar包

==============================
通用配置
--driver-memory MEM Memory for driver (e.g. 1000M,
2G) (Default: 1024M).
--executor-memory MEM Memory per executor (e.g. 1000M,
2G) (Default: 1G).
==============================================
Standalone 模式
--executor-cores:指定每个executor能用几核CPU
--total-executor-cores NUM :所有executor的总CPU核数
==============================================
yarn模式
--executor-cores:指定每个executor能用几核CPU
--num-executors NUM:指定启动几个executor

**

Sparksql 与结构化流 StructStreaming 外部数据源

**

**

# SparkConf配置

**
val conf = new SparkConf()
//以当前的类名作为程序的名称
.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
//配置本地模式运行
.setMaster("local[2]")
//配置hdfs路径
.set("fs.defaultFS","hdfs://node1:8020")//设置hdfs namenode 位置
 // TODO: 设置使用Kryo 序列化方式
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // TODO: 注册序列化的数据类型
   .registerKryoClasses(Array(classOf[ImmutableBytesWritable], classOf[Result]))
//构建SparkContext对象
val sc = new SparkContext(conf)
//SparkSession 建造者模式--sparksql
val spark: SparkSession = SparkSession
.builder()//构建一个建造者对象
.appName(this.getClass.getSimpleName.stripSuffix("$"))//设置程序名称
.master("local[2]")//设置运行模式
.config("spark.sql.shuffle.partitions","2")  默认200个分区
// .config("","")//用于设置其他属性
.getOrCreate()

===========================================================
集成hive配置设置
//设置属性的值:配置Hivemetastore的地址
.config("hive.metastore.uris","thrift://node1.itcast.cn:9083")
//开启支持Hive
.enableHiveSupport()
=========================================
//**********与kafka集成******** 
//主要是结构化流 structStreaming  
//数据输入
// 消费多个Topic的数据
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "node1:9092,node2:9092")
.option("subscribe", "topic1,topic2")     //可以用正则方式消费多个消费者 如: "topic.*"
.option("maxOffsetsPerTrigger","100000")//置流控:单次触发拉取最大偏移总量
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")  //将key和value转换为String类型
.as[(String, String)]
//数据输出  ---构建查询器
val query = rsStream
//保存数据流
.writeStream
.outputMode(OutputMode.Complete()) //指定查询输出的模式,
.format("console")//指定保存的模式
.queryName("wcquery")//指定查询器的名称
//配置检查点,存储程序的元数据信息,用于程序的恢复
.option("checkpointLocation", "datastruct/output/chk1")
.option("numRows","10") //打印多少行
.option("truncate","false") //是否省略显示
.start() //启动流式计算,构建查询器
//#####输出格式的补充说明
	//○ Append mode (default) :追加模式,以追加的方式显示结果
	//○ Complete mode:完全模式,对所有数据进行聚合输出结果
	//○ Update mode:更新模式,只输出更新的结果

==============================================================
 //Streamingcontext(SparkStreaming)
* 构建StreamingContext:两个参数
      *     第一个参数:SparkConf,用于构建SparkContext
      *     第二个参数:Duration:每个批次执行的时间,批次时间:Batch Interval
val ssc = new StreamingContext(conf,Duration)   //Duration 批次运行时间-Seconds(3)
====================与Kafka集成===========
 val locationStrategy = LocationStrategies.PreferConsistent

        val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "node1:9092,node2:9092,node3:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "test01",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    //指定消费哪些Topic
    val topics = Array("sparkStream01")
    val consumerStrategy = ConsumerStrategies.Subscribe[String,String](
      topics,
      kafkaParams
    )
    //消费Kafka的数据
    val kafkaStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
      ssc,
      locationStrategy,
      consumerStrategy
    )
=============================================
var offsetRanges: Array[OffsetRange] = null

 //将RDD中的Offset取出
offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
//打印每个分区的offset
      offsetRanges.foreach(partRange => {
        val topic = partRange.topic
        val part = partRange.partition
        val startOffset = partRange.fromOffset
        val endOffset = partRange.untilOffset
        println(topic+"\t"+part+"\t"+startOffset+endOffset)
      })
===========================================================