Flink on YARN Configuration
This note covers running Flink 1.9.1 together with YARN (Hadoop 2.6.2), so that Flink jobs execute on YARN.
Configure environment variables
- Set HADOOP_CLASSPATH
export HADOOP_CLASSPATH=$(${HADOOP_HOME}/bin/hadoop classpath)
- Set YARN_CONF_DIR or HADOOP_CONF_DIR
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
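Before launching anything, it can help to confirm that these variables are visible in the shell that will start Flink. The following is a minimal sketch, not part of the original setup: the function name check_flink_yarn_env is made up for illustration. It looks at YARN_CONF_DIR first and falls back to HADOOP_CONF_DIR.

```shell
# Hypothetical helper (not part of Flink or Hadoop): sanity-check the
# environment variables that the Flink YARN client relies on.
check_flink_yarn_env() {
  # Prefer YARN_CONF_DIR, fall back to HADOOP_CONF_DIR.
  local conf_dir="${YARN_CONF_DIR:-${HADOOP_CONF_DIR:-}}"
  if [ -z "$conf_dir" ]; then
    echo "neither YARN_CONF_DIR nor HADOOP_CONF_DIR is set" >&2
    return 1
  fi
  if [ ! -d "$conf_dir" ]; then
    echo "configuration directory does not exist: $conf_dir" >&2
    return 1
  fi
  if [ -z "${HADOOP_CLASSPATH:-}" ]; then
    echo "HADOOP_CLASSPATH is empty" >&2
    return 1
  fi
  echo "ok: using configuration in $conf_dir"
}
```

Calling check_flink_yarn_env after the two export lines should print the configuration directory in use, or an error on stderr with a non-zero exit status.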
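Download the software and set directory permissions
Flink only needs to be deployed on the machine that submits Flink jobs.
Download flink-1.9.1-bin-scala_2.11.tgz and flink-shaded-hadoop-2-uber-2.6.5-7.0.jar (pre-bundled Hadoop),
then place flink-shaded-hadoop-2-uber-2.6.5-7.0.jar in the ${flink_home}/lib directory.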
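The jar placement step can be sketched as a small shell function. install_shaded_hadoop is a hypothetical helper name, and both paths are supplied by the caller; nothing here is taken from the Flink distribution itself.

```shell
# Hypothetical helper: copy the pre-bundled Hadoop jar into Flink's lib
# directory and print where it ended up.
install_shaded_hadoop() {
  local jar="$1" flink_home="$2"
  if [ ! -f "$jar" ]; then
    echo "jar not found: $jar" >&2
    return 1
  fi
  if [ ! -d "$flink_home/lib" ]; then
    echo "no lib directory under: $flink_home" >&2
    return 1
  fi
  cp "$jar" "$flink_home/lib/" || return 1
  # Print the installed path so the caller can confirm the result.
  echo "$flink_home/lib/$(basename "$jar")"
}
```

For the layout used in this note, the call would look like: install_shaded_hadoop ./flink-shaded-hadoop-2-uber-2.6.5-7.0.jar /app/flink-1.9.1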
All files are owned by the hadoop user:
ln -s /app/flink-1.9.1 /opt/flink
chown -R hadoop:hadoop /app/flink-1.9.1
chown -h hadoop:hadoop /opt/flink
Start the Flink YARN cluster
/opt/flink/bin/yarn-session.sh -tm 8192 -s 10
Entries in flink-conf.yaml can be overridden on the command line with -D options, for example -Dfs.overwrite-files=true.
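As an illustration of the override mechanism, the sketch below assembles a yarn-session.sh command line from a list of key=value pairs. build_session_cmd is a hypothetical helper; it only prints the command and does not run it, so the result can be reviewed first.

```shell
# Hypothetical helper: append one -D<key>=<value> option per argument to the
# session start command used in this note, and print the resulting command.
build_session_cmd() {
  local cmd="/opt/flink/bin/yarn-session.sh -tm 8192 -s 10"
  local kv
  for kv in "$@"; do
    cmd="$cmd -D$kv"
  done
  echo "$cmd"
}

# Prints the command with the override from the text appended.
build_session_cmd fs.overwrite-files=true
```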
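Key parameters
-d: detached mode. The Flink YARN client exits once it has submitted the command that starts the Flink session cluster.
cd /opt/flink;./bin/yarn-session.sh -d
-id (-id,--applicationId): the YARN application Id.
This opens an interactive command window attached to a YARN session cluster that is already running. Typing stop and pressing Enter, or pressing Ctrl+C, then shuts down that Flink YARN cluster.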
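Attaching requires the application ID of the running session. The sketch below validates the shape of the ID before building the command. attach_cmd is a hypothetical helper, and the pattern application_<digits>_<digits> is an assumption inferred from the ID that appears later in this note.

```shell
# Hypothetical helper: check that the argument looks like a YARN application
# ID, then print the yarn-session.sh command that attaches to it.
attach_cmd() {
  local app_id="$1"
  if ! printf '%s\n' "$app_id" | grep -Eq '^application_[0-9]+_[0-9]+$'; then
    echo "not a YARN application id: $app_id" >&2
    return 1
  fi
  echo "/opt/flink/bin/yarn-session.sh -id $app_id"
}
```

For example, attach_cmd application_1577352810017_3044 prints the attach command for the session used in the rest of this note.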
Submit jobs
Copy a test file to an HDFS directory
hadoop fs -copyFromLocal README.txt /kylin
Submitting a Flink job, method 1
/opt/flink/bin/flink run ./examples/batch/WordCount.jar \
--input hdfs:///kylin/README.txt --output hdfs:///kylin/wordcount-result.txt
Because the Flink YARN cluster was started from this machine, the client finds the file /tmp/.yarn-properties-hadoop
and uses it to locate the JobManager.
Part of the log printed at run time:
Found Yarn properties file under /tmp/.yarn-properties-hadoop
#Generated YARN properties file
#Fri Dec 27 16:44:21 CST 2019
parallelism=1
dynamicPropertiesString=
applicationID=application_1577352810017_3044
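The applicationID entry can be extracted from that file with standard tools, for instance to pass it to yarn-session.sh -id. This is a minimal sketch: read_yarn_app_id is a hypothetical helper, and the default path assumes the file is named after the current user, as the /tmp/.yarn-properties-hadoop example suggests.

```shell
# Hypothetical helper: print the applicationID stored in a YARN properties
# file. The path defaults to /tmp/.yarn-properties-<current user> (assumed).
read_yarn_app_id() {
  local props="${1:-/tmp/.yarn-properties-$(id -un)}"
  if [ ! -r "$props" ]; then
    echo "cannot read properties file: $props" >&2
    return 1
  fi
  # Print only the value after "applicationID=", ignoring all other lines.
  sed -n 's/^applicationID=//p' "$props" | head -n 1
}
```

Run as the hadoop user with no argument, this would read /tmp/.yarn-properties-hadoop; a different path can be passed as the first argument.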
Submitting a Flink job, method 2
Submit the job by specifying --jobmanager:
-m,--jobmanager Address of the JobManager (master) to which
to connect. Use this flag to connect to a
different JobManager than the one specified
in the configuration.
Open http://rm1:8088/cluster/app/application_1577352810017_3044
and follow the appattempt_1577352810017_3044_000001 link to find the JobManager address, node1:59968.
Run the command:
/opt/flink/bin/flink run -m node1:59968 ./examples/batch/WordCount.jar \
--input hdfs:///kylin/README.txt --output hdfs:///kylin/wordcount-result333.txt
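Start Flink on YARN to run a single job
This is similar to running a Spark job on YARN. Passing "-m yarn-cluster" tells the Flink client to start a dedicated Flink cluster on YARN for this one job. Note that "yarn-cluster" is a fixed keyword understood by the Flink CLI, not the value of yarn.resourcemanager.cluster-id configured in yarn-site.xml.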
./bin/flink run -m yarn-cluster ./examples/batch/WordCount.jar \
--input hdfs:///kylin/README.txt --output hdfs:///kylin/wordcount-result444.txt
References:
https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/hadoop.html
https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/deployment/yarn_setup.html