Structured Streaming初试
程序员文章站
2022-05-22 09:02:15
...
本篇博客记录一下自己初次尝试使用Structured Streaming。
首先写 一个程序来监昕网络端口发来的内容,然后进行 WordCount 。
package main.learningbigdata.spark.spark3step import org.apache.spark.SparkConf import org.apache.spark.sql.SparkSession /** * FileName: StructuredStreamingWordCount * Author: hadoop * Email: aaa@qq.com * Date: 19-6-1 下午3:34 * Description: * Structured Streaming例子 * */ object StructuredStreamingWordCount { def main(args: Array[String]): Unit = { //设置配置 val conf = new SparkConf() .setAppName("StructuredStreamingWordCount") .setMaster("local[2]") /** * 第一步:创建程序入口SparkSession ,并引入spark.implicits._来允许 Scalaobject 隐式转换为DataFrame */ val spark = SparkSession.builder().config(conf).getOrCreate() import spark.implicits._ /** * 第二步:创建流。配置从 socket 读取流数据,地址和端口为 localhost: 9999。 */ val lines = spark.readStream.format("socket") .option("host","localhost") .option("port","9999") .load() /** * 第三步:进行单词统计。这里 lines 是 DataFrame ,使用 as[String]给它定义类型转换为 * Dataset 。 之后在 Dataset 里进行单词统计。 */ val words = lines.as[String].flatMap(_.split(" ")) val wordCount = words.groupBy("value").count() /** * 第四步:创建查询句柄,定义打印结果方式并启动程序 。 这里使用 write Stream 方法 , 输 * 出模式为全部输出到控制台。 */ val query = wordCount.writeStream.outputMode("complete").format("console").start() query.awaitTermination() /** * 第五步:接下来运行该程序,在 Linux 命令窗口运行 nc-lk 9999 开启 9999 端口 。 */ } }
为了方便就直接在IDE上运行,使用IDEA运行时,需要设置一下运行参数
run -->Edit Configurations...
接下来运行该程序,在 Linux 命令窗口运行 nc-lk 9999 开启 9999 端口。
在IDEA控制台上就能看到wordCount结果了。
推荐阅读
-
IE9下html5初试小刀
-
html5 初试 indexedDB(推荐)
-
用python + hadoop streaming 分布式编程(一) -- 原理介绍,样例程序与本地调试
-
nginx 0.8.54/1.0.0 在cygwin环境下的编译(包括 nginx_mod_h264_streaming-2.2.7)
-
nginx 0.8.54/1.0.0 在cygwin环境下的编译(包括 nginx_mod_h264_streaming-2.2.7)
-
录音精灵Apowersoft Streaming Audio Recorder安装及激活图文教程(附注册码)
-
Apache 流框架 Flink,Spark Streaming,Storm对比分析(一)
-
Apache 流框架 Flink,Spark Streaming,Storm对比分析(二)
-
使用IIS Live Smooth Streaming技术搭建流媒体直播系统
-
OCZ脑电波控制系统开箱初试