欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Structured Streaming初试

程序员文章站 2022-05-22 09:02:15
...

本篇博客记录一下自己初次尝试使用Structured Streaming。

首先写 一个程序来监昕网络端口发来的内容,然后进行 WordCount 。

package main.learningbigdata.spark.spark3step

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

/**
  * FileName: StructuredStreamingWordCount
  * Author:   hadoop
  * Email:    aaa@qq.com
  * Date:     19-6-1 下午3:34
  * Description:
  * Structured Streaming例子
  *
  */
object StructuredStreamingWordCount {

  def main(args: Array[String]): Unit = {

    //设置配置
    val conf = new SparkConf()
      .setAppName("StructuredStreamingWordCount")
      .setMaster("local[2]")

    /**
      * 第一步:创建程序入口SparkSession ,并引入spark.implicits._来允许 Scalaobject 隐式转换为DataFrame
      */

    val spark = SparkSession.builder().config(conf).getOrCreate()

    import spark.implicits._

    /**
      * 第二步:创建流。配置从 socket 读取流数据,地址和端口为 localhost: 9999。
      */
    val lines = spark.readStream.format("socket")
      .option("host","localhost")
      .option("port","9999")
      .load()
    /**
      * 第三步:进行单词统计。这里 lines 是 DataFrame ,使用 as[String]给它定义类型转换为
      * Dataset 。 之后在 Dataset 里进行单词统计。
      */
    val words = lines.as[String].flatMap(_.split(" "))

    val wordCount = words.groupBy("value").count()
    /**
      * 第四步:创建查询句柄,定义打印结果方式并启动程序 。 这里使用 write Stream 方法 , 输
      * 出模式为全部输出到控制台。
      */
    val query = wordCount.writeStream.outputMode("complete").format("console").start()
    query.awaitTermination()

    /**
      * 第五步:接下来运行该程序,在 Linux 命令窗口运行 nc-lk 9999 开启 9999 端口 。
      */
  }

}

为了方便就直接在IDE上运行,使用IDEA运行时,需要设置一下运行参数

run -->Edit Configurations...

Structured Streaming初试

接下来运行该程序,在 Linux 命令窗口运行 nc-lk 9999 开启 9999 端口。

Structured Streaming初试

在IDEA控制台上就能看到wordCount结果了。

Structured Streaming初试