Summary of Spark External Data Sources
Write to HBase
// Required imports (HBase client + MapReduce integration)
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.RDD

// Build the HBase configuration object
val configuration = HBaseConfiguration.create()
// Set the ZooKeeper connection properties
configuration.set("hbase.zookeeper.quorum", "node1")
configuration.set("hbase.zookeeper.property.clientPort", "2181")
configuration.set("zookeeper.znode.parent", "/hbase")
// Set the name of the HBase table the data will be written to
configuration.set(TableOutputFormat.OUTPUT_TABLE, "htb_wordcount")
==============================================================
// Convert rsRdd into the (ImmutableBytesWritable, Put) KV pairs required by TableOutputFormat
val putRdd: RDD[(ImmutableBytesWritable, Put)] = rsRdd
  .map(tuple => {
    val word = tuple._1
    // Build the rowkey object from the word
    val key = new ImmutableBytesWritable(Bytes.toBytes(word))
    // Build the Put object used as the value
    val value = new Put(Bytes.toBytes(word))
    // Add the column
    value.addColumn(
      Bytes.toBytes("info"),
      Bytes.toBytes("cnt"),
      Bytes.toBytes(tuple._2.toString)
    )
    // Return the KV pair
    (key, value)
  })
// Write to HBase (the output path is a placeholder; TableOutputFormat ignores it)
putRdd.saveAsNewAPIHadoopFile(
  "datas/output/hbase",
  classOf[ImmutableBytesWritable],
  classOf[Put],
  classOf[TableOutputFormat[ImmutableBytesWritable]],
  configuration)
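Since TableOutputFormat never uses the file path, saveAsNewAPIHadoopDataset is a slightly cleaner alternative. A minimal sketch, assuming the same configuration and putRdd as above:
import org.apache.hadoop.mapreduce.Job

// Wrap the existing HBase configuration in a Job so the output format/key/value classes can be declared
val job = Job.getInstance(configuration)
job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
job.setOutputKeyClass(classOf[ImmutableBytesWritable])
job.setOutputValueClass(classOf[Put])

// Write without a dummy path; the target table comes from TableOutputFormat.OUTPUT_TABLE
putRdd.saveAsNewAPIHadoopDataset(job.getConfiguration)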
===================================
Read from HBase
// Additional imports for the read path
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.{Cell, CellUtil}

// Set which HBase table to read
configuration.set(TableInputFormat.INPUT_TABLE, "htb_wordcount")
val hbaseRdd: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(
  configuration,
  classOf[TableInputFormat],
  classOf[ImmutableBytesWritable],
  classOf[Result]
)
// todo: 2 - process the data
// Keep only the values (the Result objects)
val dataRdd = hbaseRdd.map(tuple => tuple._2)
// Take a few rowkeys and print their contents
dataRdd
  .take(3)
  .foreach(rs => {
    // Print every cell belonging to this result's rowkey
    val cells: Array[Cell] = rs.rawCells()
    // Walk through each cell
    cells.foreach(cell => {
      // rowkey, column family, qualifier and value of the cell
      val rowkey = Bytes.toString(CellUtil.cloneRow(cell))
      val family = Bytes.toString(CellUtil.cloneFamily(cell))
      val column = Bytes.toString(CellUtil.cloneQualifier(cell))
      val value = Bytes.toString(CellUtil.cloneValue(cell))
      println(rowkey + "\t" + family + "\t" + column + "\t" + value)
    })
  })
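If the cells are needed as a regular key/value RDD instead of being printed, a minimal sketch, assuming the same hbaseRdd and the info:cnt column written above (wordCountRdd is a hypothetical name):
// Turn each Result into (rowkey, count); getValue(family, qualifier) returns null if the cell is absent
val wordCountRdd: RDD[(String, Long)] = hbaseRdd.map { case (_, result) =>
  val word = Bytes.toString(result.getRow)
  val cnt = Option(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("cnt")))
    .map(bytes => Bytes.toString(bytes).toLong)
    .getOrElse(0L)
  (word, cnt)
}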
Write to MySQL
import java.sql.{Connection, DriverManager, PreparedStatement}

/**
 * @ClassName WordCountWriteToMySQL
 * @Description TODO Word count program that writes its result to MySQL
 */
object WordCountWriteToMySQL {

  /**
   * Writes one partition of (word, count) pairs to MySQL
   * @param part iterator over the records of a single partition
   */
  def saveToMySQL(part: Iterator[(String, Int)]): Unit = {
    // Load the JDBC driver
    Class.forName("com.mysql.jdbc.Driver")
    // Declare the connection and the statement
    var conn: Connection = null
    var pstm: PreparedStatement = null
    try {
      // Build the connection
      conn = DriverManager.getConnection(
        "jdbc:mysql://node1.itcast.cn:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true",
        "root", "123456")
      // Build the SQL
      val sql = "insert into db_test.tb_wordcount values(?,?)"
      // Build the statement object
      pstm = conn.prepareStatement(sql)
      // Bind the parameters
      part.foreach {
        case (word, numb) =>
          pstm.setString(1, word)
          pstm.setString(2, numb.toString)
          // Add to the batch
          pstm.addBatch()
      }
      // Submit the batch to MySQL
      pstm.executeBatch()
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      if (pstm != null) pstm.close()
      if (conn != null) conn.close()
    }
  }
  // ... main method omitted in the original snippet
}
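saveToMySQL is written to be called once per partition, so that only one connection is opened per partition rather than one per record. A minimal usage sketch, assuming a word-count RDD named rsRdd as in the HBase example above:
// Write each partition of (word, count) pairs through the function defined above
rsRdd.foreachPartition(saveToMySQL)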
Spark Core: submitting a job from the command line
SPARK_HOME=/export/server/spark
${SPARK_HOME}/bin/spark-submit \
--master local[2] \
--class bigdata.itcast.cn.spark.core.wordcount.WordCount \
--driver-memory 512M \
--executor-memory 512M \
--executor-cores 1 \
--num-executors 1 \
--deploy-mode client \
hdfs://node1:8020/spark/apps/spark-wc.jar \
/example/wc.data \
/output/wc
# The last three arguments are the application jar, the input path and the output path
=============================
--master: selects the run mode / cluster manager
local[K]: run locally with K threads
spark://host:7077: run on a Standalone cluster
spark://host1:7077,host2:7077,...: run on a Standalone cluster in HA mode
yarn: submit the job to YARN
===============================
--deploy-mode: where the Driver process runs
client: the Driver runs on the machine that submits the job
cluster: the Driver runs inside the cluster, on a worker node
--class: the main class to run
--jars: extra jars the application depends on
==============================
Common options
--driver-memory MEM    Memory for the driver (e.g. 1000M, 2G) (Default: 1024M).
--executor-memory MEM  Memory per executor (e.g. 1000M, 2G) (Default: 1G).
==============================================
Standalone mode
--executor-cores: number of CPU cores per executor
--total-executor-cores NUM: total number of CPU cores across all executors
==============================================
YARN mode
--executor-cores: number of CPU cores per executor
--num-executors NUM: number of executors to launch
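Putting the YARN options together, a minimal submit sketch (class name, jar path and data paths are reused from the local example above; the resource numbers are placeholders):
${SPARK_HOME}/bin/spark-submit \
--master yarn \
--deploy-mode cluster \
--class bigdata.itcast.cn.spark.core.wordcount.WordCount \
--driver-memory 512M \
--executor-memory 512M \
--executor-cores 1 \
--num-executors 2 \
hdfs://node1:8020/spark/apps/spark-wc.jar \
/example/wc.data \
/output/wc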
SparkSQL and Structured Streaming External Data Sources
==============================================================
SparkConf configuration
val conf = new SparkConf()
  // Use the current class name as the application name
  .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
  // Run in local mode
  .setMaster("local[2]")
  // Point the default filesystem at the HDFS NameNode
  .set("fs.defaultFS", "hdfs://node1:8020")
  // TODO: use Kryo serialization
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // TODO: register the data types to serialize with Kryo
  .registerKryoClasses(Array(classOf[ImmutableBytesWritable], classOf[Result]))
// Build the SparkContext object
val sc = new SparkContext(conf)

// SparkSession builder pattern -- Spark SQL
val spark: SparkSession = SparkSession
  .builder() // obtain a builder object
  .appName(this.getClass.getSimpleName.stripSuffix("$")) // set the application name
  .master("local[2]") // set the run mode
  .config("spark.sql.shuffle.partitions", "2") // default is 200 shuffle partitions
  // .config("","") // other properties can be set here
  .getOrCreate()
===========================================================
Hive integration settings
// Configure the address of the Hive metastore
.config("hive.metastore.uris", "thrift://node1.itcast.cn:9083")
// Enable Hive support
.enableHiveSupport()
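Once Hive support is enabled on the builder, the session can query Hive tables directly with spark.sql. A minimal sketch; the table name is hypothetical:
// List the databases registered in the Hive metastore and query a table through Spark SQL
spark.sql("show databases").show()
spark.sql("select * from some_hive_table limit 10").show()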
=========================================
// ********** Kafka integration (mainly Structured Streaming) **********
// Data input: consume data from multiple topics
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "node1:9092,node2:9092")
  .option("subscribe", "topic1,topic2") // to subscribe by regex, e.g. "topic.*", use the "subscribePattern" option instead
  .option("maxOffsetsPerTrigger", "100000") // rate control: maximum total offsets pulled per trigger
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") // cast key and value to String
  .as[(String, String)]
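The query below writes a stream named rsStream, which the original snippet never defines. A minimal sketch of how it might be derived from the Kafka value column, assuming whitespace-separated words and a running count aggregation:
import spark.implicits._

// Split each Kafka value into words and keep a running count per word
val rsStream = df
  .selectExpr("CAST(value AS STRING)")
  .as[String]
  .flatMap(line => line.split("\\s+"))
  .groupBy("value") // the single column of a Dataset[String] is named "value"
  .count()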
// Data output: build the streaming query
val query = rsStream
  // write out the result stream
  .writeStream
  .outputMode(OutputMode.Complete()) // output mode of the query
  .format("console") // sink format
  .queryName("wcquery") // name of the query
  // checkpoint location: stores the query metadata used for recovery
  .option("checkpointLocation", "datastruct/output/chk1")
  .option("numRows", "10") // how many rows to print
  .option("truncate", "false") // whether to truncate long values in the output
  .start() // start the streaming computation and return the query handle
// ##### Notes on output modes
// ○ Append mode (default): only new rows appended to the result table are written out
// ○ Complete mode: the entire (aggregated) result table is written out on every trigger
// ○ Update mode: only the rows updated since the last trigger are written out
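start() returns immediately, so the driver normally blocks on the query handle to keep the stream running. A minimal sketch using the query value from above:
// Block the driver until the query fails or is stopped, then release its resources
query.awaitTermination()
query.stop()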
==============================================================
// StreamingContext (Spark Streaming)
// StreamingContext takes two arguments:
//   1. SparkConf, used to build the underlying SparkContext
//   2. Duration: the batch interval (Batch Interval), i.e. how often a batch runs
val ssc = new StreamingContext(conf, Seconds(3)) // the second argument is the batch interval (a Duration)
==================== Kafka integration ===========
val locationStrategy = LocationStrategies.PreferConsistent
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "node1:9092,node2:9092,node3:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "test01",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)
// Specify which topics to consume
val topics = Array("sparkStream01")
val consumerStrategy = ConsumerStrategies.Subscribe[String, String](
  topics,
  kafkaParams
)
// Consume the Kafka data as a direct stream
val kafkaStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
  ssc,
  locationStrategy,
  consumerStrategy
)
=============================================
var offsetRanges: Array[OffsetRange] = null
// Extract the offsets carried by the RDD (rdd here is the batch RDD inside kafkaStream.foreachRDD)
offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
// Print the offset range of each partition
offsetRanges.foreach(partRange => {
  val topic = partRange.topic
  val part = partRange.partition
  val startOffset = partRange.fromOffset
  val endOffset = partRange.untilOffset
  println(topic + "\t" + part + "\t" + startOffset + "\t" + endOffset)
})
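For context, a minimal sketch of where the offset handling above typically lives, assuming offsets are committed back to Kafka manually (enable.auto.commit is false above); the word-count processing step is only illustrative:
kafkaStream.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    // Capture the offset ranges of this batch directly from the direct-stream RDD
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

    // Illustrative processing: count the words contained in the record values
    rdd.map(record => record.value())
      .flatMap(line => line.split("\\s+"))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
      .foreach(println)

    // Commit the consumed offsets back to Kafka asynchronously
    kafkaStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
  }
}

// Start the streaming application and keep the driver alive
ssc.start()
ssc.awaitTermination()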
===========================================================