Merging and Branching Spark Data Streams
Spark data flows can be merged with union.
First, let's test union on batch data:
scala> Seq("1","2","3","4").toDS.union(Seq("a","b","c","d").toDS).show
+-----+
|value|
+-----+
| 1|
| 2|
| 3|
| 4|
| a|
| b|
| c|
| d|
+-----+
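One caveat: union matches columns by position, not by name. When the two sides have the same columns in a different order, unionByName (available since Spark 2.3) is the safer choice. A quick sketch with two made-up DataFrames:
scala> val a = Seq((1, "x")).toDF("id", "tag")
scala> val b = Seq(("y", 2)).toDF("tag", "id")
scala> a.unionByName(b).show   // matches columns by name instead of position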
Now let's test union on streaming data:
// Read a stream of lines from each of two socket sources.
val lines1 = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()
val lines2 = spark.readStream.format("socket").option("host", "localhost").option("port", 8888).load()
// Merge the two streams with union, then split each line into words.
val words = lines1.union(lines2).as[String].flatMap(_.split(" "))
// Running word count over the merged stream, printed to the console on every trigger.
val wordCounts = words.groupBy("value").count()
val query = wordCounts.writeStream.outputMode("complete").format("console").start()
query.awaitTermination()
Start a netcat listener in each of two separate terminals:
nc -lk 9999
nc -lk 8888
The test output looks like this:
-------------------------------------------
Batch: 11
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
| 9b| 2|
| 8888| 1|
| 8b| 2|
| 8c| 2|
| 9a| 2|
| 8a| 2|
| 9c| 2|
| 9999| 1|
+-----+-----+
Now on to branching. The following code can be read as a branch:
val ds = Seq(1,2,3,4).toDS
val (ds1, ds2) = (ds.filter(_ % 2 == 0), ds.filter(_ % 2 == 1))
ds1.show
ds2.show
This branch is a fake one, though: ds is actually traversed twice, because each filter triggers its own pass over the source.
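For the batch case, the double traversal is easy to avoid: caching the Dataset materializes it on the first action, and the second filter then reads from the cache instead of rescanning the source. A minimal sketch:
val ds = Seq(1,2,3,4).toDS.cache()
val (ds1, ds2) = (ds.filter(_ % 2 == 0), ds.filter(_ % 2 == 1))
ds1.show   // first action: computes ds and populates the cache
ds2.show   // served from the cache, no second scan of the source
ds.unpersist()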
For streaming, presumably the only way to get a real branch is to attach multiple sinks.
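A minimal sketch of that idea, assuming Spark 2.4+ (where foreachBatch is available); the /tmp output paths and the count-based branch predicates are made up for illustration. foreachBatch exposes each micro-batch as an ordinary DataFrame, so it can be persisted once and written to two sinks without recomputing the stream:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
val query = wordCounts.writeStream
  .outputMode("complete")
  .foreachBatch { (batch: DataFrame, batchId: Long) =>
    batch.persist()   // materialize once so both writes share the same computation
    batch.filter(col("count") > 1).write.mode("overwrite").parquet(s"/tmp/branch1/$batchId")    // branch 1
    batch.filter(col("count") === 1).write.mode("overwrite").parquet(s"/tmp/branch2/$batchId")  // branch 2
    batch.unpersist()
  }
  .start()
The persist/unpersist pair inside foreachBatch is what keeps this from degenerating into the same double traversal we saw in the batch case.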