欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Flink的高可用集群环境

程序员文章站 2022-03-14 19:05:14
...

Flink的高可用集群环境


Flink简介

       Flink核心是一个流式的数据流执行引擎,其针对数据流的分布式计算提供了数据分布,数据通信以及容错机制等功能。

         因现在主要Flink这一块做先关方面的学习,因此准备要开通Apache Flink专栏这一块定期发布一些文章。今天在自己的博客因为专栏无法申请通过,所以先在此记录第一篇关于Flink部署的文章。

         在这里顺便打个小广告,Flink社区第一季线下meetup,已在上海,北京举办。接下来分别会在成都和深圳举办接下来的几期,也希望小伙伴们踊跃的加入到Flink社区来,下载钉钉,扫描下方二维码即可加入大群。
Flink的高可用集群环境

     首先今天先介绍一下Flink的安装,安装部署最新1.6版本支持有8种安装方式,详细可以参考安装部署方式【Clusters & Deployment】 。下面主要介绍Standalone Cluster模式和on yarn模式 。

软件包下载地址

一.Flink独立集群模式安装(Cluster Standalone)

1.1.解压安装

   [root@h001 soft]# tar -zxvf flink-1.2.0-bin-hadoop26-scala_2.11.tgz -C /usr/bigdata/

1.2.Flink配置(Configuring Flink)

对其进行相关的配置。主要涉及到的配置文件是conf/flink-conf.yaml

flink-conf.yaml配置

jobmanager.rpc.address:值设置成你master节点的IP地址
taskmanager.heap.mb:每个TaskManager可用的总内存
taskmanager.numberOfTaskSlots:每台机器上可用CPU的总数
parallelism.default:每个Job运行时默认的并行度(这个参数在文档中介绍好像有问题)
taskmanager.tmp.dirs:临时目录
jobmanager.heap.mb:每个节点的JVM能够分配的最大内存
jobmanager.rpc.port: 6123
jobmanager.web.port: 8081

[aaa@qq.com conf]# vim flink-conf.yaml
    jobmanager.rpc.address:h001
    taskmanager.heap.mb2048
    taskmanager.numberOfTaskSlots4
    parallelism.default10
    taskmanager.tmp.dirs:/tmp
    jobmanager.heap.mb2048
    jobmanager.web.port: 8081
    jobmanager.rpc.port: 6123

主节点与从节点配置

 [root@h002 conf]# vim slaves
     h002
     h003
     h004
     h005
[root@h001 conf]# vim masters
    h001:8082

1.3.Flink安装包分发到所有的worker节点上

[aaa@qq.com bigdata]# clush -v -w h[002-005] --copy flink-1.5.1 --dest /usr/bigdata/

1.4.启动Flink(Starting Flink)

       在master节点上运行下面的脚本,那么这台机器上将会启动一个JobManager,并通过SSH连接列在slaves文件中的所有节点以便在每个节点上启动TaskManager

[root@h001 flink-1.5.1]# bin/start-cluster.sh

如果停止集群,可以在master节点上运行下面的命令

[root@h001 flink-1.5.1]# bin/stop-cluster.sh

1.5. 在已经运行的集群中添加JobManager/TaskManager

      通过bin/taskmanager.sh或者bin/jobmanager.sh脚本在已经运行的集群中添加JobManager或者TaskManager节点

[aaa@qq.com flink-1.2.0]# bin/jobmanager.sh (start (local|cluster) [host] [webui-port]|stop|stop-all)

[aaa@qq.com flink-1.2.0]# bin/taskmanager.sh (start (local|cluster) [host] [webui-port]|stop|stop-all)

