欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Flink on Yarn(HA配置)

程序员文章站 2022-06-17 09:22:03
...

根据部署方式不同,Flink Jobmanager HA配置分为2种:

1、standalone cluster HA
2、Yarn cluster HA 
  • 1
  • 2
  • 1
  • 2

其中,standalone cluster HA可参考我之前的一篇文章

简单回顾下,standalone模式的HA需要多个“活着的”Jobmanager,其中1个作为leader,其他作为standby,leader选举依赖于Zookeeper。可以用下面的一张图来形象的表述standalone HA: 
Flink on Yarn(HA配置)

本文专门讨论Yarn下Flink HA的搭建与配置。

一、Flink On Yarn 简介

Flink部署在Yarn上,仅作为yarn上“多租户”的一个service而存在。Flink在yarn中容器的概念分为2种:

用于启动JobManager(AM)的容器
用于启动TaskManager的容器
  • 1
  • 2
  • 1
  • 2

我们可以通过yarn-session.sh –help来看下启动Flink On Yarn的参数信息: 
Flink on Yarn(HA配置)

其中-n代表taskmanager的容器数量,而不是taskmanager+jobmanager的容器数量。

在配置HA前,先通过-q看一下我的yarn集群的资源情况:

Flink on Yarn(HA配置)

从图中可以看出,我配置的每个NodeManager的内存是8192MB(yarn-site.xml),每个NodeManager的vcores数量是8。所以,当前yarn集群中可用内存总量为32768,总cores是32.

二、Flink on Yarn HA 配置

1、配置准备 
在配置Flink On Yarn之前,必须保证hdfs和yarn都已经开启,可以通过$HADOOP_HOME/sbin/start-all.sh启动hdfs和yarn。

2、配置AM在尝试重启的最大次数(yarn-site.xml)

此配置需要在$HADOOP_CONF_DIR 的yarn-site.xml添加。 
添加如下配置: 
Flink on Yarn(HA配置)

此配置代表application master在重启时,尝试的最大次数。

3、配置Application Attempts(flink-conf.yaml)

此参数需要在$FLINK_HOME/conf 的flink-conf.yaml中配置。 
添加如下配置: 
Flink on Yarn(HA配置)

此参数代表Flink Job(yarn中称为application)在Jobmanager(或者叫Application Master)恢复时,允许重启的最大次数。

注意,Flink On Yarn环境中,当Jobmanager(Application Master)失败时,yarn会尝试重启JobManager(AM),重启后,会重新启动Flink的Job(application)。因此,yarn.application-attempts的设置不应该超过yarn.resourcemanager.am.max-attemps.
  • 1
  • 1

4、配置zookeeper信息 
虽然flink-on-yarn cluster HA依赖于Yarn自己的集群机制,但是Flink Job在恢复时,需要依赖检查点产生的快照,而这些快照虽然配置在hdfs,但是其元数据信息保存在zookeeper中,所以我们还要配置zookeeper的HA信息:

Flink on Yarn(HA配置)

其中,recovery.zookeeper.path.namespace也可以在启动Flink on Yarn时通过-z参数覆盖。 
在yarn模式下,jobmanager.rpc.address不需要指定,因为哪一个容器作为jobManager由Yarn决定,而不由Flink配置决定;taskmanager.tmp.dirs也不需要指定,这个参数将被yarn的tmp参数指定,默认就是/tmp目录下,保存一些用于上传到ResourceManager的jar或lib文件。parrallelism.default也不需要指定,因为在启动yarn时,通过-s指定每个taskmanager的slots数量。

完整的Flink配置信息如下:

#==============================================================================
#  Common
#==============================================================================
env.java.home: /home/flink/java/jdk1.8.0_60
jobmanager.rpc.port: 6123
jobmanager.heap.mb: 6192
taskmanager.heap.mb: 8192
taskmanager.numberOfTaskSlots: 8
taskmanager.memory.preallocate: false

#==============================================================================
# Web Frontend
#==============================================================================
jobmanager.web.port: 8081

#==============================================================================
# Streaming state checkpointing
#==============================================================================
state.backend: filesystem
state.backend.fs.checkpointdir: hdfs:///flink/checkpoints

