Flink Source Code Analysis: Dissecting a Simple Flink Program
Earlier posts covered how to set up a local Flink environment, how to create a Flink application, and how to build the Flink source. This article uses the official SocketWindowWordCount example to walk through every basic step of a typical Flink program.
The Example Program
public class SocketWindowWordCount {

    public static void main(String[] args) throws Exception {

        // the host and the port to connect to
        final String hostname;
        final int port;
        try {
            final ParameterTool params = ParameterTool.fromArgs(args);
            hostname = params.has("hostname") ? params.get("hostname") : "localhost";
            port = params.getInt("port");
        } catch (Exception e) {
            System.err.println("No port specified. Please run 'SocketWindowWordCount " +
                "--hostname <hostname> --port <port>', where hostname (localhost by default) " +
                "and port is the address of the text server");
            System.err.println("To start a simple text server, run 'netcat -l <port>' and " +
                "type the input text into the command line");
            return;
        }

        // get the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // get input data by connecting to the socket
        DataStream<String> text = env.socketTextStream(hostname, port, "\n");

        // parse the data, group it, window it, and aggregate the counts
        DataStream<WordWithCount> windowCounts = text
            .flatMap(new FlatMapFunction<String, WordWithCount>() {
                @Override
                public void flatMap(String value, Collector<WordWithCount> out) {
                    for (String word : value.split("\\s")) {
                        out.collect(new WordWithCount(word, 1L));
                    }
                }
            })
            .keyBy("word")
            .timeWindow(Time.seconds(5))
            .reduce(new ReduceFunction<WordWithCount>() {
                @Override
                public WordWithCount reduce(WordWithCount a, WordWithCount b) {
                    return new WordWithCount(a.word, a.count + b.count);
                }
            });

        // print the results with a single thread, rather than in parallel
        windowCounts.print().setParallelism(1);

        env.execute("Socket Window WordCount");
    }

    // ------------------------------------------------------------------------

    /**
     * Data type for words with count.
     */
    public static class WordWithCount {

        public String word;
        public long count;

        public WordWithCount() {}

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return word + " : " + count;
        }
    }
}
The above is the SocketWindowWordCount example from the official website. It first reads the socket host and port from the command-line arguments, then obtains the execution environment, reads data from the socket connection, parses and transforms it, and finally prints the result.
Every Flink program consists of the same basic parts:
- Obtain an execution environment,
- Load/create the initial data,
- Specify transformations on this data,
- Specify where to put the results of the computation,
- Trigger the program execution.
The Flink Execution Environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Every Flink program starts with this line. It returns an execution environment, which represents the context in which the program runs. If the program is invoked standalone, this method returns a local execution environment, a LocalStreamEnvironment created by createLocalEnvironment(). You can see this in its source:
// Source file: org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
public static StreamExecutionEnvironment getExecutionEnvironment() {
    if (contextEnvironmentFactory != null) {
        return contextEnvironmentFactory.createExecutionEnvironment();
    }

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    if (env instanceof ContextEnvironment) {
        return new StreamContextEnvironment((ContextEnvironment) env);
    } else if (env instanceof OptimizerPlanEnvironment || env instanceof PreviewPlanEnvironment) {
        return new StreamPlanEnvironment(env);
    } else {
        return createLocalEnvironment();
    }
}
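If you want to skip this detection logic, for example while debugging, you can also create the local environment yourself. A minimal sketch, assuming the same Flink version as the code above; the parallelism of 4 and the class name LocalEnvDemo are arbitrary choices for illustration:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LocalEnvDemo {
    public static void main(String[] args) throws Exception {
        // Explicitly create a LocalStreamEnvironment instead of relying on
        // getExecutionEnvironment() to detect the context; 4 is an arbitrary parallelism.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(4);
        env.fromElements("hello", "flink").print();
        env.execute("local env demo");
    }
}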
Obtaining the Input Data
DataStream<String> text = env.socketTextStream(hostname, port, "\n");
In this example the source data comes from a socket. A socket connection is created from the given configuration, and a new data stream is created containing the strings received, without bound, from that socket, decoded with the system's default character set. When the socket connection is closed, reading terminates immediately. Looking at the source, you can see that the given socket configuration is used to construct a SocketTextStreamFunction instance, which then continuously reads input from the socket connection to feed the data stream.
// Source file: org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@PublicEvolving
public DataStreamSource<String> socketTextStream(String hostname, int port, String delimiter, long maxRetry) {
    return addSource(new SocketTextStreamFunction(hostname, port, delimiter, maxRetry),
            "Socket Stream");
}
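Note that the example calls the three-argument overload; in the same file it simply delegates to the method above with maxRetry set to 0, meaning the source does not try to reconnect. Quoted from memory of the same Flink version; exact details may differ across releases:

// The overload used in the example; maxRetry = 0 means no reconnection attempts.
public DataStreamSource<String> socketTextStream(String hostname, int port, String delimiter) {
    return socketTextStream(hostname, port, delimiter, 0);
}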
The class hierarchy of SocketTextStreamFunction is shown below:
As you can see, SocketTextStreamFunction is a subclass of SourceFunction, which is the basic interface for all stream data sources in Flink. SourceFunction is defined as follows:
// Source file: org/apache/flink/streaming/api/functions/source/SourceFunction.java
@Public
public interface SourceFunction<T> extends Function, Serializable {

    void run(SourceContext<T> ctx) throws Exception;

    void cancel();

    @Public
    interface SourceContext<T> {

        void collect(T element);

        @PublicEvolving
        void collectWithTimestamp(T element, long timestamp);

        @PublicEvolving
        void emitWatermark(Watermark mark);

        @PublicEvolving
        void markAsTemporarilyIdle();

        Object getCheckpointLock();

        void close();
    }
}
SourceFunction defines two methods, run and cancel, plus the SourceContext inner interface.
- run(SourceContext): implements the data-fetching logic and can forward records to downstream nodes through the ctx parameter.
- cancel(): cancels the source. The run method usually contains a loop that keeps producing data; cancel makes that loop terminate (see the sketch after this list).
- SourceContext: the interface through which a source function emits elements and, possibly, watermarks; it is parameterized with the type of the elements the source produces.
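To make this contract concrete, here is a minimal, hypothetical SourceFunction that emits an increasing counter. It is a sketch, not Flink code: the class name CounterSource and the emission interval are invented, but the volatile-flag pattern it shows is the standard way to let cancel() stop the loop in run():

import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class CounterSource implements SourceFunction<Long> {

    // volatile so the flag written by cancel() is visible to the thread running run()
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        long counter = 0L;
        while (running) {
            // emit under the checkpoint lock so emission and checkpointing stay consistent
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(counter++);
            }
            Thread.sleep(100); // arbitrary pacing for the sketch
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}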
With the SourceFunction interface understood, the concrete implementation of SocketTextStreamFunction (mainly its run method) is easy to follow: it continuously reads data from the given hostname and port, splits the input into individual strings on the newline delimiter, and forwards them downstream. Now back to the socketTextStream method of StreamExecutionEnvironment: it returns a DataStreamSource instance by calling addSource. Think about it for a moment: the text variable in the example has type DataStream, so why is the return type in the source code DataStreamSource? Because DataStream is the parent class of DataStreamSource, as the class diagram below shows; this is Java polymorphism at work.
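A quick, self-contained illustration of that subtyping relationship (a hypothetical snippet; the host and port values are placeholders):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SubtypingDemo {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // socketTextStream returns the concrete subclass DataStreamSource...
        DataStreamSource<String> source = env.socketTextStream("localhost", 9000, "\n");
        // ...which is assignable to the supertype DataStream without a cast.
        DataStream<String> text = source;
        text.print();
    }
}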
Data Stream Transformations
The DataStreamSource obtained above then undergoes flatMap, keyBy, timeWindow, and reduce transformations.
DataStream<WordWithCount> windowCounts = text
    .flatMap(new FlatMapFunction<String, WordWithCount>() {
        @Override
        public void flatMap(String value, Collector<WordWithCount> out) {
            for (String word : value.split("\\s")) {
                out.collect(new WordWithCount(word, 1L));
            }
        }
    })
    .keyBy("word")
    .timeWindow(Time.seconds(5))
    .reduce(new ReduceFunction<WordWithCount>() {
        @Override
        public WordWithCount reduce(WordWithCount a, WordWithCount b) {
            return new WordWithCount(a.word, a.count + b.count);
        }
    });
This snippet applies four transformations to the DataStreamSource obtained above: flatMap, keyBy, timeWindow, and reduce. Let's walk through the flatMap transformation; the other three are left for the reader to explore in the source.
First, the source of the flatMap method:
// Source file: org/apache/flink/streaming/api/datastream/DataStream.java
public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper) {
    TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes(clean(flatMapper),
            getType(), Utils.getCallLocationName(), true);
    return transform("Flat Map", outType, new StreamFlatMap<>(clean(flatMapper)));
}
This does two things: it uses reflection to obtain the output type of the flatMap operator, and it creates an operator. The core concept of Flink's streaming model is passing records from the input stream through a chain of operators and finally into the output stream; logically, each processing step applied to the data is an operator. The transform call in the last line returns a SingleOutputStreamOperator, which extends DataStream and defines helper methods that make it convenient to operate on the stream. Before returning, transform also registers the operator with the execution environment. The figure below shows how a Flink program maps to a streaming dataflow:
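Before moving on to the sink, note that the other transformations wrap user functions in operators the same way. For comparison, here is the map method from the same file, quoted from memory of the same Flink version (it may differ in other releases):

// Source file: org/apache/flink/streaming/api/datastream/DataStream.java
public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper) {
    TypeInformation<R> outType = TypeExtractor.getMapReturnTypes(clean(mapper),
            getType(), Utils.getCallLocationName(), true);
    return transform("Map", outType, new StreamMap<>(clean(mapper)));
}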
Emitting the Results
windowCounts.print().setParallelism(1);
Every Flink program begins with a source and ends with a sink. Here the print method sinks the computed results to standard output. In real-world development, results are usually sunk to an external system, such as Kafka, HBase, a filesystem, or Elasticsearch, through the connectors provided on the official website or through custom connectors. setParallelism sets the parallelism of this sink; its value must be greater than zero.
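As a concrete illustration of the sink side, here is a minimal, hypothetical SinkFunction that writes every record to standard output, mirroring what print() does internally via Flink's built-in PrintSinkFunction; the class name StdoutSink is invented for this sketch:

import org.apache.flink.streaming.api.functions.sink.SinkFunction;

// A hypothetical sink: real connectors implement the same interface
// against Kafka, HBase, a filesystem, and so on.
public class StdoutSink<T> implements SinkFunction<T> {
    @Override
    public void invoke(T value) throws Exception {
        System.out.println(value);
    }
}

Wiring it in would look like windowCounts.addSink(new StdoutSink<>()).setParallelism(1); as with print(), the sink's parallelism must be greater than zero.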
Executing the Program
env.execute("Socket Window WordCount");
Flink has two execution modes, remote and local, which differ slightly; this walkthrough uses local mode. First, the source of the execute method:
// Source file: org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
@Override
public JobExecutionResult execute(String jobName) throws Exception {
    // transform the streaming program into a JobGraph
    StreamGraph streamGraph = getStreamGraph();
    streamGraph.setJobName(jobName);

    JobGraph jobGraph = streamGraph.getJobGraph();
    jobGraph.setAllowQueuedScheduling(true);

    Configuration configuration = new Configuration();
    configuration.addAll(jobGraph.getJobConfiguration());
    configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "0");

    // add (and override) the settings with what the user defined
    configuration.addAll(this.configuration);

    if (!configuration.contains(RestOptions.BIND_PORT)) {
        configuration.setString(RestOptions.BIND_PORT, "0");
    }

    int numSlotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());

    MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
        .setConfiguration(configuration)
        .setNumSlotsPerTaskManager(numSlotsPerTaskManager)
        .build();

    if (LOG.isInfoEnabled()) {
        LOG.info("Running job on local embedded Flink mini cluster");
    }

    MiniCluster miniCluster = new MiniCluster(cfg);

    try {
        miniCluster.start();
        configuration.setInteger(RestOptions.PORT, miniCluster.getRestAddress().get().getPort());

        return miniCluster.executeJobBlocking(jobGraph);
    }
    finally {
        transformations.clear();
        miniCluster.close();
    }
}
This method has three parts: transform the streaming program into a JobGraph, add (and override) settings with what the user defined, and start a MiniCluster to run the job. Leaving the JobGraph aside for now, let's follow the execution by stepping into return miniCluster.executeJobBlocking(jobGraph);:
// Source file: org/apache/flink/runtime/minicluster/MiniCluster.java
@Override
public JobExecutionResult executeJobBlocking(JobGraph job) throws JobExecutionException, InterruptedException {
    checkNotNull(job, "job is null");

    final CompletableFuture<JobSubmissionResult> submissionFuture = submitJob(job);

    final CompletableFuture<JobResult> jobResultFuture = submissionFuture.thenCompose(
        (JobSubmissionResult ignored) -> requestJobResult(job.getJobID()));

    final JobResult jobResult;

    try {
        jobResult = jobResultFuture.get();
    } catch (ExecutionException e) {
        throw new JobExecutionException(job.getJobID(), "Could not retrieve JobResult.", ExceptionUtils.stripExecutionException(e));
    }

    try {
        return jobResult.toJobExecutionResult(Thread.currentThread().getContextClassLoader());
    } catch (IOException | ClassNotFoundException e) {
        throw new JobExecutionException(job.getJobID(), e);
    }
}
The core of this code is final CompletableFuture<JobSubmissionResult> submissionFuture = submitJob(job);, which calls the submitJob method of the MiniCluster class. Next, that method:
// Source file: org/apache/flink/runtime/minicluster/MiniCluster.java
public CompletableFuture<JobSubmissionResult> submitJob(JobGraph jobGraph) {
    final CompletableFuture<DispatcherGateway> dispatcherGatewayFuture = getDispatcherGatewayFuture();

    // we have to allow queued scheduling in FLIP-6 mode because we need to request slots
    // from the ResourceManager
    jobGraph.setAllowQueuedScheduling(true);

    final CompletableFuture<InetSocketAddress> blobServerAddressFuture = createBlobServerAddress(dispatcherGatewayFuture);

    final CompletableFuture<Void> jarUploadFuture = uploadAndSetJobFiles(blobServerAddressFuture, jobGraph);

    final CompletableFuture<Acknowledge> acknowledgeCompletableFuture = jarUploadFuture
        .thenCombine(
            dispatcherGatewayFuture,
            (Void ack, DispatcherGateway dispatcherGateway) -> dispatcherGateway.submitJob(jobGraph, rpcTimeout))
        .thenCompose(Function.identity());

    return acknowledgeCompletableFuture.thenApply(
        (Acknowledge ignored) -> new JobSubmissionResult(jobGraph.getJobID()));
}
The Dispatcher component here is responsible for receiving job submissions, persisting them, spawning JobManagers to execute the jobs, and recovering them if the master fails. Dispatcher has two implementations: in a local environment a MiniDispatcher is started, while on a cluster a StandaloneDispatcher is started. The class diagram is shown below:
The Dispatcher then starts a JobManagerRunner and delegates to it to start the JobMaster for the job. The corresponding code:
// Source file: org/apache/flink/runtime/jobmaster/JobManagerRunner.java
private CompletableFuture<Void> verifyJobSchedulingStatusAndStartJobManager(UUID leaderSessionId) {
    final CompletableFuture<JobSchedulingStatus> jobSchedulingStatusFuture = getJobSchedulingStatus();

    return jobSchedulingStatusFuture.thenCompose(
        jobSchedulingStatus -> {
            if (jobSchedulingStatus == JobSchedulingStatus.DONE) {
                return jobAlreadyDone();
            } else {
                return startJobMaster(leaderSessionId);
            }
        });
}
After a series of nested method calls, JobMaster eventually reaches the following logic:
// Source file: org/apache/flink/runtime/jobmaster/JobMaster.java
private void scheduleExecutionGraph() {
    checkState(jobStatusListener == null);
    // register self as job status change listener
    jobStatusListener = new JobManagerJobStatusListener();
    executionGraph.registerJobStatusListener(jobStatusListener);

    try {
        executionGraph.scheduleForExecution();
    }
    catch (Throwable t) {
        executionGraph.failGlobal(t);
    }
}
Here executionGraph.scheduleForExecution(); invokes the start method of the ExecutionGraph. In Flink's graph hierarchy, the ExecutionGraph is what actually gets executed, so this completes the journey from job submission to actual execution. To recap the flow in local mode:
- The client executes the execute method;
- MiniCluster does most of the preparatory work, then delegates the job directly to the MiniDispatcher;
- On receiving the job, the Dispatcher instantiates a JobManagerRunner and uses it to start the job;
- The JobManagerRunner then hands the job over to the JobMaster;
- The JobMaster starts the whole execution graph through the ExecutionGraph's methods, and the job is up and running.