Flink任务提交
目录
测试代码如下:
package wordcount
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
/**
* Created by leboop on 2020/5/19.
*/
object StreamWordCount {
def main(args: Array[String]): Unit = {
val parameterTool = ParameterTool.fromArgs(args)
val host = parameterTool.get("host")
val port = parameterTool.get("port")
val env = StreamExecutionEnvironment.getExecutionEnvironment
val text: DataStream[String] = env.socketTextStream(host, port.toInt)
val counts = text.flatMap(_.toLowerCase().split("\\W+"))
.map((_, 1)).keyBy(0).sum(1)
counts.print()
env.execute("Streaming Count")
}
}
本案例的Flink测试环境是standalone cluster,没有集成Hadoop。
界面方式
1、打包上传
在IDEA中,使用Maven打包,然后打开Flink Dashboard,上传Jar包,并配置相关参数,如图:
对于本案例程序,在提交任务前需要开启监听端口,如图:
如果点击Submit出现如下错误信息:
可能有以下几个原因:
(1)jar包可能是瘦包,需要打成胖包;
(2)检查参数是否配置有问题,比如--port后面没有配置端口;
提交成功后如图:
任务会一直在RUNNING状态,当我们在nc端口下输入一些单词,任务失败了,报错如下:
java.lang.NoSuchMethodError: scala.Predef$.refArrayOps([Ljava/lang/Object;)[Ljava/lang/Object;
at wordcount.StreamWordCount$.$anonfun$main$1(StreamWordCount.scala:16)
at wordcount.StreamWordCount$.$anonfun$main$1$adapted(StreamWordCount.scala:16)
at org.apache.flink.streaming.api.scala.DataStream$$anon$6.flatMap(DataStream.scala:675)
at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
原因是集群的Flink是scala2.11版本,而我们的程序scala2.12版本,将程序的scala版本修改为2.11即可。
2、查看运行结果
本集群有2台worker,如图:
最后在2台worker上都有输出,如图:
命令方式
1、将打好的jar包上传到master节点上,执行如下命令:
flink run -c wordcount.StreamWordCount /root/jars/myflink.jar --host bigdata111 --port 6666
提交成功后如图:
任务取消
查看当前运行的任务:
flink list
查看所有的任务:
flink list --all
如图:
任务取消:
点击flink dashborad页面的cancal,或者执行取消命令:
flink cancel jobid