Flink on Yarn(HA配置)
根据部署方式不同,Flink Jobmanager HA配置分为2种:
1、standalone cluster HA
2、Yarn cluster HA
- 1
- 2
- 1
- 2
其中,standalone cluster HA可参考我之前的一篇文章。
简单回顾下,standalone模式的HA需要多个“活着的”Jobmanager,其中1个作为leader,其他作为standby,leader选举依赖于Zookeeper。可以用下面的一张图来形象的表述standalone HA:
本文专门讨论Yarn下Flink HA的搭建与配置。
一、Flink On Yarn 简介
Flink部署在Yarn上,仅作为yarn上“多租户”的一个service而存在。Flink在yarn中容器的概念分为2种:
用于启动JobManager(AM)的容器
用于启动TaskManager的容器
- 1
- 2
- 1
- 2
我们可以通过yarn-session.sh –help来看下启动Flink On Yarn的参数信息:
其中-n代表taskmanager的容器数量,而不是taskmanager+jobmanager的容器数量。
在配置HA前,先通过-q看一下我的yarn集群的资源情况:
从图中可以看出,我配置的每个NodeManager的内存是8192MB(yarn-site.xml),每个NodeManager的vcores数量是8。所以,当前yarn集群中可用内存总量为32768,总cores是32.
二、Flink on Yarn HA 配置
1、配置准备
在配置Flink On Yarn之前,必须保证hdfs和yarn都已经开启,可以通过$HADOOP_HOME/sbin/start-all.sh启动hdfs和yarn。
2、配置AM在尝试重启的最大次数(yarn-site.xml)
此配置需要在$HADOOP_CONF_DIR
的yarn-site.xml添加。
添加如下配置:
此配置代表application master在重启时,尝试的最大次数。
3、配置Application Attempts(flink-conf.yaml)
此参数需要在$FLINK_HOME/conf
的flink-conf.yaml中配置。
添加如下配置:
此参数代表Flink Job(yarn中称为application)在Jobmanager(或者叫Application Master)恢复时,允许重启的最大次数。
注意,Flink On Yarn环境中,当Jobmanager(Application Master)失败时,yarn会尝试重启JobManager(AM),重启后,会重新启动Flink的Job(application)。因此,yarn.application-attempts的设置不应该超过yarn.resourcemanager.am.max-attemps.
- 1
- 1
4、配置zookeeper信息
虽然flink-on-yarn cluster HA依赖于Yarn自己的集群机制,但是Flink Job在恢复时,需要依赖检查点产生的快照,而这些快照虽然配置在hdfs,但是其元数据信息保存在zookeeper中,所以我们还要配置zookeeper的HA信息:
其中,recovery.zookeeper.path.namespace也可以在启动Flink on Yarn时通过-z参数覆盖。
在yarn模式下,jobmanager.rpc.address不需要指定,因为哪一个容器作为jobManager由Yarn决定,而不由Flink配置决定;taskmanager.tmp.dirs也不需要指定,这个参数将被yarn的tmp参数指定,默认就是/tmp目录下,保存一些用于上传到ResourceManager的jar或lib文件。parrallelism.default也不需要指定,因为在启动yarn时,通过-s指定每个taskmanager的slots数量。
完整的Flink配置信息如下:
#==============================================================================
# Common
#==============================================================================
env.java.home: /home/flink/java/jdk1.8.0_60
jobmanager.rpc.port: 6123
jobmanager.heap.mb: 6192
taskmanager.heap.mb: 8192
taskmanager.numberOfTaskSlots: 8
taskmanager.memory.preallocate: false
#==============================================================================
# Web Frontend
#==============================================================================
jobmanager.web.port: 8081
#==============================================================================
# Streaming state checkpointing
#==============================================================================
state.backend: filesystem
state.backend.fs.checkpointdir: hdfs:///flink/checkpoints
#==============================================================================
# Advanced
#==============================================================================
taskmanager.network.numberOfBuffers: 64000
fs.hdfs.hadoopconf: /home/flink/hadoop/hadoop-2.6.0/etc/hadoop
#==============================================================================
# Master High Availability (required configuration)
#==============================================================================
recovery.mode: zookeeper
recovery.zookeeper.quorum: flink:2181,data0:2181,mf:2181
recovery.zookeeper.storageDir: hdfs:///flink/recovery
recovery.zookeeper.path.root: /flinkOnYarn
recovery.zookeeper.path.namespace: /cluster_yarn
#==============================================================================
# Yarn
#==============================================================================
yarn.application-attempts: 4
#==============================================================================
# Yarn will overwrite following parameters
# 1. jobmanager.rpc.address
# 2. taskmanager.tmp.dirs
# 3. parallelism.default
#==============================================================================
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
三、启动Flink Yarn Session
启动Flink Yarn Session有2种模式:
分离模式
客户端模式
- 1
- 2
- 1
- 2
通过-d指定分离模式,即客户端在启动Flink Yarn Session后,就不再属于Yarn Cluster的一部分。如果想要停止Flink Yarn Application,需要通过yarn application -kill <Application_ID>
命令来停止。
我们这里采用分离模式来启动Flink Yarn Session:
yarn-session.sh -n 3 -jm 4096 -tm 8192 -s 8 -nm FlinkOnYarnSession -d -st
- 1
- 1
我们可以通过yarn的webUI查看一下当前启动的Application:
可以看到名字是FlinkOnYarnSession,总内存32GB,运行使用的内存28GB(-jm指定了4GB),当前容器数量为4.我们通过ApplicationMaster tracking一下Flink的WebUI:
四、提交Job
通过CLI方式提交:
flink run -c wikiedits.Test1 toptrade-flink-1.0.jar
- 1
- 1
我们看下目前Job的JobGraph:
五、HA测试
现在,我们kill掉Jobmanager(AM)进程YarnApplicationMasterRunner,看看Yarn Cluster的HA情况。
我们看到Application Attemp的ID增加了1:
我们再到mf42的$YARN_CONF_DIR
(如果没设置则在$HADOOP_CONF_DIR)下看看日志情况,当前AM的日志路径在$HADOOP_CONF_DIR/userlogs/<Application_ID>/
下,可以看出Yarn在重启YarnApplicationMasterRunner进程,并在重启期后重新提交Flink的Job。
再次查看进程:
YarnApplicationMasterRunner进程号变了。
此时,Flink的WebUI又可以访问了,而且Job被cancel掉后重新启动了。
六、未来Flink1.2中的Flink On Yarn
增强了以下几点:
1、不用先启动Yarn Session再提交Job,而是直接提交Job到Yarn集群,因此client可以断开连接
2、用户代码库和配置文件直接在classpath下,而不是在动态类加载器中
3、容器在需要时分配,不需要时释放资源
4、按需分配的容器可以针对不同的operator分配不同的CPU和Core资源,通过配置文件实现
- 1
- 2
- 3
- 4
- 1
- 2
- 3
- 4
Flink1.2中ResourceManager提出了一个Dispatcher的概念,主要用于统一发布Job并监控实例的运行。但时可以选择是否使用Dispatcher。
参考:
https://ci.apache.org/projects/flink/flink-docs-release-1.1/setup/yarn_setup.html
https://ci.apache.org/projects/flink/flink-docs-release-1.1/setup/jobmanager_high_availability.html#yarn-cluster-high-availability
http://www.ibm.com/developerworks/cn/opensource/os-cn-apache-flink/#ibm-pcon
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077
https://www.youtube.com/watch?v=L21N8mNtvME
http://www.jianshu.com/p/8a3177095072