Flink安装及WordCount实例yarn-cluster提交
flink
flink(基于数据流上的有状态计算)
flink的特点:
-
事件驱动型
事件驱动型应用是一类具有状态的应用,它从一个或多个事件六提取数据,并根据到来的事件触发计算、状态更新或其他外部动作。(SparkStreaming是微批次,将批次更加微小化)
-
实时流处理
批处理的特点是有界、持久、大量。非常适合需要访问全套记录才能完成的计算工作,一般用于离线计算。
流处理的特点是*、实时。无需针对整个数据集执行操作,而是对通过系统传输的每个数据项执行操作,一般用于实时统计。
(在spark中,一切都是由批次组成的,离线数据是一个大批次,而实时数据是由一个一个无限的小批次组成的)
flink中,一切都是流组成的,离线数据是有界限的流,实时数据是一个没有界限的流,这就是有界流和*流。
-
支持有状态计算
在流式计算过程中将算子的中间结果保存在内存或者文件系统中,等下一个事件进入算子后可以让当前事件的值与历史值进行汇总累计。
-
支持exactly-once语义
确保一条message、receiver只收到一次。
一、安装
1、下载flink对应包 https://www.apache.org/dyn/closer.lua/flink/flink-1.7.2/flink-1.7.2-bin-hadoop27-scala_2.12.tgz
2、解压缩下载的包
3、修改flink/conf/flink-conf.yaml文件,修改远程连接地址
4、修改conf/slave文件,添加task运行节点(子节点)
5、将该文件分发到分布式服务器上
6、启动flink
bin/start-cluster.sh
7、验证启动成功,访问http://hadoop1:8081,可以在webUI界面查看对应节点信息。
一(1)、 yarn模式
flink的yarn模式,首先需要启动hadoop集群,可以使用start-yarn.sh start-yarn.sh
来启动。
hadop集群启动完成后,启动flink的yarn-session
bin/yarn-session.sh -n 2 -s 2 -jm 1024 -tm 1024 -nm test -d
-n(--container):TaskManager的数量。
-s(--slots): 每个TaskManager的slot数量,默认一个slot一个core,默认每个taskmanager的slot的个数为1,有时可以多一些taskmanager,做冗余。
-jm:JobManager的内存(单位MB)。
-tm:每个taskmanager的内存(单位MB)。
-nm:yarn 的appName(现在yarn的ui上的名字)。
-d:后台执行。
启动完成后,可以用Wordcount简单例子来实验。scala代码如下:(创建maven工程)
pom.xml 内容如下:
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.11</artifactId>
<version>1.7.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>1.7.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- 该插件用于将Scala代码编译成class文件 -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.4.6</version>
<executions>
<execution>
<!-- 声明绑定到maven的compile阶段 -->
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
Wordcount的scala代码内容如下:
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
import scala.concurrent.ExecutionContext
object WCAppBatch {
def main(args: Array[String]): Unit = {
val tool: ParameterTool = ParameterTool.fromArgs(args)
val input: String = tool.get("input")
val output: String = tool.get("output")
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val ds: DataSet[String] = env.readTextFile(input)
import org.apache.flink.api.scala._
val aggDS: AggregateDataSet[(String, Int)] = ds.flatMap(_.split(" ")).map((_,1)).groupBy(0).sum(1)
aggDS.writeAsText(output)
env.execute()
}
}
编写代码完成后,利用maven打包,将jar包上传到搭建好的集群中。到flink 的bin目录执行程序。
./flink run -m yarn-cluster -c xxx.xxx.WCAppBatch xxxxx.jar --input xxxx.txt --output xxx.txt
在上述命令中,-m是集群模式,为yarn集群模式,-c是编写的WordCount的全类名,接着是jar包名字,input 和output是传入参数。出现下图表示运行成功,可以去WebUI查看具体日志。
注意:传入的input参数必须在计算节点上有(计算节点为配置文件slave中写入的主机),否则会出现does not exist or the user running Flink ('hadoop') has insufficient permissions to access it.
FileNotFoundException异常。