Flink客户端操作
参考学习阿里巴巴周凯波Flink客户端操作内容。
概要
Flink 提供了丰富的客户端操作来提交任务和与任务进行交互,包括 Flink 命令行,Scala Shell,SQL Client,Restful API 和 Web。Flink 首先提供的最重要的是命令行,其次是 SQL Client 用于提交 SQL 任务的运行,还有就是 Scala Shell 提交 Table API 的任务。同时,Flink 也提供了Restful 服务,用户可以通过http 方式进行调用。此外,还有 Web 的方式可以提交任务。
在 flink 安装目录的 bin 目录下面可以看到有 flink, start-scala-shell.sh 和 sql-client.sh 等文件,这些都是客户端操作的入口。
Flink 命令行
flink -h
输入 flink – h 能看到完整的说明:
flink run -h
查看具体某一个命令的参数。这里是看run命令的具体参数。
启动 standalone 集群
bin/start-cluster.sh
打开 http://hostname:8081 能看到 Web 界面
run
bin/flink run -d examples/streaming/TopSpeedWindowing.jar
默认是1个并发
可以在web界面的task Manager下面的stdout查看输出内容。
也可以通过本地日志查看: *.out日志文件
list
查看任务列表
bin/flink list
可以在后面加上flink的端口号
stop
./bin/flink stop -m 192.168.56.120:8081 ec5e5e25084cb16488ed4938d8e456ef
一个 Job 能够被 Stop 要求所有的 Source 都是可以 Stoppable 的,即实现了StoppableFunction 接口。
/**
* 需要能 stoppable 的函数必须实现这个接口,例如流式任务的 source。
* stop() 方法在任务收到 STOP 信号的时候调用。
* source 在接收到这个信号后,必须停止发送新的数据且优雅的停止。
*/
@PublicEvolving
public interface StoppableFunction {
/**
* 停止 source。与 cancel() 不同的是,这是一个让 source 优雅停止的请求。
* 等待中的数据可以继续发送出去,不需要立即停止。
*/
void stop();
}
Cancel
取消任务。如果在 conf/flink-conf.yaml 里面配置了 state.savepoints.dir,会保存Savepoint,否则不会保存 Savepoint
./bin/flink cancel -m 192.168.56.120:8081 ec5e5e25084cb16488ed4938d8e456ef
也可以在停止的时候显示指定 savepoint 目录
bin/flink cancel -m 127.0.0.1:8081 -s /tmp/savepoint ec5e5e25084cb16488ed4938d8e456ef
取消和停止(流作业)的区别如下:
cancel() 调用,立即调用作业算子的 cancel() 方法,以尽快取消它们。如果算子在接到 cancel() 调用后没有停止,Flink 将开始定期中断算子线程的执行,直到所有算子停止为止。
● stop() 调用,是更优雅的停止正在运行流作业的方式。stop() 仅适用于 source 实现了StoppableFunction 接口的作业。当用户请求停止作业时,作业的所有 source 都将接收 stop() 方法调用。直到所有 source 正常关闭时,作业才会正常结束。这种方式,使作业正常处理完所有作业。
savepoint
触发savepoint
bin/flink savepoint -m 127.0.0.1:8081 ec53edcfaeb96b2a5dadbfbe5ff62bbb /tmp/savepoint
savepoint 和 checkpoint 的区别
●checkpoint 是增量做的,每次的时间较短,数据量较小,只要在程序里面启用后会自 动触发,用户无须感知;savepoint 是全量做的,每次的时间较长,数据量较大,需要 用户主动去触发。
● checkpoint 是作业 failover 的时候自动使用,不需要用户指定。savepoint 一般用于程 序的版本更新(详见文档),bug修复,A/B Test等场景,需要用户指定。
从指定的 savepoint 启动
bin/flink run -d -s /tmp/savepoint/savepoint-f049ff-24ec0d3e0dc7 ./e xamples/streaming/TopSpeedWindowing.jar
modify
修改任务并行度。
通过 modify 命令将并发度修改为 4和3,每次 modify 命令都会触发一次 savepoint
bin/flink modify -p 4 790d7b98db6f6af55d04aec1d773852d
bin/flink modify -p 3 790d7b98db6f6af55d04aec1d773852d
info
bin/flink info examples/streaming/TopSpeedWindowing.jar
拷贝输出的 json 内容,粘贴到这个网站:http://flink.apache.org/visualizer/
yarn per-job
默认是attach模式,即客户端会一直等待直到程序结束才会退出。
● 通过 -m yarn-cluster 指定 yarn 模式
● 客户端能看到结果输出
● Yarn上显示 Flink session cluster,这个batch任务运行完会 FINISHED。
./bin/flink run -m yarn-cluster ./examples/batch/WordCount.jar
如果我们以 attach 模式运行 streaming 的任务,客户端会一直等待不退出
单任务detached
● 由于是 detached 模式,客户端提交完任务就退出了
● Yarn 上显示为 Flink per-job cluster
yarn session
启动 session
./bin/yarn-session.sh -tm 2048 -s 3
表示启动一个 yarn session 集群,每个TM的内存是2G,每个TM有3个slot。( -n 参数不生效)
客户端默认是attach模式,不会退出
– 可以 ctrl + c 退出,然后再通过 ./bin/yarn-session.sh -id application_1532332183 347_0708 连上来。
– 或者启动的时候用 -d 则为detached模式
其中:
-n(–container):TaskManager的数量。
-s(–slots): 每个TaskManager的slot数量,默认一个slot一个core,默认每个taskmanager的slot的个数为1,有时可以多一些taskmanager,做冗余。
-jm:JobManager的内存(单位MB)。
-tm:每个taskmanager的内存(单位MB)。
-nm:yarn 的appName(现在yarn的ui上的名字)。
-d:后台执行。
提交到指定的 session
通过 -yid 参数来提交到指定的session
./bin/flink run -d -p 30 -m yarn-cluster -yid application_1532332183347_0708 ./examp les/streaming/TopSpeedWindowing.jar
可以去yarn控制台查看任务状态
Scala Shell
官⽅方⽂文档:https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/scala_shell.html
bin/start-scala-shell.sh local
任务运行说明:
● Batch 任务内置了 benv 变量,通过 print() 将结果输出到控制台
● Streaming 任务内置了 senv 变量,通过 senv.execute(“job name”) 来提交任务,且Datastrea
m的输出只有在 local 模式下打印到控制台。
remote
先启动一个 yarn session cluster
启动 scala shell,连到 jm
bin/start-scala-shell.sh remote z054.sqa.net 28665
yarn
./bin/start-scala-shell.sh yarn -n 2 -jm 1024 -s 2 -tm 1024 -nm flink-yarn
可以在scala客户端,执行Dataset和DataFrame操作,类似于spark
SQL Client Beta
Restful API
flink 的 restful api 文档:https://ci.apache.org/projects/flink/flin
k-docs-stable/monitoring/rest_api.html
Web
在 Flink Dashboard 页面左侧可以看到有个『Submit new Job』的地方,用户可以上传 jar 包 和显示执行计划和提交任务。Web 提交功能主要用于新手入门和演示用