Apache Flink(五):Flink Operator
Operator(操作符)
DataStream Transformations 数据流转换
Datastream -> Datasteam
Map(映射)
取一个元素并产生一个元素,是一个映射函数。下面的例子为使用Map将输入流的值加倍
dataStream.map { x => x * 2 }
FlatMap(展开)
取一个元素并产生零个,一个或多个元素。FlatMap可将英文短语拆分为单词
dataStream.flatMap { str => str.split(" ") }
Filter(过滤)
为每个元素评估一个布尔函数,并保留该函数返回true的布尔函数。 过滤出零值的过滤器,即布尔函数的返回值为true则保留
dataStream.filter { _ != 0 }
Union(联合)
两个或多个数据流的并集,创建一个包含所有流中所有元素的新流。 注意:如果将数据流与其自身合并,则在结果流中每个元素将获得两次。
dataStream.union(otherStream1, otherStream2, ...)
DataStream,DataStream → ConnectedStreams
Connect(连接)
Connect两个保留其类型的数据流,从而允许两个流之间共享状态。
someStream : DataStream[Int] = ...
otherStream : DataStream[String] = ...
val connectedStreams = someStream.connect(otherStream)
CoMap, CoFlatMap
类似于ConnectedStreams上的map和flatMap
connectedStreams.map(
(_ : Int) => true,
(_ : String) => false
)
connectedStreams.flatMap(
(_ : Int) => true,
(_ : String) => false
)
案例代码
import org.apache.flink.streaming.api.scala._
object FlinkConnectStream {
def main(args: Array[String]): Unit = {
// 1.创建StreamExecutionEnvironment
val flinkEnv = StreamExecutionEnvironment.getExecutionEnvironment
// 2.创建DataStream
val ds1 : DataStream[String] = flinkEnv.fromCollection(List("Hello Flink", "Hello Spark", "Hello Scala"))
val ds2 : DataStream[Int] = flinkEnv.fromCollection(List(2,3,5,7))
val ds : ConnectedStreams[String, Int] = ds1.connect(ds2)
ds.flatMap(
(line:String) => line.split("\\s+"),
(line:Int) => line.toString
).print()
// 执行计算
flinkEnv.execute("FlinkConnectStream")
}
}
DataStream → SplitStream
Split(分割)
根据某个标准将流分成两个或多个流
val split = someDataStream.split(
(num: Int) =>
(num % 2) match {
case 0 => List("even")
case 1 => List("odd")
}
)
Select(选择)
从拆分流中选择一个或多个流
val even = split select "even"
val odd = split select "odd"
val all = split.select("even","odd")
案例代码(Split方式已过时)
val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
val logStream = fsEnv.socketTextStream("Spark",9999)
val splitStream: SplitStream[String] = logStream.split(new OutputSelector[String] {
override def select(out: String): lang.Iterable[String] = {
if (out.startsWith("INFO")) {
val array = new util.ArrayList[String]()
array.add("info")
return array
} else {
val array = new util.ArrayList[String]()
array.add("error")
return array
}
}
})
splitStream.select("info").print("info")
splitStream.select("error").printToErr("error")
fsEnv.execute("ConnectedStream")
用法二(优先)
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
object FlinkSplitStream {
def main(args: Array[String]): Unit = {
val flinkEnv = StreamExecutionEnvironment.getExecutionEnvironment
val logStream = flinkEnv.socketTextStream("Spark", 6666)
val opt = new OutputTag[String]("error")
val dataStream = logStream.process(new ProcessFunction[String, String] {
override def processElement(value: String, ctx: ProcessFunction[String, String]#Context,
out: Collector[String]): Unit = {
// ERROR开头,为错误信息
if (value.startsWith("ERROR")) {
ctx.output(opt, value)
} else {
out.collect(value)
}
}
})
// 打标记
dataStream.print("正常信息")
dataStream.getSideOutput(opt).printToErr("错误信息")
flinkEnv.execute("FlinkSplitStream")
}
}
DataStream → KeyedDataStream
KeyBy(按键分组)
在逻辑上将流划分为不相交的分区,每个分区都包含同一键的元素。 在内部,这是通过哈希算法分区实现的。
dataStream.keyBy("someKey") // 通过指定的someKey进行分区
dataStream.keyBy(0) // 通过元组的第一个元素进行分区
Reduce(无初始值的滚动聚合)
对 KeyedDataStream进行滚动聚合。 将当前元素与最后一个Reduce的值合并出新值
dataStream
.flatMap(_.split("\\s+"))
.map((_, 1))
.keyBy(0)
.reduce((t1, t2) => (t1._1, t1._2 + t2._2))
.print()
Fold(有初始值的滚动聚合)
带有初始值的 KeyedDataStream的滚动折叠。 将当前元素与上一个Fold的值合并出新值
val result: DataStream[String] =
keyedStream.fold("start")((str, i) => { str + "-" + i })
Aggregations(滚动聚合)
在KeyedDataStream上滚动聚合。 min和minBy之间的区别是min返回最小值,而minBy返回在此字段中具有最小值的元素(与max和maxBy相同)。
keyedStream.sum(0)
keyedStream.sum("key")
keyedStream.min(0)
keyedStream.min("key")
keyedStream.max(0)
keyedStream.max("key")
keyedStream.minBy(0)
keyedStream.minBy("key")
keyedStream.maxBy(0)
keyedStream.maxBy("key")
Physical partitioning 物理分区
Flink提供了一些分区方案,可供用户选择,分区目的是为了任务之间数据的能够均衡分布。
分区方案 | 说明 | 实现方式 |
---|---|---|
Custom partitioning | 需要用户实现分区策略 | dataStream.partitionCustom(partitioner, “someKey”) |
Random partitioning | 将当前的数据随机分配给下游任务 | dataStream.shuffle() |
Rebalancing (Round-robin partitioning) | 轮询将上游的数据均分下游任务 | dataStream.rebalance() |
Rescaling | 缩放分区数据,例如上游2个并行度/下游4个 ,上游会将1个分区的数据发送给下游前两个分区,后1个分区,会发送下游后两个 | dataStream.rescale() |
Broadcasting | 上游会将分区所有数据,广播给下游的所有任务分区 | dataStream.broadcast() |
Task chaining and resource groups 任务链和资源组
连接两个Operator 转换,尝试将两个Operator 转换放置到一个线程当中,可以减少线程消耗,避免不必要的线程通信。用户可以通过 StreamExecutionEnvironment.disableOperatorChaining()来禁用chain操作。
为了方便,Flink提供如下的算子用于修改chain的行为
算子 | 操作 | 说明 |
---|---|---|
Start new chain | someStream.filter(…).map(…).startNewChain().map(…) | 开启新chain,将当前算子和filter断开 |
Disable chaining | someStream.map(…).disableChaining() | 当前算子和前后都要断开chain操作 |
Set slot sharing group | someStream.filter(…).slotSharingGroup(“name”) | 设置操作任务所属资源Group,影响任务对TaskSlots占用 |
val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
//2.创建DataStream -细化
val dataStream: DataStream[String] = fsEnv.socketTextStream("Spark",9999)
//3.对数据做转换
dataStream.filter(line => line.startsWith("INFO"))
.flatMap(_.split("\\s+"))
.startNewChain()
.slotSharingGroup("g1")
.map((_,1))
.map(t=>WordPair(t._1,t._2))
.print()
fsEnv.execute("FlinkWordCountsQuickStart")
上一篇: c# wpf模拟按钮点击操作
推荐阅读
-
Apache Flink 1.9 重大特性提前解读
-
Apache 流框架 Flink,Spark Streaming,Storm对比分析(一)
-
Apache 流框架 Flink,Spark Streaming,Storm对比分析(二)
-
idea中flink启动报错org.apache.flink.api.common.ExecutionConfig$GlobalJobParameters
-
「漏洞预警」Apache Flink 任意 Jar 包上传导致远程代码执行漏洞复现
-
Apache Flink 分布式运行时环境
-
Flink入门(一)——Apache Flink介绍
-
Flink入门(五)——DataSet Api编程指南
-
Apache Flink 未授权访问+远程代码执行
-
深入了解 Flink 网络栈(二):监控、指标和处理背压 工作网络协议jvm活动apache