Flink On YARN使用
Flink on yarn的job运行模式大致分为两类:
参考链接:
https://blog.csdn.net/a_drjiaoda/article/details/88203323
1、内存集中管理模式 yarn session:在Yarn中初始化一个Flink集群,开辟指定的资源,之后我们提交的Flink Jon都在这个Flink yarn-session中,也就是说不管提交多少个job,这些job都会共用开始时在yarn中申请的资源。这个Flink集群会常驻在Yarn集群中,除非手动停止。
2、内存Job管理模式 yarn single job【推荐使用】:在Yarn中,每次提交job都会创建一个新的Flink集群,任务之间相互独立,互不影响并且方便管理。任务执行完成之后创建的集群也会消失。
一、内存集中管理模式
支持两种提交方式,默认不指定就是客户端方式,如果需要使用集群方式提交的话。可以在提交作业的命令行中指定-d进行进群模式提交。如果是在分离式模式你会发现,在所resourcemanager会出现一个 YarnSessionClusterEntrypoint进程;如果是客户端模式,运行yarn-session的主机上会运行FlinkYarnSessionCli和YarnSessionClusterEntrypoint两个进程.
分为两步:yarn-session.sh(开辟资源)+flink run(提交任务)
1、开辟资源,使用命令
yarn-session.sh -n 2 -jm 1024 -tm 1024 -d
参数介绍:
-n taskmanager的数量
-s 每个taskmanager的slot数量,默认一个slot对应一个core
-jm jobmanager的内存大小(要求至少2G以上)
-tm 每个 taskmanager的内存大小
-qu yarn的队列名字
-nm yarn应用名称
-st:以流模式启动Flink;
一旦session创建成功,你可以使用./bin/flink工具向集群提交任务。
启动后可以通过 主机:8081查看flink的web界面
其实,由于这还是属于一个Yarn application,因此我们也可以通过yarn.resourcemanager.webapp.address来查看,例如我这里刚刚启动了两个Flink集群,这里可通过Tracking UI的值来跳转到对用的Flink集群监控页面。
关闭某个Flink集群我们可以直接使用 yarn application -kill application_1552292557465_0001 来结束进程。
2、提交任务
为了进行测试,我们对Flink目录下的LICENSE文件进行词频统计
上传文件至HDFS。hadoop fs -put LICENSE /
查看文件是否上传成功。hadoop fs -ls /
执行命令。./flink run ../examples/batch/WordCount.jar -input hdfs://192.168.83.129:9000/LICENSE -output hdfs://192.168.83.129:9000/wordcount-result.txt
查看输出结果。hadoop fs -cat /wordcount-result.txt
这次提交flink job,虽然没有指定对应yarn application的信息,确可以提交到对应的flink集群,原因是flink自动保存了上一次创建yarn session的集群信息。所以如果同一用户在同一机器上再次创建一个yarn session,则这个文件会被覆盖掉。Yarn session会在/tmp 下⽣成⼀个⽂件
cat /tmp/.yarn-properties-admin
parallelism=12
dynamicPropertiesString=
applicationID=application_1532332183347_0708
如果在另一个机器上提交作业能否提交到预期到yarn session中呢?这也是可以的,通过“-yid”参数传入:
通过 -yid 参数来提交到指定的session。
$./bin/flink run -d -p 30 -m yarn-cluster -yid application_1532332183347_0708 ./examp
les/streaming/TopSpeedWindowing.jar
也可以用-m参数指定,yarn-session启动后,系统自动分配的ApplicationMaster主机和节点和端口
如下图中yarn-session启动成功后,会提示一个主机和端口后,这个就是JobManager(也是ApplicationMaster)
使用-m参数可以在任意集群主机提交JOB。
bin/flink run -m vmhome10.com:43258 examples/batch/WordCount.jar
二、内存Job管理模式
第二种模式其实也分为两个部分,依然是开辟资源和提交任务,但是在Job模式下,这两步都合成一个命令了。
./flink run -m yarn-cluster -yn 2 -yjm 1024 -ytm 1024 …/examples/batch/WordCount.jar。上面的命令中没有指定-input 和 -output,这是由于有默认的数据集和输出方式,看看效果。
下面yarn application的图可以清晰的反映第二种方式,在job结束后就会关闭flink yarn-session的集群。
参数解释:
• "run" 操作参数:
// -c,--class <classname> 如果没有在jar包中指定入口类,则需要在这里通过这个参数指定
// -p,--parallelism <parallelism> 指定程序的并行度。可以覆盖配置文件中的默认值。
上一篇: Java 序列化
下一篇: 第七次作业解析——日志文件程序
推荐阅读
-
省点花锦鲤卡app怎么激活 省点花锦鲤卡激活后怎么使用
-
Android自定义View 使用PathMeasure简单模仿系统ProgressBar(四)
-
solidworks零件模型怎么使用剖面命令?
-
Android studio怎么使用git获取最新内容然后合并?
-
省点花锦鲤卡可以在美团上使用吗 省点花锦鲤卡app怎么在美团上用
-
Wing FTP Server FTP服务器端中文版安装使用教程
-
eclipse格式化代码快捷键无法使用怎么办?
-
android使用DataBinding来设置空状态
-
IDPhotoStudio证件照打印使用教程
-
百中搜优化软件怎么样?百中搜优化软件使用教程(附视频教程)