Preface
This article takes a look at Flink's GlobalWindow.
GlobalWindow
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/windows/GlobalWindow.java
@PublicEvolving
public class GlobalWindow extends Window {

    private static final GlobalWindow INSTANCE = new GlobalWindow();

    private GlobalWindow() { }

    public static GlobalWindow get() {
        return INSTANCE;
    }

    @Override
    public long maxTimestamp() {
        return Long.MAX_VALUE;
    }

    @Override
    public boolean equals(Object o) {
        return this == o || !(o == null || getClass() != o.getClass());
    }

    @Override
    public int hashCode() {
        return 0;
    }

    @Override
    public String toString() {
        return "GlobalWindow";
    }

    /**
     * A {@link TypeSerializer} for {@link GlobalWindow}.
     */
    public static class Serializer extends TypeSerializerSingleton<GlobalWindow> {
        private static final long serialVersionUID = 1L;

        @Override
        public boolean isImmutableType() {
            return true;
        }

        @Override
        public GlobalWindow createInstance() {
            return GlobalWindow.INSTANCE;
        }

        @Override
        public GlobalWindow copy(GlobalWindow from) {
            return from;
        }

        @Override
        public GlobalWindow copy(GlobalWindow from, GlobalWindow reuse) {
            return from;
        }

        @Override
        public int getLength() {
            return 0;
        }

        @Override
        public void serialize(GlobalWindow record, DataOutputView target) throws IOException {
            target.writeByte(0);
        }

        @Override
        public GlobalWindow deserialize(DataInputView source) throws IOException {
            source.readByte();
            return GlobalWindow.INSTANCE;
        }

        @Override
        public GlobalWindow deserialize(GlobalWindow reuse,
                DataInputView source) throws IOException {
            source.readByte();
            return GlobalWindow.INSTANCE;
        }

        @Override
        public void copy(DataInputView source, DataOutputView target) throws IOException {
            source.readByte();
            target.writeByte(0);
        }

        @Override
        public boolean canEqual(Object obj) {
            return obj instanceof Serializer;
        }
    }
}
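- GlobalWindow extends Window. Its maxTimestamp method differs from TimeWindow's: TimeWindow has start and end fields and its maxTimestamp method returns end - 1, whereas GlobalWindow's maxTimestamp method returns Long.MAX_VALUE. GlobalWindow is a singleton (private constructor, obtained through get()) and defines its own Serializer, which writes and reads a single placeholder byte.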
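The contrast described above can be sketched in plain Java without any Flink dependency. This is only a minimal model: GlobalWindowSketch, SketchTimeWindow and SketchGlobalWindow are hypothetical names invented for illustration, not Flink classes, and the byte-array round trip merely imitates what the Serializer shown above does with DataOutputView and DataInputView.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;

// Stdlib-only model; every class name here is hypothetical and is NOT a Flink class.
class GlobalWindowSketch {

    /** Models a time-bounded window covering [start, end). */
    static final class SketchTimeWindow {
        final long start;
        final long end;

        SketchTimeWindow(long start, long end) {
            this.start = start;
            this.end = end;
        }

        // The end bound is exclusive, so the last timestamp inside the window is end - 1.
        long maxTimestamp() {
            return end - 1;
        }
    }

    /** Models the single, unbounded global window. */
    static final class SketchGlobalWindow {
        private static final SketchGlobalWindow INSTANCE = new SketchGlobalWindow();

        private SketchGlobalWindow() { }

        static SketchGlobalWindow get() {
            return INSTANCE;
        }

        // No upper bound: the window never "ends" on its own.
        long maxTimestamp() {
            return Long.MAX_VALUE;
        }
    }

    // The window carries no state, so one placeholder byte is enough to represent it.
    static byte[] serialize(SketchGlobalWindow window) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(0);
        return out.toByteArray();
    }

    // Consume the placeholder byte and hand back the shared instance.
    static SketchGlobalWindow deserialize(byte[] data) {
        ByteArrayInputStream in = new ByteArrayInputStream(data);
        in.read();
        return SketchGlobalWindow.get();
    }

    public static void main(String[] args) {
        System.out.println(new SketchTimeWindow(0L, 1000L).maxTimestamp());
        System.out.println(SketchGlobalWindow.get().maxTimestamp());
        System.out.println(deserialize(serialize(SketchGlobalWindow.get())) == SketchGlobalWindow.get());
    }
}
```

Because deserialization always returns the shared instance, identity comparison is preserved across a round trip, which is consistent with GlobalWindow's equals treating every GlobalWindow as equal and hashCode returning a constant.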
GlobalWindows
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
@PublicEvolving
public class GlobalWindows extends WindowAssigner<Object, GlobalWindow> {
    private static final long serialVersionUID = 1L;

    private GlobalWindows() {}

    @Override
    public Collection<GlobalWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
        return Collections.singletonList(GlobalWindow.get());
    }

    @Override
    public Trigger<Object, GlobalWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return new NeverTrigger();
    }

    @Override
    public String toString() {
        return "GlobalWindows()";
    }

    /**
     * Creates a new {@code GlobalWindows} {@link WindowAssigner} that assigns
     * all elements to the same {@link GlobalWindow}.
     *
     * @return The global window policy.
     */
    public static GlobalWindows create() {
        return new GlobalWindows();
    }

    /**
     * A trigger that never fires, as default Trigger for GlobalWindows.
     */
    @Internal
    public static class NeverTrigger extends Trigger<Object, GlobalWindow> {
        private static final long serialVersionUID = 1L;

        @Override
        public TriggerResult onElement(Object element, long timestamp, GlobalWindow window, TriggerContext ctx) {
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) {
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) {
            return TriggerResult.CONTINUE;
        }

        @Override
        public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {}

        @Override
        public void onMerge(GlobalWindow window, OnMergeContext ctx) {
        }
    }

    @Override
    public TypeSerializer<GlobalWindow> getWindowSerializer(ExecutionConfig executionConfig) {
        return new GlobalWindow.Serializer();
    }

    @Override
    public boolean isEventTime() {
        return false;
    }
}
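- GlobalWindows extends WindowAssigner<Object, GlobalWindow>: the element type is Object and the window type is GlobalWindow
- assignWindows returns a singleton list containing the one GlobalWindow instance, regardless of the element or its timestamp; getDefaultTrigger returns a NeverTrigger; getWindowSerializer returns a GlobalWindow.Serializer; isEventTime returns false
- NeverTrigger extends Trigger; its onElement, onEventTime and onProcessingTime methods all return TriggerResult.CONTINUE, and its clear and onMerge methods are empty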
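To make the consequence of a never-firing default trigger concrete, here is a small stdlib-only sketch. It is not Flink code: NeverTriggerSketch, SketchTrigger, Never, EveryN and run are hypothetical names, and the operator is reduced to a single in-memory buffer standing in for the one global window. The EveryN trigger is loosely modelled on a count-based trigger that fires without purging, so each firing sees everything accumulated so far.

```java
import java.util.ArrayList;
import java.util.List;

// Stdlib-only model of trigger behaviour; all names are hypothetical, not Flink classes.
class NeverTriggerSketch {

    enum Result { CONTINUE, FIRE }

    /** Simplified trigger: only the per-element callback is modelled. */
    interface SketchTrigger {
        Result onElement(int element);
    }

    /** Mirrors the default behaviour described above: always CONTINUE. */
    static final class Never implements SketchTrigger {
        @Override
        public Result onElement(int element) {
            return Result.CONTINUE;
        }
    }

    /** Fires after every n elements and then resets its own counter. */
    static final class EveryN implements SketchTrigger {
        private final int n;
        private int seen;

        EveryN(int n) {
            this.n = n;
        }

        @Override
        public Result onElement(int element) {
            seen++;
            if (seen >= n) {
                seen = 0;
                return Result.FIRE;
            }
            return Result.CONTINUE;
        }
    }

    /**
     * Adds every element to the single global buffer and emits the sum of the
     * whole buffer each time the trigger fires. The buffer is never purged.
     */
    static List<Integer> run(int[] elements, SketchTrigger trigger) {
        List<Integer> buffer = new ArrayList<>();
        List<Integer> emitted = new ArrayList<>();
        for (int element : elements) {
            buffer.add(element);
            if (trigger.onElement(element) == Result.FIRE) {
                int sum = 0;
                for (int value : buffer) {
                    sum += value;
                }
                emitted.add(sum);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        int[] input = {1, 2, 3, 4, 5, 6};
        System.out.println(run(input, new Never()));   // nothing is ever emitted
        System.out.println(run(input, new EveryN(3))); // one result per three elements
    }
}
```

With the Never trigger the buffer only grows and no result is produced, which is why a global window is normally paired with an explicit trigger. In Flink itself that trigger is attached by calling trigger(...) on the windowed stream, for example with CountTrigger.of(n).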
Summary
- GlobalWindows extends WindowAssigner with element type Object and window type GlobalWindow. GlobalWindow extends Window; unlike TimeWindow, which has start and end fields and whose maxTimestamp method returns end - 1, GlobalWindow's maxTimestamp method returns Long.MAX_VALUE. GlobalWindow also defines its own Serializer
- GlobalWindows' assignWindows method always returns the single GlobalWindow instance; getDefaultTrigger returns a NeverTrigger; getWindowSerializer returns a GlobalWindow.Serializer; isEventTime returns false
- NeverTrigger extends Trigger; its onElement, onEventTime and onProcessingTime methods all return TriggerResult.CONTINUE, which means the window never fires on its own. To get any output, set a custom trigger when defining the window operation so that it overrides GlobalWindows' default NeverTrigger