Flink | Getting Started: WordCount in Practice
<!-- Main dependencies -->
<flink.version>1.6.1</flink.version>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
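For reference, these fragments live in the project's pom.xml: the version property goes under <properties> and the artifacts inside a <dependencies> block (a minimal sketch of the surrounding structure; the groupIds and versions are exactly those above):

```xml
<properties>
    <flink.version>1.6.1</flink.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- flink-streaming-scala_2.11 and flink-clients_2.11 are added the same way -->
</dependencies>
```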
WordCount implemented in Scala with Flink's batch API:
object BatchWordCount {
  def main(args: Array[String]): Unit = {
    import org.apache.flink.api.scala._
    // Obtain the execution environment
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    // Load/create the initial data source
    val source = env.fromElements("Hello Flink Flink Drag Flink SQL Streaming Spark Streaming Storm JStorm")
    // Specify the transformation operators on the data
    val counts = source.flatMap(_.split(" ")).map((_, 1)).groupBy(0).sum(1)
    // print() triggers execution here and writes the result to the client
    counts.print()
    // counts.writeAsCsv("E:\\TestCode\\Flink\\data", "\\n", " ")
    // env.execute("Flink Batch Word Count By Scala")
  }
}
Test output omitted......
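Although the Flink run's output is omitted here, the operator chain above maps one-to-one onto plain Scala collections, so the expected counts can be checked locally (a plain-Scala sketch, not Flink code; `groupBy(0).sum(1)` becomes an explicit group-and-sum):

```scala
// Plain Scala equivalent of flatMap -> map -> groupBy(0) -> sum(1)
val source = Seq("Hello Flink Flink Drag Flink SQL Streaming Spark Streaming Storm JStorm")

val counts: Map[String, Int] = source
  .flatMap(_.split(" "))            // split each line into words
  .map((_, 1))                      // pair every word with a count of 1
  .groupBy(_._1)                    // group the pairs by the word (field 0)
  .map { case (word, pairs) => (word, pairs.map(_._2).sum) } // sum field 1

counts.toSeq.sortBy(-_._2).foreach(println) // e.g. (Flink,3), (Streaming,2), ...
```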
Note for Flink batch jobs: when print() is called on a DataSet, execute() is already invoked internally in the source code, so you must not call it again. Doing so raises the following error:
java.lang.RuntimeException: No new data sinks have been defined since the last execution.
The last execution refers to the latest call to 'execute()', 'count()', 'collect()', or 'print()'.
The relevant source code:
// First, print() on org.apache.flink.api.scala.DataSet is called
def print(): Unit = {
  javaSet.print()
}
// Note that here DataSet => org.apache.flink.api.java.{DataSet => JavaDataSet}
// Now look at print() in org.apache.flink.api.java.DataSet
public void print() throws Exception {
    List<T> elements = collect();
    for (T e : elements) {
        System.out.println(e);
    }
}

public List<T> collect() throws Exception {
    final String id = new AbstractID().toString();
    final TypeSerializer<T> serializer = getType().createSerializer(getExecutionEnvironment().getConfig());
    this.output(new Utils.CollectHelper<>(id, serializer)).name("collect()");
    JobExecutionResult res = getExecutionEnvironment().execute();
    ArrayList<byte[]> accResult = res.getAccumulatorResult(id);
    if (accResult != null) {
        ......
    }
}
Also, if you skip print() and call execute() instead, Flink requires that a sink be defined first.
WordCount implemented in Scala with Flink's streaming API:
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.log4j.{Level, Logger}

case class WordCount(word: String, count: Int)

object SocketWordCount {
  val appName: String = this.getClass.getSimpleName.replace("$", "")
  val logger: Logger = Logger.getLogger(appName)
  Logger.getLogger("org.apache").setLevel(Level.ERROR)

  def main(args: Array[String]): Unit = {
    // Get the socket port, falling back to 9000
    val port: Int = try {
      ParameterTool.fromArgs(args).getInt("port")
    } catch {
      case e: Exception =>
        logger.error("No port set. Using default port 9000.")
        9000
    }
    // Obtain the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._
    // Load/create the initial data source -- here, read input from a socket
    val text = env.socketTextStream("localhost", port, '\n')
    // Specify the transformation operators on the data
    val wordCounts = text.flatMap(line => line.split("\\s"))
      .map(w => WordCount(w, 1))
      .keyBy("word")
      .timeWindow(Time.seconds(2), Time.seconds(2))
      .sum("count")
    // Print the result to the client; execute() below actually triggers the job
    wordCounts.print().setParallelism(1)
    env.execute("Socket Window Word Count")
  }
}
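As an aside, the try/catch around the port argument can be avoided: ParameterTool also offers a getInt(key, defaultValue) overload, i.e. ParameterTool.fromArgs(args).getInt("port", 9000). The same fallback idiom can be sketched in plain Scala (parsePort here is a hypothetical helper for illustration, not a Flink API):

```scala
// Hypothetical stand-in for ParameterTool.fromArgs(args).getInt("port", 9000):
// scan the args for a "--port <value>" pair, fall back to 9000 if absent
def parsePort(args: Array[String]): Int =
  args.sliding(2).collectFirst { case Array("--port", p) => p.toInt }
    .getOrElse(9000)

println(parsePort(Array("--port", "8888"))) // 8888
println(parsePort(Array.empty[String]))     // 9000 (the default)
```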
When testing this Flink DataStream job locally, the input comes from a socket, so note:
1. To run the nc command on Windows, you need to download netcat: https://eternallybored.org/misc/netcat/
2. Before starting the program, run nc -l -p 9000 in a CMD window to listen on port 9000; otherwise the job fails with:
org.apache.flink.runtime.client.JobExecutionException:
java.net.ConnectException: Connection refused: connect
3. Check that the port is available, then run the job and watch the output. (Test output omitted.)
Summary:
Developing a Flink program follows a fixed workflow:
1. Obtain the execution environment
2. Load/create the initial data source
3. Specify the transformation operators on the data
4. Specify where the computed results are stored (the sink)
5. Call execute() to trigger program execution
Note: Flink programs are lazily evaluated; nothing actually runs until execute() is called at the end.
Also, in batch jobs, if the output side uses print() (or likewise count() or collect()), an explicit execute() call is not needed (simple test in the batch WordCount above).
If the batch code's output side uses writeAsCsv, writeAsText, or another sink method, execute() must be called afterwards (simple test in the batch example above).
Batch jobs obtain their environment with ExecutionEnvironment; streaming jobs use StreamExecutionEnvironment.
A batch pipeline produces a DataSet; a streaming pipeline produces a DataStream.