第 16 节 DataStream之算子操作(scala语言)
程序员文章站
2022-03-14 19:09:02
...
上篇:第 15 节 DataStream之source(scala语言)
1、Filter基本操作:过滤出偶数数据并打印
Filter:过滤函数,对传入的每条数据进行判断,只保留符合条件的数据。
具体代码实现:
MyNoParallelSource.scala
package xuwei.streaming
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
/**
 * A custom source with parallelism 1 (it implements the non-parallel `SourceFunction`).
 *
 * Emits an increasing sequence of Long values starting at 1, one value per second,
 * until `cancel()` is called.
 */
class MyNoParallelSource extends SourceFunction[Long] {
  // Next value to emit.
  var count = 1L
  // Cleared by cancel(), which is invoked from a different thread than run();
  // @volatile makes the update visible to the run loop so the source actually stops.
  @volatile var isRunning = true

  override def run(ctx: SourceContext[Long]): Unit = {
    while (isRunning) {
      ctx.collect(count)
      count += 1
      // Throttle to one element per second so the demo output is readable.
      Thread.sleep(1000)
    }
  }

  override def cancel(): Unit = {
    isRunning = false
  }
}
StreamingDemoFilterSource.scala
package xuwei.streaming
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
object StreamingDemoFilterSource {
  /**
   * Demonstrates `filter`: keeps only the even numbers produced by MyNoParallelSource,
   * logs each element before and after filtering, and prints the sum of the kept
   * numbers for every 2-second window.
   *
   * @param args unused
   */
  def main(args: Array[String]): Unit = {
    // Obtain the Flink execution environment.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Implicit TypeInformation derivation required by the Scala DataStream API.
    import org.apache.flink.api.scala._

    val text = env.addSource(new MyNoParallelSource)
    val evenData = text.map(num => {
      println("原始接收到的数据:" + num)
      num
    }).filter(_ % 2 == 0)
    val sum = evenData.map(num => {
      println("过滤之后的数据:" + num)
      num
    }).timeWindowAll(Time.seconds(2)).sum(0)
    // A single sink instance keeps the printed results in one stream.
    sum.print().setParallelism(1)
    // Name the job after this demo (it was mislabelled "StreamingFromCollectionScala").
    env.execute("StreamingDemoFilterSource")
  }
}
控制台打印信息(不断打印下去):
2、Union的基本操作
Union:合并多个流,新的流会包含所有流中的数据。但union有一个限制:所有被合并的流的数据类型必须一致。
具体代码实现:
MyNoParallelSource.scala
package xuwei.streaming
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
/**
 * A custom source with parallelism 1 (it implements the non-parallel `SourceFunction`).
 *
 * Emits an increasing sequence of Long values starting at 1, one value per second,
 * until `cancel()` is called.
 */
class MyNoParallelSource extends SourceFunction[Long] {
  // Next value to emit.
  var count = 1L
  // Cleared by cancel(), which is invoked from a different thread than run();
  // @volatile makes the update visible to the run loop so the source actually stops.
  @volatile var isRunning = true

  override def run(ctx: SourceContext[Long]): Unit = {
    while (isRunning) {
      ctx.collect(count)
      count += 1
      // Throttle to one element per second so the demo output is readable.
      Thread.sleep(1000)
    }
  }

  override def cancel(): Unit = {
    isRunning = false
  }
}
StreamingDemoUnionSource.scala
package xuwei.streaming
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
object StreamingDemoUnionSource {
  /**
   * Demonstrates `union`: merges two MyNoParallelSource streams into one, logs every
   * received element, and prints the sum of the merged elements for every 2-second window.
   *
   * @param args unused
   */
  def main(args: Array[String]): Unit = {
    // Obtain the Flink execution environment.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Implicit TypeInformation derivation required by the Scala DataStream API.
    import org.apache.flink.api.scala._

    val text1 = env.addSource(new MyNoParallelSource)
    val text2 = env.addSource(new MyNoParallelSource)
    // union requires every input stream to have the same element type (Long here).
    val unionStream = text1.union(text2)
    val sum = unionStream.map(num => {
      println("接收到的数据:" + num)
      num
    }).timeWindowAll(Time.seconds(2)).sum(0)
    // A single sink instance keeps the printed results in one stream.
    sum.print().setParallelism(1)
    // Name the job after this demo (it was mislabelled "StreamingFromCollectionScala").
    env.execute("StreamingDemoUnionSource")
  }
}
控制台打印信息(不断打印下去):
3、Connect基本操作
Connect:和union类似,但是只能连接两个流,两个流的数据类型可以不同,会对两个流中的数据应用不同的处理方法。
具体代码实现:
MyNoParallelSource.scala
package xuwei.streaming
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
/**
 * A custom source with parallelism 1 (it implements the non-parallel `SourceFunction`).
 *
 * Emits an increasing sequence of Long values starting at 1, one value per second,
 * until `cancel()` is called.
 */