二.JobManager高可用(HA)

       JobManager协调每一个Flink集群环境,它负责作业调度和资源管理。默认情况下,一个Flink集群中只有一个JobManager实例,这很容易造成单点故障(SPOF)。如果JobManager奔溃了,那么将没有新的程序被提交,同时运行的程序将失败。
       对于JobManager高可用来说,我们可以从失败的JobManager中恢复,因此可以消除单点故障的问题。我们可以配置Standalone模式和YARN集群模式下的高可用JobManager的HA,是通过Zookeeper实现的,因此需要先搭建好Zookeeper集群,同时HA的信息,还要存储在HDFS中,因此也需要Hadoop集群,最后修改Flink中的配置文件。

根据部署方式不同,Flink Jobmanager HA配置分为2种:
        1、standalone cluster HA
        2、Yarn cluster HA

2.1. Standalone集群模式高可用

        对于Standalone集群模式下的JobManager高可用通常的方案是:Flink集群的任一时刻只有一个leading JobManager,并且有多个standby JobManager。当leader失败后,standby通过选举出一个JobManager作为新的leader。这个方案可以保证没有单点故障的问题。对于standby和master JobManager实例来说,其实没有明确的区别,每一个JobManager能够当担master或standby角色。

2.1.1.相关配置

      为了保证JobManager高可用,你需要设置Zookeeper为recovery mode(恢复模式),配置一个Zookeeper quorum并且对所有的JobManager节点和它们的Web UI端口号设置一个masters文件。

  • Flink引入Zookeeper的目的主要是让JobManager实现高可用(leader选举)
  • Flink使用Zookeeper在所有运行的JobManager实例中进行分布式调度的协调。Zookeeper在Flink中是一个独立的服务,它能够通过leader选举和轻量级的一致性状态存储来提供高度可靠的分布式协调器
    Master File(masters)
  • 为了启动一个HA-cluster,需要在conf/masters中配置masters。
  • masters文件:masters文件包含所有的hosts,每个host启动都JobManager,并且指定绑定的Web UI端口号:
jobManagerAddress1:webUIPort1
[...]
jobManagerAddressX:webUIPortX

配置文件flink-conf.yaml

为了启动一个HA-Cluster,需要在conf/flink-conf.yaml添加如下配置参数:

Recovery mode(必须的):recovery.mode: zookeeper
zookeeper quorum(必须的):recovery.zookeeper.quorum: address1:2181,...
Zookeeper root(推荐的):Flink在Zookeeper中的root节点,下面放置所有需要协调的数据recovery.zookeeper.path.root: /flink

      如果你运行多个Flink HA集群,那么你必须手工配置每个Flink集群使用独立的root节点

      State backend and storage    directory(必须的):JobManager元数据在statebackend保持并且仅仅在Zookeeper中存储,目前在HA模式中,仅支持filesystem。
      state.backend: filesystem
      state.backend.fs.checkpointdir:hdfs://namenode-host:port/flink-checkpoints
      recovery.zookeeper.storageDir: hdfs:///recovery
recovery.zookeeper.storageDir指定的路径中存储了所有的元数据,用来恢复失败的JobManager

2.2. 两个JobManager的Standalone模式下的集群

conf/flink-conf.yaml文件

配置恢复模式和Zookeeper quorum

[aaa@qq.com conf]# vim flink-conf.yaml
   recovery.mode: zookeeper
   recovery.zookeeper.quorum: h002:2181,h003:2181,h004:2181
   recovery.zookeeper.path.root: /flink 
   state.backend: filesystem
   state.backend.fs.checkpointdir: hdfs://h001:9000/flink/checkpoints
   recovery.zookeeper.storageDir: hdfs://h001:9000/flink/recovery

在Hadoop文件系统创建文件夹

[root@h001 conf]# hadoop fs -mkdir -p /flink/checkpoints
[root@h001 conf]# hadoop fs -mkdir -p /flink/recovery
[root@h001 conf]# hadoop fs -chown -R hdfs:supergroup /flink/

配置conf/masters文件

   [aaa@qq.com conf]# vim masters
       h001:8081
       h002:8081

配置conf/zoo.cfg文件,添加Zookeeper集群节点

