A look at Flink's Global Window

程序员文章站 2024-03-19 22:32:10

This post takes a look at Flink's Global Window.

GlobalWindow

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/windows/GlobalWindow.java

@PublicEvolving
public class GlobalWindow extends Window {

	private static final GlobalWindow INSTANCE = new GlobalWindow();

	private GlobalWindow() { }

	public static GlobalWindow get() {
		return INSTANCE;
	}

	@Override
	public long maxTimestamp() {
		return Long.MAX_VALUE;
	}

	@Override
	public boolean equals(Object o) {
		return this == o || !(o == null || getClass() != o.getClass());
	}

	@Override
	public int hashCode() {
		return 0;
	}

	@Override
	public String toString() {
		return "GlobalWindow";
	}

	/**
	 * A {@link TypeSerializer} for {@link GlobalWindow}.
	 */
	public static class Serializer extends TypeSerializerSingleton<GlobalWindow> {
		private static final long serialVersionUID = 1L;

		@Override
		public boolean isImmutableType() {
			return true;
		}

		@Override
		public GlobalWindow createInstance() {
			return GlobalWindow.INSTANCE;
		}

		@Override
		public GlobalWindow copy(GlobalWindow from) {
			return from;
		}

		@Override
		public GlobalWindow copy(GlobalWindow from, GlobalWindow reuse) {
			return from;
		}

		@Override
		public int getLength() {
			return 0;
		}

		@Override
		public void serialize(GlobalWindow record, DataOutputView target) throws IOException {
			target.writeByte(0);
		}

		@Override
		public GlobalWindow deserialize(DataInputView source) throws IOException {
			source.readByte();
			return GlobalWindow.INSTANCE;
		}

		@Override
		public GlobalWindow deserialize(GlobalWindow reuse,
				DataInputView source) throws IOException {
			source.readByte();
			return GlobalWindow.INSTANCE;
		}

		@Override
		public void copy(DataInputView source, DataOutputView target) throws IOException {
			source.readByte();
			target.writeByte(0);
		}

		@Override
		public boolean canEqual(Object obj) {
			return obj instanceof Serializer;
		}
	}
}
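  • GlobalWindow extends Window. Its maxTimestamp method differs from TimeWindow's: TimeWindow has start and end fields and its maxTimestamp returns end - 1, whereas GlobalWindow's maxTimestamp returns Long.MAX_VALUE. GlobalWindow also defines its own Serializer, which writes a single placeholder byte and always deserializes to the singleton INSTANCE.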
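
To make the maxTimestamp difference concrete, here is a minimal sketch in plain Java with no Flink dependency. SimpleTimeWindow, SimpleGlobalWindow and isComplete are hypothetical stand-ins written for this illustration and are not Flink classes; the only assumption carried over from Flink is that an event-time trigger treats a window as complete once the watermark reaches window.maxTimestamp().

```java
public class MaxTimestampSketch {

    // Hypothetical stand-in for TimeWindow: covers the half-open range [start, end).
    static final class SimpleTimeWindow {
        final long start;
        final long end;

        SimpleTimeWindow(long start, long end) {
            this.start = start;
            this.end = end;
        }

        // The end bound is exclusive, so the last timestamp inside the window is end - 1.
        long maxTimestamp() {
            return end - 1;
        }
    }

    // Hypothetical stand-in for GlobalWindow: a singleton with no time bounds.
    static final class SimpleGlobalWindow {
        static final SimpleGlobalWindow INSTANCE = new SimpleGlobalWindow();

        private SimpleGlobalWindow() { }

        long maxTimestamp() {
            return Long.MAX_VALUE;
        }
    }

    // True once the watermark has reached the window's last timestamp.
    static boolean isComplete(long maxTimestamp, long watermark) {
        return watermark >= maxTimestamp;
    }

    public static void main(String[] args) {
        SimpleTimeWindow tw = new SimpleTimeWindow(0L, 5000L);
        System.out.println(tw.maxTimestamp());                       // 4999
        System.out.println(isComplete(tw.maxTimestamp(), 4999L));    // true
        System.out.println(
            isComplete(SimpleGlobalWindow.INSTANCE.maxTimestamp(), 1_000_000L)); // false
    }
}
```

A time window is therefore considered complete as soon as the watermark catches up with end - 1, while a watermark of 1,000,000 (or any other value below Long.MAX_VALUE) never completes the global window.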

GlobalWindows

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java

@PublicEvolving
public class GlobalWindows extends WindowAssigner<Object, GlobalWindow> {
	private static final long serialVersionUID = 1L;

	private GlobalWindows() {}

	@Override
	public Collection<GlobalWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
		return Collections.singletonList(GlobalWindow.get());
	}

	@Override
	public Trigger<Object, GlobalWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
		return new NeverTrigger();
	}

	@Override
	public String toString() {
		return "GlobalWindows()";
	}

	/**
	 * Creates a new {@code GlobalWindows} {@link WindowAssigner} that assigns
	 * all elements to the same {@link GlobalWindow}.
	 *
	 * @return The global window policy.
	 */
	public static GlobalWindows create() {
		return new GlobalWindows();
	}

	/**
	 * A trigger that never fires, as default Trigger for GlobalWindows.
	 */
	@Internal
	public static class NeverTrigger extends Trigger<Object, GlobalWindow> {
		private static final long serialVersionUID = 1L;

		@Override
		public TriggerResult onElement(Object element, long timestamp, GlobalWindow window, TriggerContext ctx) {
			return TriggerResult.CONTINUE;
		}

		@Override
		public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) {
			return TriggerResult.CONTINUE;
		}

		@Override
		public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) {
			return TriggerResult.CONTINUE;
		}

		@Override
		public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {}

		@Override
		public void onMerge(GlobalWindow window, OnMergeContext ctx) {
		}
	}

	@Override
	public TypeSerializer<GlobalWindow> getWindowSerializer(ExecutionConfig executionConfig) {
		return new GlobalWindow.Serializer();
	}

	@Override
	public boolean isEventTime() {
		return false;
	}
}
  • GlobalWindows extends WindowAssigner<Object, GlobalWindow>: the element type is Object and the window type is GlobalWindow
  • assignWindows returns a singleton list containing GlobalWindow.get(); getDefaultTrigger returns a NeverTrigger; getWindowSerializer returns a new GlobalWindow.Serializer(); isEventTime returns false
  • NeverTrigger extends Trigger; its onElement, onEventTime and onProcessingTime methods all return TriggerResult.CONTINUE
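
The assignment behaviour described above can be sketched in plain Java, again without Flink on the classpath. SimpleGlobalWindow, assignWindows and countDistinctWindows are hypothetical names for this illustration; they only mirror the singleton, the equals/hashCode pair and the singleton-list return value shown in the source listings.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

public class GlobalAssignerSketch {

    // Hypothetical stand-in for GlobalWindow: every instance is equal and hashes to 0.
    static final class SimpleGlobalWindow {
        private static final SimpleGlobalWindow INSTANCE = new SimpleGlobalWindow();

        private SimpleGlobalWindow() { }

        static SimpleGlobalWindow get() {
            return INSTANCE;
        }

        @Override
        public boolean equals(Object o) {
            return this == o || (o != null && getClass() == o.getClass());
        }

        @Override
        public int hashCode() {
            return 0;
        }
    }

    // Mirrors GlobalWindows.assignWindows: the element and its timestamp are ignored.
    static Collection<SimpleGlobalWindow> assignWindows(Object element, long timestamp) {
        return Collections.singletonList(SimpleGlobalWindow.get());
    }

    // Counts how many distinct windows a batch of elements is assigned to.
    static int countDistinctWindows(Object[] elements, long[] timestamps) {
        Set<SimpleGlobalWindow> seen = new HashSet<>();
        for (int i = 0; i < elements.length; i++) {
            seen.addAll(assignWindows(elements[i], timestamps[i]));
        }
        return seen.size();
    }

    public static void main(String[] args) {
        Object[] elements = {"a", 42, 3.14};
        long[] timestamps = {10L, 2000L, 999_999L};
        System.out.println(countDistinctWindows(elements, timestamps)); // 1
    }
}
```

Whatever the element type or timestamp, every element lands in the same window, so on a keyed stream the only grouping left is the key itself.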

Summary

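  • GlobalWindows extends WindowAssigner<Object, GlobalWindow>: the element type is Object and the window type is GlobalWindow. GlobalWindow extends Window, and its maxTimestamp method differs from TimeWindow's: TimeWindow has start and end fields and its maxTimestamp returns end - 1, whereas GlobalWindow's maxTimestamp returns Long.MAX_VALUE. GlobalWindow also defines its own Serializer
  • GlobalWindows' assignWindows returns a singleton list containing GlobalWindow.get(); getDefaultTrigger returns a NeverTrigger; getWindowSerializer returns a new GlobalWindow.Serializer(); isEventTime returns false
  • NeverTrigger extends Trigger; its onElement, onEventTime and onProcessingTime methods all return TriggerResult.CONTINUE, so it never fires. To get any output from a global window, set a custom trigger when defining the window operation; it replaces GlobalWindows' default NeverTrigger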
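
The effect of replacing the default trigger can be simulated in plain Java. The sketch below is not Flink code: Result, SimpleTrigger, Never, CountAndPurge and run are hypothetical names, and the loop is a much simplified model of a window operator. In Flink itself, the comparable setup is a GlobalWindows assigner combined with PurgingTrigger.of(CountTrigger.of(n)), which is how KeyedStream.countWindow(n) is built.

```java
import java.util.ArrayList;
import java.util.List;

public class GlobalWindowTriggerSketch {

    // Reduced version of TriggerResult: only the two outcomes this sketch needs.
    enum Result { CONTINUE, FIRE_AND_PURGE }

    // Hypothetical, element-only trigger interface (no timers, no state backend).
    interface SimpleTrigger {
        Result onElement(int element);
    }

    // Models NeverTrigger: always CONTINUE.
    static final class Never implements SimpleTrigger {
        @Override
        public Result onElement(int element) {
            return Result.CONTINUE;
        }
    }

    // Models a count trigger that fires and purges every maxCount elements.
    static final class CountAndPurge implements SimpleTrigger {
        private final int maxCount;
        private int seen;

        CountAndPurge(int maxCount) {
            this.maxCount = maxCount;
        }

        @Override
        public Result onElement(int element) {
            seen++;
            if (seen >= maxCount) {
                seen = 0;
                return Result.FIRE_AND_PURGE;
            }
            return Result.CONTINUE;
        }
    }

    // Buffers every element in one global window and emits the buffer's sum on each fire.
    static List<Integer> run(int[] elements, SimpleTrigger trigger) {
        List<Integer> buffer = new ArrayList<>();
        List<Integer> emitted = new ArrayList<>();
        for (int e : elements) {
            buffer.add(e);
            if (trigger.onElement(e) == Result.FIRE_AND_PURGE) {
                int sum = 0;
                for (int v : buffer) {
                    sum += v;
                }
                emitted.add(sum);
                buffer.clear();
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        int[] input = {1, 2, 3, 4, 5, 6, 7};
        System.out.println(run(input, new Never()));          // []
        System.out.println(run(input, new CountAndPurge(3))); // [6, 15]
    }
}
```

With the never-firing trigger the seven elements stay buffered and nothing is emitted; with the count trigger two results come out and the seventh element waits in the window for two more arrivals.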
