Flink keyBy Partitioning and Window Operations Explained
程序员文章站
2024-01-26 13:44:46
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;   // used only by the commented-out alternative
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;   // used only by the commented-out alternative
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class MySelfSourceTest01 {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Three demo sources; each emits one fixed (id, label) record per second.
        DataStreamSource<Tuple2<String, String>> redisDS = env.addSource(new SourceFunction<Tuple2<String, String>>() {
            @Override
            public void run(SourceContext<Tuple2<String, String>> ctx) throws Exception {
                while (true) {
                    ctx.collect(new Tuple2<>("1", "A"));
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {
                // No-op: this demo source runs until the job is killed.
            }
        });

        DataStreamSource<Tuple2<String, String>> redisDSB = env.addSource(new SourceFunction<Tuple2<String, String>>() {
            @Override
            public void run(SourceContext<Tuple2<String, String>> ctx) throws Exception {
                while (true) {
                    ctx.collect(new Tuple2<>("2", "B"));
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {
            }
        });

        DataStreamSource<Tuple2<String, String>> redisDSC = env.addSource(new SourceFunction<Tuple2<String, String>>() {
            @Override
            public void run(SourceContext<Tuple2<String, String>> ctx) throws Exception {
                while (true) {
                    ctx.collect(new Tuple2<>("3", "C"));
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {
            }
        });

        // Merge the three sources into a single stream.
        DataStream<Tuple2<String, String>> union = redisDS.union(redisDSB).union(redisDSC);

        // Partition by a custom key derived from the id field.
        // Note: "% 1" always yields 0, so every record gets the same key;
        // use a larger modulus to spread records over several keys.
        KeyedStream<Tuple2<String, String>, Integer> tuple2IntegerKeyedStream =
                union.keyBy(new KeySelector<Tuple2<String, String>, Integer>() {
                    @Override
                    public Integer getKey(Tuple2<String, String> value) throws Exception {
                        return Integer.parseInt(value.f0) % 1;
                    }
                });

        // Print the contents of each key's 2-second window.
        // Note: timeWindow(...) is deprecated in newer Flink releases in favor of
        // window(...) with an explicit window assigner.
        tuple2IntegerKeyedStream.timeWindow(Time.seconds(2))
                .apply(new WindowFunction<Tuple2<String, String>, String, Integer, TimeWindow>() {
                    @Override
                    public void apply(Integer integer, TimeWindow window,
                                      Iterable<Tuple2<String, String>> input,
                                      Collector<String> out) throws Exception {
                        StringBuilder stringBuilder = new StringBuilder();
                        input.forEach(t -> stringBuilder.append(t.toString()).append(" "));
                        System.out.println(stringBuilder.toString());
                        System.out.println();
                    }
                });

        // Alternative 1: the same window applied to a stream whose key type is Tuple
        // (as produced by position-based keyBy).
        // .timeWindow(Time.seconds(2)).apply(new WindowFunction<Tuple2<String, String>, Tuple2<String, String>, Tuple, TimeWindow>() {
        //     @Override
        //     public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, String>> input,
        //                       Collector<Tuple2<String, String>> out) throws Exception {
        //         StringBuilder stringBuilder = new StringBuilder();
        //         input.forEach(t -> stringBuilder.append(t.toString()).append(" "));
        //         System.out.println(stringBuilder.toString());
        //         System.out.println();
        //     }
        // });

        // Alternative 2: no keyBy at all; timeWindowAll handles the whole stream in one window.
        // union.timeWindowAll(Time.seconds(2)).apply(new AllWindowFunction<Tuple2<String, String>, String, TimeWindow>() {
        //     @Override
        //     public void apply(TimeWindow window, Iterable<Tuple2<String, String>> values,
        //                       Collector<String> out) throws Exception {
        //         StringBuilder stringBuilder = new StringBuilder();
        //         values.forEach(t -> stringBuilder.append(t.toString()).append(" "));
        //         System.out.println(stringBuilder.toString());
        //         System.out.println();
        //     }
        // });

        try {
            env.execute("a");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
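keyBy partitions the stream by the key you select. If you call timeWindowAll directly, without keyBy, the whole stream is treated as a single partition and the window operator runs with a parallelism of 1. If keyBy uses a custom key, as in the example where the ids are 1, 2, and 3, you can take the id modulo some number inside keyBy to decide which partition each record goes to. Be aware that this can produce skewed partitions: some keys may receive far more records than others. In the example itself, `% 1` maps every record to key 0, so all three sources end up in the same window.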
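The effect of the modulus on the key distribution can be checked without running Flink at all. The following is a minimal plain-Java sketch, assuming the same three ids (1, 2, 3) as the example; the class `KeySkewDemo` and the method `countByKey` are hypothetical names introduced here for illustration and are not part of Flink. It only counts how many records share each key. How Flink then assigns keys to parallel subtasks is a separate hashing step that is not modeled here.

```java
import java.util.Map;
import java.util.TreeMap;

public class KeySkewDemo {

    // Counts how many ids end up under each key when the key is (id % modulus).
    // Hypothetical helper for illustration; it mirrors the getKey logic of the example.
    public static Map<Integer, Integer> countByKey(int[] ids, int modulus) {
        Map<Integer, Integer> counts = new TreeMap<>();
        for (int id : ids) {
            counts.merge(id % modulus, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        int[] ids = {1, 2, 3};
        // Compare the key distribution for several moduli.
        for (int modulus : new int[] {1, 2, 3}) {
            System.out.println("modulus " + modulus + " -> " + countByKey(ids, modulus));
        }
    }
}
```

With a modulus of 1 all three ids collapse into key 0, with 2 the ids 1 and 3 share key 1 while id 2 is alone in key 0, and with 3 each id gets its own key. The middle case is a small instance of the skew described above.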