[aaa@qq.com conf]# vim zoo.cfg
    server.1=h002:2888:3888
    server.2=h003:2888:3888
    server.3=h004:2888:3888

启动Zookeeper集群

[root@h001 flink-1.2.0]# bin/start-zookeeper-quorum.sh

启动Flink集群

   [root@h001 flink-1.2.0]#  bin/start-cluster.sh

经过测试kill掉其中一个jobmanager可切换主备。

2.2. YARN集群模式高可用

        当运行一个高可用YARN集群时,我们不需要运行多个JobManager(ApplicationMaster)实例,只需要运行一个实例,如果失败了通过YARN来进行重启
        Flink部署在Yarn上,仅作为yarn上“多租户”的一个service而存在。Flink在yarn中容器的概念分为2种:

      用于启动JobManager(AM)的容器
      用于启动TaskManager的容器

通过yarn-session.sh –help来看下启动Flink On Yarn的参数信息
Flink的高可用集群环境
        其中-n代表taskmanager的容器数量,而不是taskmanager+jobmanager的容器数量
在配置HA前,先通过-q看一下我的yarn集群的资源情况:
Flink的高可用集群环境
      从图中可以看出,我配置的每个NodeManager的内存是2048MB(yarn-site.xml),每个NodeManager的vcores数量是2。所以,当前yarn集群中可用内存总量为6144,总cores是6

2.2.1. FLINK ON YARN HA 配置

配置准备

   在配置Flink On Yarn之前,必须保证hdfs和yarn都已经开启,可以通过HADOOPHOME/sbin/startall.shhdfsyarnyarnsite.xmlHADOOP_CONF_DIR 的yarn-site.xml添加

[aaa@qq.com ~]# cd /usr/bigdata/hadoop/etc/hadoop/
[aaa@qq.com hadoop]# vim yarn-site.xml
   <property>
      <name>yarn.resourcemanager.am.max-attempts</name>
      <value>4</value>
</property>

此配置代表application master在重启时,尝试的最大次数

[aaa@qq.com hadoop]# clush -v -w h[002-005] --copy yarn-site.xml --dest /usr/bigdata/hadoop/etc/hadoop/

配置(flink-conf.yaml),此参数需要在$FLINK_HOME/conf 的flink-conf.yaml中配置

 [root@h001 conf]# vim flink-conf.yaml
   yarn.application-attempts: 10

      此参数代表Flink Job(yarn中称为application)在Jobmanager(或者叫Application Master)恢复时,允许重启的最大次数。
      注意,Flink On Yarn环境中,当Jobmanager(ApplicationMaster)失败时,yarn会尝试重启JobManager(AM),重启后,会重新启动Flink的Job(application)。因此,yarn.application-attempts的设置不应该超过yarn.resourcemanager.am.max-attemps

[aaa@qq.com conf]#  clush -v -w h[002-005] --copy flink-conf.yaml  --dest /usr/bigdata/flink-1.5.1/conf/

配置zookeeper信息
      虽然flink-on-yarn cluster HA依赖于Yarn自己的集群机制,但是Flink Job在恢复时,需要依赖检查点产生的快照,而这些快照虽然配置在hdfs,但是其元数据信息保存在zookeeper中,所以我们还要配置zookeeper的HA信息。其中,recovery.zookeeper.path.namespace也可以在启动Flink on Yarn时通过-z参数覆盖。
      在yarn模式下,jobmanager.rpc.address不需要指定,因为哪一个容器作为jobManager由Yarn决定,而不由Flink配置决定;taskmanager.tmp.dirs也不需要指定,这个参数将被yarn的tmp参数指定,默认就是/tmp目录下,保存一些用于上传到ResourceManager的jar或lib文件。parrallelism.default也不需要指定,因为在启动yarn时,通过-s指定每个taskmanager的slots数量。

完整的Flink配置信息如下:

