Structured Streaming: reading from Kafka and writing the results to MySQL
Dependencies:
<properties>
    <scala.version>2.11.8</scala.version>
    <spark.version>2.2.0</spark.version>
    <mysql.version>5.1.38</mysql.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <!-- Spark Core -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- Spark SQL -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- Kafka integration -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- MySQL JDBC driver -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>${mysql.version}</version>
    </dependency>
</dependencies>
Code:
Note: replace the IP addresses in the code and in the Kafka commands that follow with your own.
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}

object KafkaWC {
  def main(args: Array[String]): Unit = {
    // Create the SparkSession
    val conf: SparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getSimpleName)
    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    // Set the log level
    spark.sparkContext.setLogLevel("WARN")
    // Import implicit conversions and the built-in functions
    import org.apache.spark.sql.functions._
    import spark.implicits._

    // Subscribe to and pull data from Kafka
    val df: DataFrame = spark
      .readStream
      .format("kafka")
      // replace "ip" with your own Kafka broker address
      .option("kafka.bootstrap.servers", "ip:9092")
      .option("subscribe", "sparkTopic")
      .load()

    // Extract the Kafka value, drop null/blank lines, and split into words for the word count
    val midStream: Dataset[String] = df.selectExpr("CAST(value AS STRING)")
      .as[String]
      .filter(trim($"value") =!= "")
      .flatMap(line => line.split("\\s+"))
    val frame: DataFrame = midStream.select($"value".as("word")).groupBy("word").count()

    // Write the result to MySQL; the foreach sink takes a ForeachWriter with custom output logic
    val query: StreamingQuery = frame.coalesce(1).writeStream.outputMode(OutputMode.Complete())
      .foreach(
        new ForeachWriter[Row] {
          var con: Connection = _
          var ps: PreparedStatement = _

          // Open the database connection and prepare the upsert statement
          override def open(partitionId: Long, version: Long): Boolean = {
            Class.forName("com.mysql.jdbc.Driver")
            con = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "root")
            ps = con.prepareStatement("insert into test values(?,?) on duplicate key update cnt=values(cnt)")
            true
          }

          // Per-row processing: upsert each (word, count) pair
          override def process(value: Row): Unit = {
            ps.setString(1, value.getAs[String]("word"))
            ps.setLong(2, value.getAs[Long]("count"))
            ps.execute()
          }

          // Clean-up (also called on error): close the statement and the connection
          override def close(errorOrNull: Throwable): Unit = {
            if (ps != null) {
              try {
                ps.close()
              } catch {
                case e: Exception => e.printStackTrace()
              }
            }
            if (con != null) {
              try {
                con.close()
              } catch {
                case e: Exception => e.printStackTrace()
              }
            }
          }
        }).start()

    // Alternatively, print the result to the console:
    // val query: StreamingQuery = frame
    //   .coalesce(1)
    //   .writeStream.outputMode(OutputMode.Update())
    //   .format("console").start()

    query.awaitTermination()
  }
}
Create the database table:
CREATE TABLE `test` (
  `word` varchar(255) NOT NULL,
  `cnt` bigint(11) NOT NULL AUTO_INCREMENT,
  PRIMARY KEY (`word`),
  KEY `id` (`cnt`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
Start Kafka.
Create the topic:
./kafka-topics.sh --create --zookeeper ip:2181 --topic sparkTopic --partitions 3 --replication-factor 2
Start a console producer for sparkTopic from the command line:
./kafka-console-producer.sh --broker-list ip:9092 --topic sparkTopic
Start the Spark application, then produce some Kafka messages from the producer prompt:
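For example (the input here is illustrative; any whitespace-separated words will do, these are simply the names used later in this walkthrough):
>zhangsan lisi wangwu qianqi lisi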
Check the MySQL table: the word counts have been written. Type qianqi lisi into the producer again and you will see the counts in MySQL update. (A quick way to inspect the table from Spark, instead of the mysql client, is sketched below.)
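A minimal sketch for inspecting the table, assuming the same local MySQL instance, credentials and SparkSession as in the job above:
val results = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://localhost:3306/test")
  .option("dbtable", "test")
  .option("user", "root")
  .option("password", "root")
  .load()
// highest counts first; the $ column syntax needs spark.implicits._
results.orderBy($"cnt".desc).show()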
Now for the difference between append, complete and update in outputMode(OutputMode.Complete()). If you are familiar with Spark Streaming, the relationship between complete and update mirrors that between updateStateByKey and mapWithState: complete re-emits the entire result table whenever anything changes, while update emits only the rows whose values changed in that batch. The sketch below shows one way to watch this on the console sink.
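A minimal sketch, assuming the same frame aggregation and imports as in the job above (the variable names here are just for illustration):
// Complete mode: each micro-batch prints the counts for every word seen so far.
val completeQuery = frame.writeStream
  .outputMode(OutputMode.Complete())
  .format("console")
  .start()

// Update mode: each micro-batch prints only the words whose counts changed.
val updateQuery = frame.writeStream
  .outputMode(OutputMode.Update())
  .format("console")
  .start()
Both queries can run at the same time against the same aggregation, which makes the difference easy to compare batch by batch.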
Now truncate the MySQL table so that test is empty, and type only the single word zhangsan into Kafka.
Even though only zhangsan was entered after clearing the table, the other rows come back as well: in complete mode the sink receives the full result table, so the unchanged word counts are written out in the same batch as zhangsan and re-enter test.
Next, clear the table and change the output mode to update: .writeStream.outputMode(OutputMode.Update()). Produce zhangsan wangwu lisi and wait for the rows to land in test, then truncate the table again and type zhangsan into Kafka. This time only zhangsan is rewritten in MySQL; the rows whose counts did not change are left untouched.
Finally, change the mode to append: .outputMode(OutputMode.Append())
Starting the application now fails with:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;;
Repartition 1, false
Append mode does not support a streaming aggregation without a watermark; it is meant for jobs that simply transform each record and write it out directly. (As the error message hints, adding an event-time watermark makes the combination legal; a sketch follows.)
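A minimal sketch, not part of the original job, assuming the same df, spark.implicits._ and functions._ context as above and using Kafka's built-in timestamp column as event time:
import org.apache.spark.sql.functions.{explode, split, window}

// Split each Kafka record into words, keeping the record's event time.
val windowedCounts = df
  .selectExpr("CAST(value AS STRING) AS line", "timestamp")
  .select(explode(split($"line", "\\s+")).as("word"), $"timestamp")
  .where($"word" =!= "")
  // Late data older than 10 minutes is dropped; counts are per 5-minute window.
  .withWatermark("timestamp", "10 minutes")
  .groupBy(window($"timestamp", "5 minutes"), $"word")
  .count()

// With the watermark in place, append mode no longer throws the AnalysisException above;
// it emits each window's final counts once the watermark passes the end of the window.
val appendQuery = windowedCounts.writeStream
  .outputMode(OutputMode.Append())
  .format("console")
  .start()
Note that this changes the shape of the result (counts per time window rather than running totals), so it is not a drop-in replacement for the MySQL upsert above.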
Summary: append cannot be used with an (un-watermarked) aggregation, update writes only the rows that changed, and complete rewrites the full result every batch.