欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Flink | 入门实践WordCount

程序员文章站 2022-06-16 15:21:05
...
<!--主要依赖-->
<flink.version>1.6.1</flink.version>
   
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-scala_2.11</artifactId>
  <version>${flink.version}</version>
</dependency>

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-scala_2.11</artifactId>
  <version>${flink.version}</version>
</dependency>

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-clients_2.11</artifactId>
  <version>${flink.version}</version>
</dependency>
	

 

Scala基于Flink 批处理实现WordCount,如下:

object SocketWordCount {
    def main(args: Array[String]): Unit = {
        import org.apache.flink.api.scala._
        // 获取执行环境
        val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
        // 加载/创建初始化数据源
        val source = env.fromElements("Hello Flink Flink Drag Flink SQL Streaming Spark Streaming Storm JStorm")
        // 指定操作是数据的转换算子
        val counts = source.flatMap(_.split(" ")).map((_, 1)).groupBy(0).sum(1)
        // 调用execute()触发执行程序, 这里执行print打印到客户端
        counts.print()
        // counts.writeAsCsv("E:\\TestCode\\Flink\\data","\\n"," ")
        // env.execute("Flink Batch Word Count By Scala")

    }
}

测试结果省。。。。。。。

Flink 批处理操作中,需要注意的是当DataSet调用print()时,源码内部已经调用Excute方法,所以此处不再调用,如果调用会出现以下错误
 

java.lang.RuntimeException: No new data sinks have been defined since the last execution. 

The last execution refers to the latest call to 'execute()', 'count()', 'collect()', or 'print()'.

具体源码如下:

// 首先调用 org.apache.flink.api.scala.DataSet 的print()
def print(): Unit = {
	javaSet.print()
}

// 这里需要注意的是 DataSet =>  org.apache.flink.api.java.{DataSet => JavaDataSet}

// 再看下org.apache.flink.api.java.DataSet 的print()

public void print() throws Exception {
	List<T> elements = collect();
	for (T e: elements) {
		System.out.println(e);
	}
}

public List<T> collect() throws Exception {
	final String id = new AbstractID().toString();
	final TypeSerializer<T> serializer = getType().createSerializer(getExecutionEnvironment().getConfig());

	this.output(new Utils.CollectHelper<>(id, serializer)).name("collect()");
	JobExecutionResult res = getExecutionEnvironment().execute();

	ArrayList<byte[]> accResult = res.getAccumulatorResult(id);
	if (accResult != null) {
		......
	}
}

另外当不执行print(),而调用execute(),Flink要求需要Sink。


Scala基于Flink 流处理实现WordCount,如下:

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.log4j.{Level, Logger}

case class WordCount(word : String, count : Int)
object SocketWordCount {
    val appName: String = this.getClass.getSimpleName.replace("$", "")
    val logger: Logger = Logger.getLogger(appName)
    Logger.getLogger("org.apache").setLevel(Level.ERROR)
    def main(args: Array[String]): Unit = {
        // 获取Socket Port
        val port: Int = try {
            ParameterTool.fromArgs(args).getInt("port")
        } catch {
            case e: Exception => {logger.error("No Port Set.")}
                9000
        }
        // 获取执行环境
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        import org.apache.flink.api.scala._
        // 加载/创建初始化数据源 这里通过链接Socket获取输入数据
        val text = env.socketTextStream("localhost", port, '\n')
        // 指定操作是数据的转换算子
        val wordCounts = text.flatMap(line => line.split("\\s")).map(w => WordCount(w, 1)).keyBy("word")
            .timeWindow(Time.seconds(2), Time.seconds(2)).sum("count")
        // 调用execute()触发执行程序, 这里执行print打印到客户端
        wordCounts.print().setParallelism(1)
        env.execute("Socket Window Word Count")
    }
}

 

本地测试的Flink DataStreaming,通过Socke获取输入数据源,需要注意:

1、windows执行nc命令,需要下载netcat。下载地址:https://eternallybored.org/misc/netcat/

2. 在执行程序前,先在CMD黑窗口执行nc -l -p 9000,监听端口9000 ,不然老梗错了:

 org.apache.flink.runtime.client.JobExecutionException: 
        java.net.ConnectException: Connection refused: connect

3.检测端口是否允许,看下测试结果:

Flink | 入门实践WordCount

 


 

小结

开发Flink程序有固定的流程?

1、 获取执行环境

2、加载/创建初始化数据源

3、指定操作是数据的转换算子

4、指定计算好的数据存放位置(Sink)

5、调用execute()触发执行程序

注意: Flink程序也是延迟计算的, 只有最后调用execute()时才会真正触发执行程序

 

另外在批处理中,如果输出目的端,执行的 print 命令(除此之外,还有count,collect方法),则执行任务Execute不需要调用。(批处理计算Word时有简单测试)


如果批处理代码中,输出目的端调用writeAsCsv、writeAsText等其他方法,则后面需要调用Execute;(批处理中有简单测试)

批处理获取执行环境用ExecutionEnvironment,流处理获取环境用StreamExecutionEnvironment

批处理后的数据是DataSet,流处理后的数据是DataStream.

 

相关标签: Flink flink