欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Flink整合oozie shell Action 提交任务 带kerberos认证

程序员文章站 2022-11-06 17:29:15
最近这段时间一直在忙新集群迁移,上了最新的cdh6.3.0 于是Flink 提交遇到了许多的问题 还好有cloudera License 有了原厂的帮助和社区的伙伴,问题解决起来快了不少,手动滑稽 集群具体情况是,cdh6.3.0+Flink1.8.1,整个数据平台全部组件都上了kerberos和l ......

最近这段时间一直在忙新集群迁移,上了最新的cdh6.3.0 于是flink 提交遇到了许多的问题

还好有cloudera license 有了原厂的帮助和社区的伙伴,问题解决起来快了不少,手动滑稽

集群具体情况是,cdh6.3.0+flink1.8.1,整个数据平台全部组件都上了kerberos和ldap因为要过认证,所以任务提交方法我们选择统一oozie提交任务

并且因为kerberos认证,还需要flink perjob 需要单独的keytab,才能细腻度的控制权限,因为我们现在部门之间计算资源的划分是通过yarn资源队列

但是现在flink支持的不是很好,目前只能在配置文件中配置一个keytab,job启动都去这个拉这个keytab复制到自己的contain里面

但是flink第一提交方式还是希望能够通过oozie提交job

由于oozie没有天生支持flink提交,所以只能选择oozie shell action 的方式提交job

在flink搭建好以后开始提交任务,用oozie shell提交

#!/bin/bash

flink run -m yarn-cluster flinktest.jar

马上  duang

flink command not find

改成命令绝对路径以后! 还是 duang

org.apache.flink.client.deployment.clusterdeploymentexception: couldn't deploy yarn session cluster

at org.apache.flink.yarn.abstractyarnclusterdescriptor.deploysessioncluster(abstractyarnclusterdescriptor.java:387)

at org.apache.flink.client.cli.clifrontend.runprogram(clifrontend.java:259) at org.apache.flink.client.cli.clifrontend.run(clifrontend.java:213)

at org.apache.flink.client.cli.clifrontend.parseparameters(clifrontend.java:1050)

at org.apache.flink.client.cli.clifrontend.lambda$main$11(clifrontend.java:1126)

at java.security.accesscontroller.doprivileged(native method) at javax.security.auth.subject.doas(subject.java:422)

at org.apache.hadoop.security.usergroupinformation.doas(usergroupinformation.java:1836)

at org.apache.flink.runtime.security.hadoopsecuritycontext.runsecured(hadoopsecuritycontext.java:41)

调度不了yarn ,这是因为oozie会覆盖掉hadoop_conf_dir

于是在shell里面手动export hadoop_conf_dir = xxxxx

发现!!!

可以提交了

但是!!!

有时候能成功有时候失败????黑人问号

org.apache.flink.runtime.resourcemanager.exceptions.resourcemanagerexception: could not start the resourcemanager akka.tcp://flink@xxxxx:36166/user/resourcemanager

at org.apache.flink.runtime.resourcemanager.resourcemanager.onstart(resourcemanager.java:202)

at org.apache.flink.runtime.rpc.akka.akkarpcactor$stoppedstate.start(akkarpcactor.java:539)

at org.apache.flink.runtime.rpc.akka.akkarpcactor.handlecontrolmessage(akkarpcactor.java:164)

at org.apache.flink.runtime.rpc.akka.akkarpcactor.onreceive(akkarpcactor.java:142)

at org.apache.flink.runtime.rpc.akka.fencedakkarpcactor.onreceive(fencedakkarpcactor.java:40)

at akka.actor.untypedactor$$anonfun$receive$1.applyorelse(untypedactor.scala:165)

at akka.actor.actor$class.aroundreceive(actor.scala:502)

at akka.actor.untypedactor.aroundreceive(untypedactor.scala:95)

at akka.actor.actorcell.receivemessage(actorcell.scala:526)

at akka.actor.actorcell.invoke(actorcell.scala:495)

at akka.dispatch.mailbox.processmailbox(mailbox.scala:257)

at akka.dispatch.mailbox.run(mailbox.scala:224)

at akka.dispatch.mailbox.exec(mailbox.scala:234)

at scala.concurrent.forkjoin.forkjointask.doexec(forkjointask.java:260)

at scala.concurrent.forkjoin.forkjoinpool$workqueue.runtask(forkjoinpool.java:1339)

at scala.concurrent.forkjoin.forkjoinpool.runworker(forkjoinpool.java:1979)

at scala.concurrent.forkjoin.forkjoinworkerthread.run(forkjoinworkerthread.java:107) caused by: org.apache.flink.runtime.resourcemanager.exceptions.resourcemanagerexception: could not start resource manager client.

at org.apache.flink.yarn.yarnresourcemanager.initialize(yarnresourcemanager.java:250)

at org.apache.flink.runtime.resourcemanager.resourcemanager.startresourcemanagerservices(resourcemanager.java:212)

at org.apache.flink.runtime.resourcemanager.resourcemanager.onstart(resourcemanager.java:200)

... 16 more caused by: org.apache.hadoop.yarn.exceptions.invalidapplicationmasterrequestexception: application master is already regist

resourcemanager注册 application master的时候已经被注册了?然后发生了一些异常

但是有时候又可以提交成功,这个就让我有点困惑

最后发现是因为oozie覆盖了很多集群上的环境变量导致

解决办法 在oozie 脚本的flink命令前加env -i

这样会清除所有的环境变量,oozie就会使用登陆yarn用户的环境变量来运行shell了

终于

#!/bin/bash

env -i /flink run -m yarn-cluster flinktest.jar

shell action成功提交flink任务

但是kerberos现在还没有解决,因为这样提交job会去服务器上读flink-conf.yaml文件里的kerberos认证,然后复制对应的keytab到所有容器,所有任务都是公用的一个

这样的话不能实现每个job单独使用一个keytab,每个job使用自己对应的kerberos认证

于是在社区群上取了下经,大家实现的方法也是千奇百怪

有全部任务公用一个认证的,有用cicd在容器每次提交的镜像中在flink-conf.yaml中修改为指定的kerberos的

但是 我们不一样~~

因为我们是oozie提交任务,有点头大,还好最后还是解决了

因为flink是通过去flink_conf_dir路径下去读取默认的flink-conf.yaml文件中的kerberos认证

那我们就需要在oozie shell 脚本中指定我们自己修改的flink-conf.yaml文件路径通过手动指定flink_conf_dir去覆盖flink默认的

这个路径我们填写相对路径,因为oozie运行时会将提交的文件复制到运行时的相对路径下面

也就是说,我们可以oozie中把我们的keytab文件以及整个conf文件夹都上传上去,修改conf/flink-conf.yaml文件中的kerberos选项

security.kerberos.login.keytab = . 

security.kerberos.login.principal = xxx

这里的keytab路径就填写相对路径./因为oozie会把你上传的keytab拷贝过去

最后运行oozie shell 脚本

#!/bin/bash

env -i flink_conf_dir=./conf   /flink run -m yarn-cluster  ./flinktest.jar

成功使用自己指定的keytab用户运行job