
A Structured Streaming Example: Reading Kafka Data in Real Time


The old Spark Streaming (DStream) API is no longer being actively developed; since Spark 2.2 the development effort has gone into Structured Streaming. Below we look at how Structured Streaming reads data from Kafka.
Structured Streaming can read Kafka data either as a bounded batch or as a stream:
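Batch and streaming share the same Kafka source options; only the entry point differs (read vs. readStream). As a minimal sketch of the batch side (the broker list and topic name are reused from the streaming example below; the result is an ordinary bounded DataFrame):

    // Batch read of a Kafka topic: explicit start/end offsets, returns a regular DataFrame
    val batchDf = spark
      .read
      .format("kafka")
      .option("kafka.bootstrap.servers", "192.168.1.10:6667,192.168.1.11:6667")
      .option("subscribe", "kafka")
      .option("startingOffsets", "earliest")
      .option("endingOffsets", "latest")
      .load()
    batchDf.selectExpr("CAST(value AS STRING)").show()

The full streaming word-count program: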
package com.ky.service

import org.apache.log4j.lf5.LogLevel
import org.apache.spark.sql.{Dataset, SparkSession}

/**
  * @Author: xwj
  * @Date: 2019/1/31 0031 13:48
  * @Version 1.0
  */
object KafkaStreaming {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName(s"${this.getClass.getSimpleName}").master("local[*]").getOrCreate()
    spark.sparkContext.setLogLevel(LogLevel.ERROR.getLabel)

    import spark.implicits._
    val topic = "kafka"
    val df = spark
      // read is a batch read, readStream is a streaming read; likewise write vs. writeStream.
      // startingOffsets defaults to "latest" for streaming queries and "earliest" for batch queries.
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "192.168.1.10:6667,192.168.1.11:6667")
      .option("subscribe", topic) // "subscribe" can take multiple topics; use "assign" to consume specific partitions
      //      .option("startingOffsets", "earliest") // reading from explicit offsets is only supported for batch queries
      //      .option("endingOffsets", "latest")     // endingOffsets is batch-only
      .load()
    val kafkaDf: Dataset[(String, String)] = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").as[(String, String)]
    // Check whether this Dataset is backed by a streaming source
    println(kafkaDf.isStreaming)
    kafkaDf.printSchema()
    val words = kafkaDf.flatMap(_._2.split(","))
    val wordCounts = words.groupBy("value").count()
    val query = wordCounts
      .writeStream
      .outputMode("complete")
      .format("console")
      .start()
    query.awaitTermination()
  }
}
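The console sink is only suitable for local testing. To push the running counts somewhere durable, the same query can write back to Kafka; a sketch under assumed names (the output topic "wordcounts-out" and the checkpoint path are illustrative, not from the original code):

    // Kafka sink: needs string key/value columns plus a checkpoint location
    val kafkaQuery = wordCounts
      .selectExpr("CAST(value AS STRING) AS key", "CAST(count AS STRING) AS value")
      .writeStream
      .format("kafka")
      .outputMode("complete")
      .option("kafka.bootstrap.servers", "192.168.1.10:6667,192.168.1.11:6667")
      .option("topic", "wordcounts-out")                   // assumed output topic
      .option("checkpointLocation", "/tmp/ss-wordcounts")  // assumed checkpoint path
      .start()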

Combining this with Spark SQL:
object KafkaStreaming2 {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName(s"${this.getClass.getSimpleName}").master("local[*]").getOrCreate()
    spark.sparkContext.setLogLevel(LogLevel.ERROR.toString)
    val topic = "kafka"
    import spark.implicits._
    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "192.168.1.10:6667,192.168.1.11:6667")
      .option("subscribe", topic)
      .load()
    val kafkaDf: Dataset[(String, String)] = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").as[(String, String)]
    // Keep only records whose value splits into exactly three fields
    val value = kafkaDf.filter(_._2.split(",").length == 3)
    val deviceDf: Dataset[DeviceData] = value.map(line => {
      val arr = line._2.split(",")
      DeviceData(arr(0), arr(1), arr(2).toDouble)
    })
    deviceDf.createOrReplaceTempView("test")
    val frame = spark.sql("select * from test").where("signal > 0.5")
    // outputMode("complete") is not allowed here: the query has no aggregation, so only append mode is supported
    val query = frame.writeStream.format("console").start()
    query.awaitTermination()
  }

}

case class DeviceData(device: String, deviceType: String, signal: Double)
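The filter/map pair above parses the comma-separated value through a typed Dataset. The same parsing can stay entirely in untyped DataFrame operations with the built-in column functions; a sketch that assumes the same df loaded from Kafka in KafkaStreaming2:

    import org.apache.spark.sql.functions._

    // Split the CSV value column, keep three-field rows, and filter on signal
    val parsed = df
      .selectExpr("CAST(value AS STRING) AS value")
      .withColumn("fields", split(col("value"), ","))
      .where(size(col("fields")) === 3)
      .select(
        col("fields").getItem(0).as("device"),
        col("fields").getItem(1).as("deviceType"),
        col("fields").getItem(2).cast("double").as("signal"))
      .where(col("signal") > 0.5)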

For comparison, the traditional RDD-plus-SQLContext approach:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object Test {

  def main(args: Array[String]): Unit = {
    // Create SparkConf and set the application name
    val conf = new SparkConf().setAppName("SQL-2")
    // SQLContext depends on a SparkContext
    val sc = new SparkContext(conf)
    // Create the SQLContext
    val sqlContext = new SQLContext(sc)
    // Create an RDD from the given input path
    val personRDD = sc.textFile(args(0)).map(_.split(" "))
    // Specify the schema of each field directly with StructType
    val schema = StructType(
      List(
        StructField("id", IntegerType, true),
        StructField("name", StringType, true),
        StructField("age", IntegerType, true)
      )
    )
    // Map the RDD to an RDD of Rows
    val rowRDD = personRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).toInt))
    // Apply the schema to the row RDD
    val personDataFrame = sqlContext.createDataFrame(rowRDD, schema)
    // Register a temporary view
    personDataFrame.createOrReplaceTempView("t_person")
    // Run SQL
    val df = sqlContext.sql("select * from t_person order by age desc limit 4")
    // Write the result as JSON to the given output path
    df.write.json(args(1))
    // Stop the SparkContext
    sc.stop()
  }

}
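For reference, roughly the same job written against SparkSession, which replaces SQLContext as the entry point in Spark 2.x (a sketch, not part of the original code):

    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

    object Test2 {

      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("SQL-2").getOrCreate()
        val schema = StructType(List(
          StructField("id", IntegerType, true),
          StructField("name", StringType, true),
          StructField("age", IntegerType, true)))
        // Build an RDD[Row] from the text file and apply the schema
        val rowRDD = spark.sparkContext.textFile(args(0))
          .map(_.split(" "))
          .map(p => Row(p(0).toInt, p(1).trim, p(2).toInt))
        val personDataFrame = spark.createDataFrame(rowRDD, schema)
        personDataFrame.createOrReplaceTempView("t_person")
        spark.sql("select * from t_person order by age desc limit 4")
          .write.json(args(1))
        spark.stop()
      }
    }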