欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Flink Window Join机制

程序员文章站 2024-03-21 17:11:10
...

Flink  Window Join机制

window join连接两个流的元素,它们共享一个公共key并位于同一个窗口中。可以使用flink window定义这些窗口,并对来自这两个流的元素求值。

然后将两边的元素传递给用户定义的JoinFunction或FlatJoinFunction,用户可以在其中发出满足连接条件的结果。

参考Flink Stream Joining,一般用法概括如下:

stream.join(otherStream)
    .where(<KeySelector>)
    .equalTo(<KeySelector>)
    .window(<WindowAssigner>)
    .apply(<JoinFunction>)

语义注意事项:

  • 创建两个流元素的成对组合的行为类似于内连接,如果来自一个流的元素与另一个流没有相对应要连接的元素,则不会发出该元素。
  • 结合在一起的那些元素将其时间戳设置为位于各自窗口中的最大时间戳。例如,以[5,10]为边界的窗口将产生连接的元素的时间戳为9。

下面,我们将根据一些示例场景,来讲述不同类型的窗口连接的行为。

翻滚窗口Join(Tumbling Window Join)

执行滚动窗口连接(Tumbling Window Join)时,具有公共Key和公共tumbling window的所有元素都以成对组合的形式进行连接,并传递给JoinFunctionFlatJoinFunction。因为这就像一个内连接,在滚动窗口中没有来自另一个流的元素的流的元素不会被输出!

Flink Window Join机制

如图所示,我们定义了一个大小为2毫秒的滚动窗口,其结果为[0,1],[2,3], ...。该图像显示了每个窗口中所有元素的成对组合,这些元素将传递给JoinFunction。注意,在翻滚窗口[6,7]中没有发出任何内容,因为在绿色流中没有元素与橙色元素⑥、⑦连接。

import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

...

val orangeStream: DataStream[Integer] = ...
val greenStream: DataStream[Integer] = ...

orangeStream.join(greenStream)
    .where(elem => /* select key */)
    .equalTo(elem => /* select key */)
    .window(TumblingEventTimeWindows.of(Time.milliseconds(2)))
    .apply { (e1, e2) => e1 + "," + e2 }

滑动窗口Join(Sliding Window Join)

在执行滑动窗口连接(Sliding Window Join)时,具有公共Key和公共滑动窗口(Sliding Window )的所有元素都作为成对组合进行连接,并传递给JoinFunctionFlatJoinFunction。当前滑动窗口中没有来自另一个流的元素的流的元素不会被发出!请注意,有些元素可能会在一个滑动窗口中连接,但不会在另一个窗口中连接!

Flink Window Join机制

在本例中,我们使用的滑动窗口大小为2毫秒,滑动1毫秒,滑动窗口结果[1,0],[0,1],[1,2],[2、3],.... x轴以下是每个滑动窗口的Join结果将被传递给JoinFunction的元素。在这里你还可以看到橙②与绿色③窗口Join(2、3),但不与任何窗口Join[1,2]。

import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

...

val orangeStream: DataStream[Integer] = ...
val greenStream: DataStream[Integer] = ...

orangeStream.join(greenStream)
    .where(elem => /* select key */)
    .equalTo(elem => /* select key */)
    .window(SlidingEventTimeWindows.of(Time.milliseconds(2) /* size */, Time.milliseconds(1) /* slide */))
    .apply { (e1, e2) => e1 + "," + e2 }
    

会话窗口Join(Session Window Join)

在执行会话窗口连接时,具有相同键的所有元素(当“组合”时满足会话条件)都以成对的组合进行连接,并传递给JoinFunctionFlatJoinFunction。再次执行内部连接,因此如果会话窗口只包含来自一个流的元素,则不会发出任何输出!

Flink Window Join机制

在这里,定义一个会话窗口连接,其中每个会话被至少1ms的间隔所分割。有三个会话,在前两个会话中,来自两个流的连接元素被传递给JoinFunction。在第三次会话中绿色流没有元素,所以⑧⑨不会Join。

import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
 
...

val orangeStream: DataStream[Integer] = ...
val greenStream: DataStream[Integer] = ...

orangeStream.join(greenStream)
    .where(elem => /* select key */)
    .equalTo(elem => /* select key */)
    .window(EventTimeSessionWindows.withGap(Time.milliseconds(1)))
    .apply { (e1, e2) => e1 + "," + e2 }

间隔Join(Interval Join)

interval join用一个公共Key连接两个流的元素(将它们称为A & B),其中流B的元素的时间戳具有相对于流A中的元素的时间戳。 这也可以更正式地表示为b.timestamp ∈ [a.timestamp + lowerBound; a.timestamp + upperBound] or a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound

其中a和b是A和B*享一个公钥的元素。下界和上界都可以是负的或正的,只要下界小于或等于上界。interval连接目前只执行内部连接。

当将一对元素传递给ProcessJoinFunction时,它们将给两个元素分配更大的时间戳(可以通过ProcessJoinFunction.Context访问)。
注意:间隔连接目前只支持事件时间。

Flink Window Join机制

在上面的示例中,我们将“橙色”和“绿色”两个流连接起来,它们的下界为-2毫秒,上界为+1毫秒。默认情况下,这些是包含边界的,但是可以通过.lowerboundexclusive(). upperboundexclusive()进行设置。

再用更正式的符号来表示angeElem.ts + lowerBound <= greenElem.ts <= orangeElem.ts + upperBound 如三角形所示。

import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;

...

val orangeStream: DataStream[Integer] = ...
val greenStream: DataStream[Integer] = ...

orangeStream
    .keyBy(elem => /* select key */)
    .intervalJoin(greenStream.keyBy(elem => /* select key */))
    .between(Time.milliseconds(-2), Time.milliseconds(1))
    .process(new ProcessJoinFunction[Integer, Integer, String] {
        override def processElement(left: Integer, right: Integer, ctx: ProcessJoinFunction[Integer, Integer, String]#Context, out: Collector[String]): Unit = {
         out.collect(left + "," + right); 
        }
      });
    });