欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

StructuredStreaming读取kafka结果输出到mysql

程序员文章站 2024-01-11 21:45:22
...

依赖:

  <properties>
        <scala.version>2.11.8</scala.version>
        <spark.version>2.2.0</spark.version>
        <mysql.version>5.1.38</mysql.version>
    </properties>
<dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <!-- Spark Core 依赖 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- Spark SQL 依赖 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
         <!-- 集成kafka -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- MySQL -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysql.version}</version>
        </dependency>

    </dependencies>

代码:
注意代码以及后续操作kafka需要替换成自己的ip地址

import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}

object KafkaWC {
  def main(args: Array[String]): Unit = {
    //创建SparkSession实例对象
    val conf: SparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getSimpleName)
    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    //设置日记级别
    spark.sparkContext.setLogLevel("warn")
    //导入隐式转换和常用函数
    import org.apache.spark.sql.functions._
    import spark.implicits._
    //订阅拉取Kafka数据
    val df: DataFrame = spark
      .readStream
      .format("kafka")
      //ip换成自己kafka ip地址
      .option("kafka.bootstrap.servers", "ip:9092")
      .option("subscribe", "sparkTopic")
      .load()
    //对数据进行处理,提取kafka values进行切分统计wordcount
    val midStream: Dataset[String] = df.selectExpr("CAST(value AS STRING)")
      .as[String]
      .filter(trim($"value").isNotNull)
      .flatMap(line => line.split("\\s+"))

    val frame: DataFrame = midStream.select($"value" as ("word")).select($"word").groupBy("word").count()

    //将结果输出到mysql,foreach函数中需要继承ForeachWriter自定义输出逻辑
    val query: StreamingQuery = frame.coalesce(1).writeStream.outputMode(OutputMode.Complete())
      .foreach(
        new ForeachWriter[Row] {
          var con: Connection = _
          var ps: PreparedStatement = _

          //这里创建数据库链接
          override def open(partitionId: Long, version: Long): Boolean = {
            Class.forName("com.mysql.jdbc.Driver")
            con = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "root")
            return true
          }

          //数据真正处理的逻辑
          override def process(value: Row): Unit = {
            ps = con.prepareStatement("insert into test values(?,?) on duplicate key update cnt=values(cnt)")
            ps.setString(1, value.getAs[String]("word"));
            ps.setLong(2, value.getAs[Long]("count"))
            ps.execute()
          }

          //碰到异常的处理策略(关流)
          override def close(errorOrNull: Throwable): Unit = {
            if (ps != null) {
              try {
                ps.close()
              } catch {
                case e: Exception => e.printStackTrace()
              } finally {}
            }

            if (con != null) {
              try {
                con.close()
              } catch {
                case e: Exception => e.printStackTrace()
              } finally {}
            }
          }

        }).start()
    //    将结果数据操控制台
    //    val query: StreamingQuery = frame
    //      .coalesce(1)
    //      .writeStream.outputMode(OutputMode.Update())
    //      .format("console").start()
    query.awaitTermination()
  }
}

创建数据库表
CREATE TABLEtest(wordvarchar(255) NOT NULL,cntbigint(11) NOT NULL AUTO_INCREMENT, PRIMARY KEY (word), KEYid(cnt) ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

启动kafka
创建topic:

./kafka-topics.sh --create  --zookeeper ip:2181 --topic sparkTopic 
--partitions 3 --replication-factor 2

通过命令行启动sparkTopic

./kafka-console-producer.sh --broker-list ip:9092 --topic sparkTopic

启动spark应用,通过命令行生产kafka消息:
StructuredStreaming读取kafka结果输出到mysql
查看mysql表
StructuredStreaming读取kafka结果输出到mysql
数据已经进去,再次输入qianqi lisi
StructuredStreaming读取kafka结果输出到mysql
发现mysql已经更新。

下面说一下outputMode(OutputMode.Complete())中append,complete,update的区别,熟悉streaming的人知道,状态统计updateStateByKey跟mapWithState的区别就是complate跟update的区别:complate是只要有数据更新,就会把所有的数据全部更新,而update只会更新有更新的数据。
下面截断一下mysql表,此时test表中没有数据,我们在kafka中只输入zhangsan一个单词会发现

StructuredStreaming读取kafka结果输出到mysql
清空表后,只输入一个zhangsan,但是其他的数据又进到表中了,说明清空了test表,输入zhangsan后,其他的数据没有更新的数据也是跟zhangsan一个批次再次进入了test表。
清空表并更改输出模式.writeStream.outputMode(OutputMode.Update())为update,先生产zhangsan wangwu lisi 等待数据进入test表中之后,再次清空表后,kafka输入zhangsan ,此时mysql

StructuredStreaming读取kafka结果输出到mysql
发现,只有zhangsan更新了,其他数据没有更新。
最后将模式改成appen:.outputMode(OutputMode.Append())
启动后报错:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;;
Repartition 1, false

append模式不支持聚合(简单操作每条数据之后直接输出)
总结:append不能有聚合操作,update只会更新有更新的数据,complete会更新全量数据。

相关标签: spark 大数据