Flink keyBy Partitioning and Window Operations Explained

程序员文章站 2024-01-26 13:44:46

An analysis of Flink keyBy partitioning and window operations, based on the following example.

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.log4j.Logger;
public class MySelfSourceTest01 {
    static Logger logger = Logger.getLogger(MySelfSourceTest01.class);
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
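        // Source A: emits ("1", "A") once per second until the job is cancelled.
        DataStreamSource<Tuple2<String, String>> redisDS = env.addSource(new SourceFunction<Tuple2<String, String>>() {
            private volatile boolean running = true;

            @Override
            public void run(SourceContext<Tuple2<String, String>> ctx) throws Exception {
                while (running) {
                    ctx.collect(new Tuple2<>("1", "A"));
                    Thread.sleep(1000);
                }
            }
            @Override
            public void cancel() {
                // Let the run() loop exit when Flink cancels the source.
                running = false;
            }
        });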
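        // Source B: emits ("2", "B") once per second until the job is cancelled.
        DataStreamSource<Tuple2<String, String>> redisDSB = env.addSource(new SourceFunction<Tuple2<String, String>>() {
            private volatile boolean running = true;

            @Override
            public void run(SourceContext<Tuple2<String, String>> ctx) throws Exception {
                while (running) {
                    ctx.collect(new Tuple2<>("2", "B"));
                    Thread.sleep(1000);
                }
            }
            @Override
            public void cancel() {
                running = false;
            }
        });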
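        // Source C: emits ("3", "C") once per second until the job is cancelled.
        DataStreamSource<Tuple2<String, String>> redisDSC = env.addSource(new SourceFunction<Tuple2<String, String>>() {
            private volatile boolean running = true;

            @Override
            public void run(SourceContext<Tuple2<String, String>> ctx) throws Exception {
                while (running) {
                    ctx.collect(new Tuple2<>("3", "C"));
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {
                running = false;
            }
        });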

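        // Merge the three sources into one stream, then key it by f0 modulo a constant.
        DataStream<Tuple2<String, String>> union = redisDS.union(redisDSB).union(redisDSC);
        KeyedStream<Tuple2<String, String>, Integer> tuple2IntegerKeyedStream = union.keyBy(new KeySelector<Tuple2<String, String>, Integer>() {
            @Override
            public Integer getKey(Tuple2<String, String> value) throws Exception {
                // Note: x % 1 is always 0, so every record gets the same key here.
                // Use a larger modulus (for example % 2) to spread records over several keys.
                return Integer.parseInt(value.f0) % 1;
            }
        });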
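        // For each key, gather the records of every 2-second window and print them on one line.
        tuple2IntegerKeyedStream.timeWindow(Time.seconds(2)).apply(new WindowFunction<Tuple2<String, String>, String, Integer, TimeWindow>() {
            @Override
            public void apply(Integer integer, TimeWindow window, Iterable<Tuple2<String, String>> input, Collector<String> out) throws Exception {
                StringBuffer stringBuffer = new StringBuffer();
                input.forEach(t -> {
                    stringBuffer.append(t.toString()).append("  ");
                });
                // The result is only printed; nothing is emitted downstream through out.
                System.out.println(stringBuffer.toString());
                System.out.println();
            }
        });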


// Alternative kept for comparison: a keyed window whose key type is Tuple.
//                .timeWindow(Time.seconds(2)).apply(new WindowFunction<Tuple2<String, String>, Tuple2<String, String>, Tuple, TimeWindow>() {
//            @Override
//            public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, String>> input, Collector<Tuple2<String, String>> out) throws Exception {
//                StringBuffer stringBuffer = new StringBuffer();
//                input.forEach(t -> {
//                    stringBuffer.append(t.toString()).append("  ");
//                });
//                System.out.println(stringBuffer.toString());
//                System.out.println();
//            }
//        });
// Alternative kept for comparison: a non-keyed window over the whole stream.
//        union.timeWindowAll(Time.seconds(2)).apply(new AllWindowFunction<Tuple2<String, String>, String, TimeWindow>() {
//            @Override
//            public void apply(TimeWindow window, Iterable<Tuple2<String, String>> values, Collector<String> out) throws Exception {
//                StringBuffer stringBuffer = new StringBuffer();
//                values.forEach(t -> {
//                    stringBuffer.append(t.toString()).append("  ");
//                });
//                System.out.println(stringBuffer.toString());
//                System.out.println();
//            }
//        });


        try {
            env.execute("a");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
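
The listing above groups records into 2-second windows with timeWindow(Time.seconds(2)). As a rough illustration of what a tumbling window does, the sketch below buckets timestamps into fixed-size, non-overlapping intervals using plain Java. It does not use Flink; the class name TumblingWindowSketch, its methods, and the sample timestamps are hypothetical and exist only for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative only: mimics how a tumbling window buckets timestamps.
// This is not Flink code; all names here are made up for the example.
class TumblingWindowSketch {

    /** Start (in ms) of the tumbling window, without offset, that contains the timestamp. */
    static long windowStart(long timestampMillis, long windowSizeMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, windowSizeMillis);
    }

    /** Groups values by the start of the window their timestamp falls into. */
    static Map<Long, List<String>> assign(long[] timestamps, String[] values, long windowSizeMillis) {
        Map<Long, List<String>> windows = new TreeMap<>();
        for (int i = 0; i < timestamps.length; i++) {
            long start = windowStart(timestamps[i], windowSizeMillis);
            windows.computeIfAbsent(start, k -> new ArrayList<>()).add(values[i]);
        }
        return windows;
    }

    public static void main(String[] args) {
        // One record per second, as the sources in the listing emit them (timestamps are assumed).
        long[] timestamps = {0, 1000, 2000, 3000, 4000};
        String[] values = {"(1,A)", "(1,A)", "(1,A)", "(1,A)", "(1,A)"};
        long size = 2000; // 2-second window
        assign(timestamps, values, size).forEach((start, items) ->
                System.out.println("[" + start + ", " + (start + size) + ") -> " + items));
    }
}
```

With one record per second from a single source, each 2-second window in this sketch holds about two records; with three sources sharing one key, a window would hold about six.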

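keyBy partitions the stream by the selected key. If you use timeWindowAll directly, the stream is not partitioned at all, so the window runs with a parallelism of 1. If keyBy uses a custom key, as in the example where the ids are 1, 2, and 3, you can derive the key inside keyBy by taking the id modulo some number. Be aware that this can produce skewed partitions, where some keys receive far more records than others; with % 1, as written in the example, every record ends up under key 0.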
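
To make the skew concrete, here is a minimal sketch in plain Java that applies the same expression as the KeySelector in the listing (Integer.parseInt(f0) % n) to the ids 1, 2, and 3 and counts how many records land under each derived key. The class ModuloKeySketch and its methods are hypothetical helpers, not part of Flink. In a real job Flink additionally hashes each key onto a parallel subtask, so distinct keys may still share a subtask; this sketch only looks at the key distribution itself.

```java
import java.util.Map;
import java.util.TreeMap;

// Illustrative only: shows how the choice of modulus affects key distribution.
// This is not Flink code; all names here are made up for the example.
class ModuloKeySketch {

    /** Same expression as the KeySelector in the listing, with a configurable modulus. */
    static int keyOf(String f0, int modulus) {
        return Integer.parseInt(f0) % modulus;
    }

    /** Counts how many records end up under each derived key. */
    static Map<Integer, Integer> countPerKey(String[] ids, int modulus) {
        Map<Integer, Integer> counts = new TreeMap<>();
        for (String id : ids) {
            counts.merge(keyOf(id, modulus), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        String[] ids = {"1", "2", "3"};
        System.out.println("% 1 -> " + countPerKey(ids, 1)); // all ids collapse onto key 0
        System.out.println("% 2 -> " + countPerKey(ids, 2)); // ids 1 and 3 share key 1
        System.out.println("% 3 -> " + countPerKey(ids, 3)); // one id per key
    }
}
```

With modulus 2, key 1 receives twice as many records as key 0, which is the kind of imbalance the paragraph above warns about.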