Flink WordCount implementations (Java and Scala)
WordCount in Java
package com.flink.Java;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
/**
* Created by Shi shuai RollerQing on 2019/12/16 15:44
* WordCount implemented with window operations.
* Requirement: every 1 second, aggregate the data received in the last 2 seconds.
*/
public class WordCount {
public static void main(String[] args) throws Exception {
// Read the port of the socket data source from the command-line arguments
int port;
try {
ParameterTool parameterTool = ParameterTool.fromArgs(args);
port = parameterTool.getInt("port");
} catch (Exception e) {
System.err.println("No port set. Using default port 6666");
port = 6666;
}
String hostname = "hadoop01";
// Initialize the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Read data from the socket
DataStreamSource<String> data = env.socketTextStream(hostname, port);
// Start the computation
// Emit one (word, 1) tuple per word
SingleOutputStreamOperator<WordWithCount> pairWords = data.flatMap(
new FlatMapFunction<String, WordWithCount>() {
@Override
public void flatMap(String s, Collector<WordWithCount> out) throws Exception {
String[] splits = s.split(" ");
for (String word : splits) {
out.collect(new WordWithCount(word, 1L));
}
}
}
);
// Group the tuples by key
KeyedStream<WordWithCount, Tuple> grouped = pairWords.keyBy("word");
// Apply the window operation
// It takes two important parameters: the window length and the slide interval
WindowedStream<WordWithCount, Tuple, TimeWindow> window = grouped.timeWindow(Time.seconds(2), Time.seconds(1));
SingleOutputStreamOperator<WordWithCount> counts = window.sum("count");
// window.reduce(new ReduceFunction<WordWithCount>() {
// @Override
// public WordWithCount reduce(WordWithCount value1, WordWithCount value2) throws Exception {
// return new WordWithCount(value1.word, value1.count + value2.count);
// }
// });
// Print the result
counts.print().setParallelism(1);
env.execute("WordCount");
}
public static class WordWithCount {
public String word;
public long count;
public WordWithCount() {
}
public WordWithCount(String word, long count) {
this.word = word;
this.count = count;
}
@Override
public String toString() {
return "WordWithCount{" +
"word='" + word + '\'' +
", count=" + count +
'}';
}
}
}
On hadoop01, start a socket server with: nc -lk 6666
When running from IDEA no arguments are passed, so the catch block falls back to port 6666.
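For example, assuming the line hello flink hello is typed into nc within a single window, the job would print records in the format defined by WordWithCount.toString(). Because the 2-second windows slide every 1 second and overlap, each record can fall into up to two windows, so the exact set of printed lines depends on timing; one window's output would look like:
WordWithCount{word='hello', count=2}
WordWithCount{word='flink', count=1}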
WordCount in Scala
Streaming WordCount implementation
package com.flink.demo01
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
/**
* Created by Shi shuai RollerQing on 2019/12/16 19:03
* Streaming WordCount implemented in Scala
*
* WordCount using window operations
*/
object WordCount_Scala {
def main(args: Array[String]): Unit = {
// Read the NetCat port from the command-line arguments
val port: Int = try {
ParameterTool.fromArgs(args).getInt("port")
} catch {
case e: Exception =>
System.err.println("No port set, using default port 6666")
6666
}
// Get the execution environment (initialize the context)
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Read data from the socket
val data = env.socketTextStream("hadoop01", port)
// This import is required: it brings in the implicits (e.g. TypeInformation) the Scala API needs
import org.apache.flink.api.scala._
// Parse the data and compute according to the requirement
val words = data.flatMap(_.split("\\s+")) // split each line into words
val tups = words.map(w => WordWithCount(w, 1)) // turn each word into a (word, count) pair
val grouped = tups.keyBy("word") // group by the word field
// val grouped = tups.keyBy(0) // alternative: group by field position
val window = grouped.timeWindow(Time.seconds(2), Time.seconds(2)) // 2-second window sliding every 2 seconds, i.e. effectively tumbling
// val res = window.sum("count") // alternative: aggregate with sum
val res = window.reduce((a, b) => WordWithCount(a.word, a.count + b.count))
// Print the result
res.print.setParallelism(1)
// Start execution
env.execute("scala wordCount")
}
case class WordWithCount(word: String, count: Int)
}
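As the commented-out keyBy(0) above hints, keyBy accepts either a field name or a field position. A third option in the Scala API, and the one that keeps full compile-time type safety, is a key-selector function. A minimal sketch of that variant (only the keyBy line changes, everything else stays as above):
val grouped = tups.keyBy(_.word) // key-selector function: the key type of the KeyedStream is String instead of Tuple
With a key selector, typos in the field name fail at compile time rather than at job submission.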
Batch WordCount implementation
package com.flink.demo01
import org.apache.flink.api.java.operators.DataSink
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
/**
* Created by Shi shuai RollerQing on 2019/12/16 19:29
*/
object WordCountB_Scala {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
//get input data
val text: DataSet[String] = env.readTextFile("C:\\Users\\HP\\IdeaProjects\\sparkCore\\data\\test.txt")
import org.apache.flink.api.scala._
val counts = text.flatMap(_.toLowerCase.split(" ").filter(_.nonEmpty))
.map((_, 1))
.groupBy(0)
.sum(1)
counts.collect().foreach(println)
// counts.setParallelism(1).writeAsCsv("C:\\Users\\HP\\IdeaProjects\\sparkCore\\data\\csvwc", "\n", "\t")
// val write: DataSink[(String, Int)] = counts.setParallelism(1).writeAsCsv("C:\\Users\\HP\\IdeaProjects\\sparkCore\\data\\csvwc", "\n", "\t")
}
}
Why does running counts.setParallelism(1).writeAsCsv("C:\\Users\\HP\\IdeaProjects\\sparkCore\\data\\csvwc", "\n", "\t") do nothing, i.e. why is no output file produced?
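Because in the DataSet API, writeAsCsv is a lazy sink: it only declares the output. Eager operations such as print() and collect() trigger execution themselves, but a sink defined with writeAsCsv needs an explicit env.execute() call afterwards; without it no job is submitted and no file is written. A minimal sketch of the fix (same path and delimiters as above, arbitrary job name):
counts.setParallelism(1).writeAsCsv("C:\\Users\\HP\\IdeaProjects\\sparkCore\\data\\csvwc", "\n", "\t")
env.execute("batch wordCount") // triggers the lazily-defined CSV sink
Note that even if collect() has already triggered one execution, a writeAsCsv sink defined after it still needs its own env.execute() call.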
Dependencies
<properties>
<scala.version>2.11.8</scala.version>
<flink.version>1.7.2</flink.version>
</properties>
<!-- Java dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>${flink.version}</version>
<!-- <scope>provided</scope>-->
</dependency>
<!-- Scala dependencies -->
<!-- Flink bounded (batch) data processing dependency -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink unbounded (streaming) data processing dependency -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
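Note that the _2.11 suffix in the flink-streaming-java, flink-scala, and flink-streaming-scala artifact IDs must match the Scala major version declared in <scala.version> (here 2.11.8); mixing artifacts built for Scala 2.11 and 2.12 in one project leads to binary-incompatibility errors at runtime.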