class MyNoParallelSource extends SourceFunction[Long] {
  // Next value to emit.
  var count = 1L
  // Cleared by cancel(), which is invoked from a different thread than run();
  // @volatile makes the update visible to the run loop so the source actually stops.
  @volatile var isRunning = true

  override def run(ctx: SourceContext[Long]): Unit = {
    while (isRunning) {
      ctx.collect(count)
      count += 1
      // Throttle to one element per second so the demo output is readable.
      Thread.sleep(1000)
    }
  }

  override def cancel(): Unit = {
    isRunning = false
  }
}
StreamingDemoConnectSource.scala
package xuwei.streaming
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
object StreamingDemoConnectSource {
  /**
   * Demonstrates `connect`: joins a Long stream and a String stream (the second source
   * with a "str" prefix) into ConnectedStreams and applies a separate function to each
   * side, printing the combined result.
   *
   * @param args unused
   */
  def main(args: Array[String]): Unit = {
    // Obtain the Flink execution environment.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Implicit TypeInformation derivation required by the Scala DataStream API.
    import org.apache.flink.api.scala._

    val text1 = env.addSource(new MyNoParallelSource)
    val text2 = env.addSource(new MyNoParallelSource)
    val text2_str = text2.map("str" + _)
    // Unlike union, connect accepts two streams of different element types.
    val connectStreaming = text1.connect(text2_str)
    // CoMap: the first function handles Long elements from text1, the second handles
    // String elements from text2_str. Both produce String explicitly; without the type
    // argument the result would be inferred as Any (the common supertype of Long and
    // String), which Flink can only treat as a generic, Kryo-serialized type.
    val result = connectStreaming.map[String](num => num.toString, str => str)
    // A single sink instance keeps the printed results in one stream.
    result.print().setParallelism(1)
    env.execute("StreamingDemoConnectSource")
  }
}
控制台打印信息(不断打印下去):
4、Split基本操作
Split:根据规则把一个数据流切分为多个流(注意:split 在较新版本的 Flink 中已被弃用,并在 1.12 中移除,新版本应改用侧输出 side output 实现)
具体代码实现:
MyNoParallelSource.scala
package xuwei.streaming
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
/**
 * A custom source with parallelism 1 (it implements the non-parallel `SourceFunction`).
 *
 * Emits an increasing sequence of Long values starting at 1, one value per second,
 * until `cancel()` is called.
 */
class MyNoParallelSource extends SourceFunction[Long] {
  // Next value to emit.
  var count = 1L
  // Cleared by cancel(), which is invoked from a different thread than run();
  // @volatile makes the update visible to the run loop so the source actually stops.
  @volatile var isRunning = true

  override def run(ctx: SourceContext[Long]): Unit = {
    while (isRunning) {
      ctx.collect(count)
      count += 1
      // Throttle to one element per second so the demo output is readable.
      Thread.sleep(1000)
    }
  }

  override def cancel(): Unit = {
    isRunning = false
  }
}
StreamingDemoSpiltSource.scala
package xuwei.streaming
import java.util
import org.apache.flink.streaming.api.collector.selector.OutputSelector
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
object StreamingDemoSpiltSource {
  /**
   * Demonstrates `split`/`select`: tags every number from MyNoParallelSource as "even"
   * or "odd", then selects and prints only the "even" branch.
   *
   * @param args unused
   */
  def main(args: Array[String]): Unit = {
    // Obtain the Flink execution environment.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Implicit TypeInformation derivation required by the Scala DataStream API.
    import org.apache.flink.api.scala._

    val text = env.addSource(new MyNoParallelSource)
    // NOTE: DataStream.split/select is deprecated in later Flink releases and was removed
    // in Flink 1.12; side outputs (OutputTag + ProcessFunction) are the replacement there.
    val splitStream = text.split(new OutputSelector[Long] {
      // Assign each element exactly one output name, chosen by parity.
      override def select(value: Long): java.lang.Iterable[String] = {
        val outputNames = new util.ArrayList[String]()
        outputNames.add(if (value % 2 == 0) "even" else "odd")
        outputNames
      }
    })
    // Only the "even" branch is selected, so only even numbers are printed.
    val evenStream = splitStream.select("even")
    evenStream.print().setParallelism(1)
    // Name the job after this demo (it was mislabelled "StreamingDemoConnectSource").
    env.execute("StreamingDemoSpiltSource")
  }
}
控制台打印信息(不断打印下去):
下一篇: 如何在css中设置图片阴影