aaa@qq.com conf]# vim flink-conf.yaml
       env.java.home: /usr/java/jdk1.8.0_111
       recovery.mode: zookeeper
       recovery.zookeeper.quorum: h002:2181,h003:2181,h004:2181
       recovery.zookeeper.path.root: /flink 
       recovery.zookeeper.path.namespace: /cluster_yarn
       state.backend: filesystem
       state.backend.fs.checkpointdir: hdfs://h001:9000/flink/checkpoints
       recovery.zookeeper.storageDir: hdfs://h001:9000/flink/recovery
       taskmanager.network.numberOfBuffers: 64000
       fs.hdfs.hadoopconf: /usr/bigdata/hadoop/etc/Hadoop

以上的yarn HA配置可在Standalone集群模式下进一步添加几个参数即可完成。

2.2.2.启动FLINK YARN SESSION

在YARN上启动一个Flink主要有两种方式:

(1)、启动一个YARN session(Start a long-running Flink cluster on YARN)
(2)、直接在YARN上提交运行Flink作业(Run a Flink job on YARN)

Flink YARN Session

启动Flink Yarn Session有2种模式:

 (1)、分离模式
 (2)、客户端模式

      通过-d指定分离模式,即客户端在启动Flink Yarn Session后,就不再属于Yarn Cluster的一部分。如果想要停止Flink Yarn Application,需要通过yarn application -kill 命令来停止
      这种模式下会启动yarn session,并且会启动Flink的两个必要服务:JobManager和TaskManagers,然后你可以向集群提交作业。同一个Session中可以提交多个Flink作业。需要注意的是,这种模式下Hadoop的版本至少是2.2,而且必须安装了HDFS(因为启动YARN session的时候会向HDFS上提交相关的jar文件和配置文件)。我们可以通过./bin/yarn-session.sh脚本启动YARN Session。
      在启动的是可以指定TaskManager的个数以及内存(默认是1G),也可以指定JobManager的内存,但是JobManager的个数只能是一个。
采用客户端模式来启动Flink Yarn Session:

  [aaa@qq.com flink-1.5.1]# bin/yarn-session.sh -n 4 -jm 4096 -tm 8192 -s 2 -nm FlinkOnYarnSession -d -st
  或者
 ./bin/yarn-session.sh -n 4 -tm 8192 -s 8

参数说明:

-n:--container 指YARN container分配的个数(即TaskManagers的个数)
-jm:--jobManagerMemory 指JobManager Containe的内存大小,单位为MB
-tm:--taskManagerMemory 指每个TaskManagerContainer的内存大小,单位为MB
-s :指每个TaskManager的slot个数

可以通过yarn的webUI查看一下当前启动的Application
Flink的高可用集群环境
通过ApplicationMaster tracking一下Flink的WebUI
http://192.168.xxx.xxx:8088/proxy/application_1500340359200_0002/#/overview
提交作业
使用bin/flink脚本提交作业,同样我们来看看这个脚本支持哪些参数:

  [aaa@qq.com flink-1.5.1]#  bin/flink run
    bin/flink run ./examples/batch/WordCount.jar     \
          --input hdfs:///user/test/LICENSE     \
          --output hdfs:///user/test/result.txt

后面相应的跟上参数提交作业即可。

Run a single Flink job on YARN

      上面的YARN session是在Hadoop YARN环境下启动一个Flink cluster集群,里面的资源是可以共享给其他的Flink作业。我们还可以在YARN上启动一个Flink作业。这里我们还是使用./bin/flink,但是不需要事先启动YARN session:

[aaa@qq.com flink-1.5.1]# bin/flink run -m yarn-cluster -yn 2    ./examples/batch/WordCount.jar      \
          --input hdfs:///user/test/LICENSE    \                          
          --output hdfs:///user/test/result.txt

      上面的命令同样会启动一个类似于YARN session启动的页面。其中的-yn是指TaskManager的个数,必须指定。

相关标签: Flink