初识Flink之Windows下Flink流处理(Streaming)案例开发
程序员文章站
2022-06-16 17:26:49
...
本次示例在Windows系统下,使用idea2019.3进行开发,jdk版本1.8,flink版本1.6.1.
- 新建一个maven项目,没有flink-quickstart-java模板,可以选择添加:Add Archetype 填写如下内容:
- 填写项目对应的信息。
- 创建StreamingJob内容如下:
-
public static void main(String[] args) throws Exception { //获取需要的端口号分析: // 通过Socket模拟产生单词,使用Flink程序对数据进行汇总计算。 int port; try { ParameterTool parameterTool = ParameterTool.fromArgs(args); port = parameterTool.getInt("port"); } catch (Exception e) { System.err.println("No port set. use default port 9000--Java"); port = 9000; } //获取Flink的运行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //主机名不能填写错误,这里使用localhost或者通过ipconfig/all 查到的主机名 String hostname = "localhost"; String delimiter = "\n"; //连接Socket获取输入的数据 DataStreamSource<String> text = env.socketTextStream(hostname, port, delimiter); // a a c // a 1 // a 1 // c 1 DataStream<WordWithCount> windowCounts = text.flatMap(new FlatMapFunction<String, WordWithCount>() { @Override public void flatMap(String value, Collector<WordWithCount> out) throws Exception { String[] splits = value.split("\\\\s"); for (String word : splits) { out.collect(new WordWithCount(word, 1L)); } } }).keyBy("word") //指定时间窗口大小为2s,指定时间间隔为2s .timeWindow(Time.seconds(2), Time.seconds(2)) //在这里使用sum或者reduce都可以 .sum("count"); // .reduce(new ReduceFunction<WordWithCount>() { // @Override // public WordWithCount reduce(WordWithCount a, WordWithCount b) { // return new WordWithCount(a.word, a.count + b.count); // } // }); //把数据打印到控制台并且设置并行度 windowCounts.print().setParallelism(1); //这一行代码一定要实现,否则程序不执行 env.execute("Socket window count"); } public static class WordWithCount { public String word; public long count; public WordWithCount() { } public WordWithCount(String word, long count) { this.word = word; this.count = count; } @Override public String toString() { return "WordWithCount{" + "word='" + word + '\'' + ", count=" + count + '}'; } }
- 在Windows系统下需要下载一个nc,打开cmd窗口执行nc -l -p 9000
- nc下载地址:https://pan.baidu.com/s/1YvG49wL8JmMMCEO_c6-uww
- 提取码:l03b
- 解压后nc.exe 放入c盘users 对应的用户目录下,打开cmd即可执行:
- 在idea 执行程序:结果如下:
- 项目地址:https://gitee.com/linghongkang90/flink/tree/master/flinkDemo
下一篇: 第1节、一个萝卜一个坑——计数排序