#==============================================================================
# Advanced
#==============================================================================
taskmanager.network.numberOfBuffers: 64000
fs.hdfs.hadoopconf: /home/flink/hadoop/hadoop-2.6.0/etc/hadoop

#==============================================================================
# Master High Availability (required configuration)
#==============================================================================
recovery.mode: zookeeper
recovery.zookeeper.quorum: flink:2181,data0:2181,mf:2181
recovery.zookeeper.storageDir: hdfs:///flink/recovery
recovery.zookeeper.path.root: /flinkOnYarn
recovery.zookeeper.path.namespace: /cluster_yarn


#==============================================================================
# Yarn
#==============================================================================
yarn.application-attempts: 4


#==============================================================================
# Yarn will overwrite following parameters 
# 1. jobmanager.rpc.address
# 2. taskmanager.tmp.dirs
# 3. parallelism.default
#==============================================================================
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49

三、启动Flink Yarn Session

启动Flink Yarn Session有2种模式:

分离模式
客户端模式
  • 1
  • 2
  • 1
  • 2

通过-d指定分离模式,即客户端在启动Flink Yarn Session后,就不再属于Yarn Cluster的一部分。如果想要停止Flink Yarn Application,需要通过yarn application -kill <Application_ID>命令来停止。

我们这里采用分离模式来启动Flink Yarn Session:

yarn-session.sh -n 3 -jm 4096 -tm 8192 -s 8 -nm FlinkOnYarnSession -d -st
  • 1
  • 1

我们可以通过yarn的webUI查看一下当前启动的Application: 
Flink on Yarn(HA配置)

可以看到名字是FlinkOnYarnSession,总内存32GB,运行使用的内存28GB(-jm指定了4GB),当前容器数量为4.我们通过ApplicationMaster tracking一下Flink的WebUI:

Flink on Yarn(HA配置)

四、提交Job

通过CLI方式提交:

flink run -c wikiedits.Test1 toptrade-flink-1.0.jar
  • 1
  • 1

我们看下目前Job的JobGraph: 
Flink on Yarn(HA配置)

五、HA测试

现在,我们kill掉Jobmanager(AM)进程YarnApplicationMasterRunner,看看Yarn Cluster的HA情况。 
Flink on Yarn(HA配置)

我们看到Application Attemp的ID增加了1: 
Flink on Yarn(HA配置)

我们再到mf42的$YARN_CONF_DIR(如果没设置则在$HADOOP_CONF_DIR)下看看日志情况,当前AM的日志路径在$HADOOP_CONF_DIR/userlogs/<Application_ID>/下,可以看出Yarn在重启YarnApplicationMasterRunner进程,并在重启期后重新提交Flink的Job。

再次查看进程: 
Flink on Yarn(HA配置)

YarnApplicationMasterRunner进程号变了。

此时,Flink的WebUI又可以访问了,而且Job被cancel掉后重新启动了。

六、未来Flink1.2中的Flink On Yarn

增强了以下几点:

1、不用先启动Yarn Session再提交Job,而是直接提交Job到Yarn集群,因此client可以断开连接
2、用户代码库和配置文件直接在classpath下,而不是在动态类加载器中
3、容器在需要时分配,不需要时释放资源
4、按需分配的容器可以针对不同的operator分配不同的CPU和Core资源,通过配置文件实现
  • 1
  • 2
  • 3
  • 4
  • 1
  • 2
  • 3
  • 4

Flink1.2中ResourceManager提出了一个Dispatcher的概念,主要用于统一发布Job并监控实例的运行。但时可以选择是否使用Dispatcher。

Flink on Yarn(HA配置)

Flink on Yarn(HA配置)

参考:

https://ci.apache.org/projects/flink/flink-docs-release-1.1/setup/yarn_setup.html 
https://ci.apache.org/projects/flink/flink-docs-release-1.1/setup/jobmanager_high_availability.html#yarn-cluster-high-availability 
http://www.ibm.com/developerworks/cn/opensource/os-cn-apache-flink/#ibm-pcon 
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077 
https://www.youtube.com/watch?v=L21N8mNtvME 
http://www.jianshu.com/p/8a3177095072