Flink DataStream API 介绍
DataStream 编程模型
- DataSource模块负责数据接入
- 内置数据源:文件数据源readTextFile/readFile(InputFormat),Socket端口socketTextStream,集合数据源 fromElements
- 第三方数据源:
- 仅支持读取:Netty
- 仅支持输出:ElasticSearch,HDFS
- 支持读取和输出:Kafaka,RabbitMQ
- 用户自定义数据源连接器
- Transformation模块负责数据集的各种转换操作
- 单SingleDataStream:对单个DataStream的处理逻辑
- Map[DataStream->DataStream],map函数可以直接传入匿名计算表达式,也可以实现匿名的MapFunction类,覆盖其中的map方法。
- FlatMap[DataStream->DataStream]
- Filter[DataStream->DataStream],符合表达式的留下来,不符合的过滤掉。
- KeyBy[DataStream->KeyedStream],执行Partition操作,将相同的Key值放入相同的分区。
- Reduce[KeyedStream->DataStream],可直接传入匿名的reduce函数,也可实现匿名的ReduceFunction类。
- Aggregations[KeyedStream->DataStream], 根据指定的字段进行聚合操作,滚动计算参数聚合结果,即一个reduce操作个例。例如 sum,min,minBy(返回最小值对应的元素),max,maxBy
- MultiDataStream:对多个DataStream的处理逻辑
- Union[DataStream->DataStream],需要保证两个数据集的格式一致。
- Connect,CoMap,CoFlatMap[DataStream->DataStream]
- Split[DataStream->SplitStream]
- Select[SplitStream->DataStream]
- Iterate[DataStream->IterativeStream->DataStream]
- 物理分区:并行度和数据分区的调整转换
- shuffle:随机重新分区,分区相对平衡,但是容易失去原有的数据分区结构。
- rebalance:通过循环的方式对数据集中的数据进行重分区,尽可能地保证每个分区的数据平衡。
- rescale:循环处理,并仅会对上下游的算子数据进行重新平衡。
- broadcast:将数据集复制到下游的算子task中,防止网络拉取。
- 自定义分区 extends Partitioner
- 单SingleDataStream:对单个DataStream的处理逻辑
- DataSink模块负责写出到外部存储
- 基本数据输出:writeAsCsv,writeAsText,writeToSocket
- 第三方数据输出:FlinkKafkaProducer
时间概念与Watermark
时间的概念
-
事件生成时间 Event Time,时间实际发生时间
-
事件接入时间 Ingestion Time,传送至DataSource时的机器时间
-
事件处理时间 Processing Time,到达某个算子操作时获取的机器时间
在Flink中默认情况下使用的是Process Time,因此要使用其他两种时间时,需要显示指定。
//指定时间类型
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
EventTime和Watermark
Watermark是水位线的意思,一个基于事件时间的Window是水槽,Watermark用于判断该水槽中的水是否全部流入了,是一个表达数据完整性的统计量。
Watermark=EventTime-最大延迟到达时间
当窗口结束时间大于Watermark,并且该窗口中有事件数据时,就会触发窗口计算。
Timestamps Assigning与生成Watermarks
使用EventTime需要说明如何从事件数据中提取Timestamp,该过程叫做Timestams Assigning。
得到EventTime之后,需要制定Watermarks的生成策略。
两种方式:
-
在DataStream Source 算子接口的Source Function 中定义。
-
通过Flink自带的Timestamp Assigner 指定Timestamp 和生成Watermark
-
根据时间间隔周期性生成Watermark,AssignerWithPeriodicWatermarks
-
升序模式,适用于顺序生成事件:assignAscendingTimestamps
-
固定时间延迟间隔
-
assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[AdsStateRealEntity](Time.minutes(1)) // 指定时间间隔为1分钟 { // 指定事件时间抽取方式 override def extractTimestamp(element: AdsStateRealEntity): Long = getUnixTime(element.getStatusStartTime) })
- 根据特定元素的数据满足某个特定条件生成Watermark,AssignerWithPunctuatedWatermarks
-
-
自定义Timestamp Assinger 和 WartermarkGenerator。
-
Periodic Watermarks 自定义生成
-
Punctuated Watermarks 自定义生成
-
窗口计算
WindowsAssigner :指定窗口的类型,定义如何将数据流分配到一个或者多个窗口。
Windows Trigger:指定窗口触发时机,定义窗口满足什么样的条件触发计算。
Evictor:用于数据剔除。
Lateness:标记是否处理迟到数据,当迟到数据到达窗口中是否触发计算。
OutPut Tag:标记输出标签,然后通过getSideOutput将窗口中的数据根据标签输出。
Windows Function:定义窗口上数据处理的逻辑,例如对数据进行sum操作。
stream.keyBy( ... ) // window函数窗口操作必须是keyedStream,否则应该调用windowAll方法
.window( ... ) // 指定窗口分配器类型
[.trigger( ... )]// 指定触发器类型(可选)
[.evictor( ... )]// 指定evictor(可选)
[.allowedLateness( ... )] // 指定是否延迟处理数据(可选)
[.sideOutputLateData( ... )] // 指定Output Tag (可选)
.reduce/aggregate/fold/apply() // 指定窗口计算函数
[.getSideOutput( ... )] // 根据Tag输出数据(可选)
Windows Assigner
基于时间的窗口
TimeWindow 类来获取窗口的起始时间和终止时间,以及该窗口允许进入的最新时间戳信息等元数据。
(1)滚动窗口:根据固定时间或者大小进行切分,且窗口和窗口之间的元素互不重叠。
TumblingEventTimeWindows 和 TumblingProcessTimeWindows 或者 timeWindow方法。
(2)滑动窗口:在滚动窗口的基础之上,增加了窗口的滑动时间,并且允许数据重叠。窗口的大小,和滑动时间两个参数确定了滑动的策略。当Slide time 小于 Windows size 是便会发生重叠。而当Slide time 大于 Windows 四则时则会发生不连续。
(3)会话窗口:会话窗口(Session Windows)主要是将某段时间内活跃度较高的数据聚合成一个窗口进行计算,窗口的触发条件是Session Gap,是指在规定的时间内如果没有数据活跃接入,则认为窗口结束,然后触发窗口计算。可以通过实现SessionWindowTimeGapExtractor接口,复写extract方法动态定义session gap。
(4)全局窗口:全局窗口将所有相同的Key的数据分配到单个窗口中计算结果,窗口没有起始时间和结束时间,窗口需要借助Trigger来触发计算,如果不对Global Windows 指定Trigger,窗口是不会触发计算的
基于数量的窗口
countWindows()来定义基于数量的窗口。
Windows Function
- 增量聚合函数
- ReduceFunction
- AggregateFunction
- FoldFunction
- 全量窗口函数
- ProcessWindowFunction
Trigger 窗口触发器
EventTimeTrigger:通过对比Wartermark 和窗口的EndTime 确定是否触发窗口,如果Wartermark的时间大于Windows EndTime 则触发计算,否则窗口继续等待。
ProcessTimeTrigger:和EventTimeTrigger类似,只是使用ProcessTime确定。
ContinuousEventTimeTrigger:根据时间间隔周期性触发窗口或者Window的结束时间小于当前EventTime触发窗口计算。
ContinuousProcessingTimeTrigger:同上
CountTrigger:根据接入数据量是否超过设定的阈值确定是否触发窗口计算。
DeltaTrigger:根据接入数据计算出来的Delta指标是否超过指定的Threshold来判断是否触发窗口计算。
PurgingTrigger:可将将任意触发器作为参数转换为Purge类型触发器,计算完成后数据将被清理。
另外,用户可以自定义Trigger。
Evictors 数据剔除器
数据剔除器的主要作用是对进入WindowFunction前后的数据进行剔除处理。
- CountEvictor:保持在窗口中具有固定数量的记录,将超过指定大小的数据在窗口计算前剔除。
- DeltaEvictor:通过定义DeltaFunction 和 指定 threshold,并计算Windows中元素与最新元素之间的Delta大小,如果超过threshold则将当前数据元素剔除。
- TimeEvictor:通过指定时间间隔,将当前窗口中最新元素的时间减去Interval,然后将小于该结果的数据全部剔除,其本质是将具有最新时间的数据选择出来,删除过时数据。
- 用户可自定义Evictor
延迟数据处理
基于Event-Time的窗口处理流式数据,在延迟非常高的情况下,依然会有部分数据未来得及进入应该进的窗口,Flink默认情况下是将这些数据丢弃,而Allowed Lateness机制对迟到的数据进行额外的处理。
对于指定了allowedLateness的窗口,计算过程中的Window的Endtime会加上该时间,作为窗口最后被释放的结束时间(P),如果EventTime超过了P则丢弃数据,如果EventTime未超过P,但是Wartermark已经超过Endtime则触发计算,对于未参与计算的数据,可以通过sideOutputLateData方法打上late-data标签,然后通过getSideOutput方法获取该标签的数据,进行额外处理。
窗口计算
独立窗口计算:针对同一个DataStream进行不同的窗口处理,窗口之间相对独立,输出结果在不同的DataStream中,这时在Flink Runtime 执行环境中,将分为两个Window Operator 在不同的Task中执行,相互之间的元数据无法共享。
连续窗口计算:连续窗口计算表示上游窗口计算的结果是下游窗口计算的输入,窗口算子和算子之间是上下游关联关系,窗口之间的元数据信息能够共享。例如求每个key的最小值中最大的n个key。
Windows 多流合并 Join操作
在Windows Join 过程中所有的Join操作都是Inner-join类型,两个关联的窗口必须是同类型的。
滚动窗口关联(Tumbling Window join)
滑动窗口关联(Sliding Window join)
会话窗口关联(Session Window join)
间隔窗口关联(Interval Join)
作业链和资源组
在Flink作业中,用户可以指定相应的链条,将相关性非常强的转换操作绑定在一起,这样能够让转换过程上下游的Task在同一个Pipeline中执行。
上一篇: flink on yarn模式
推荐阅读
-
HTML5全屏(Fullscreen)API详细介绍
-
javascript 内置对象及常见API详细介绍
-
基于HTML5 FileSystem API的使用介绍
-
js对象Object常用的Api介绍
-
HTML5 window/iframe跨域传递消息 API介绍
-
Web API---part2课程介绍+part1复习
-
HTML5全屏(Fullscreen)API详细介绍
-
Web API---课程介绍 + JavaScript分三个部分
-
Flink实战(六) - Table API & SQL编程
-
idea中flink启动报错org.apache.flink.api.common.ExecutionConfig$GlobalJobParameters