
Flink Source Code Analysis - Dissecting a Simple Flink Program

程序员文章站 2022-07-10 12:43:21

This article was first published on my Toutiao account as "How does a Flink program execute? Dissecting a simple Flink program through its source code". Follow it, and the WeChat public account "大数据技术和人工智能" (WeChat search: bigdata_ai_tech), for more content; my CSDN blog is also worth a look.

Previous posts covered how to set up a local Flink environment, how to create a Flink application, and how to build the Flink source code. This article uses the official SocketWindowWordCount example to walk through each basic step of a typical Flink program.

The Example Program

public class SocketWindowWordCount {
    public static void main(String[] args) throws Exception {
        // the host and the port to connect to
        final String hostname;
        final int port;
        try {
            final ParameterTool params = ParameterTool.fromArgs(args);
            hostname = params.has("hostname") ? params.get("hostname") : "localhost";
            port = params.getInt("port");
        } catch (Exception e) {
            System.err.println("No port specified. Please run 'SocketWindowWordCount " +
                    "--hostname <hostname> --port <port>', where hostname (localhost by default) " +
                    "and port is the address of the text server");
            System.err.println("To start a simple text server, run 'netcat -l <port>' and " +
                    "type the input text into the command line");
            return;
        }
        // get the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // get input data by connecting to the socket
        DataStream<String> text = env.socketTextStream(hostname, port, "\n");
        // parse the data, group it, window it, and aggregate the counts
        DataStream<WordWithCount> windowCounts = text
                .flatMap(new FlatMapFunction<String, WordWithCount>() {
                    @Override
                    public void flatMap(String value, Collector<WordWithCount> out) {
                        for (String word : value.split("\\s")) {
                            out.collect(new WordWithCount(word, 1L));
                        }
                    }
                })
                .keyBy("word")
                .timeWindow(Time.seconds(5))
                .reduce(new ReduceFunction<WordWithCount>() {
                    @Override
                    public WordWithCount reduce(WordWithCount a, WordWithCount b) {
                        return new WordWithCount(a.word, a.count + b.count);
                    }
                });
        // print the results with a single thread, rather than in parallel
        windowCounts.print().setParallelism(1);
        env.execute("Socket Window WordCount");
    }
    // ------------------------------------------------------------------------
    /**
     * Data type for words with count.
     */
    public static class WordWithCount {
        public String word;
        public long count;
        public WordWithCount() {}
        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }
        @Override
        public String toString() {
            return word + " : " + count;
        }
    }
}

The above is the SocketWindowWordCount example from the official site. It first reads the socket host and port from the command line, then obtains the execution environment, reads data from the socket connection, parses and transforms the data, and finally emits the results.
Every Flink program consists of the same basic parts:

  1. Obtain an execution environment,
  2. Load/create the initial data,
  3. Specify transformations on this data,
  4. Specify where to put the results of the computation,
  5. Trigger the program execution.

The Flink Execution Environment

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Every Flink program starts with this line. It returns an execution environment, the context in which the program runs. If the program is invoked standalone, the method returns a local execution environment, a LocalStreamEnvironment created by createLocalEnvironment(). This can be seen in its source:

// source file: org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
public static StreamExecutionEnvironment getExecutionEnvironment() {
    if (contextEnvironmentFactory != null) {
        return contextEnvironmentFactory.createExecutionEnvironment();
    }
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    if (env instanceof ContextEnvironment) {
        return new StreamContextEnvironment((ContextEnvironment) env);
    } else if (env instanceof OptimizerPlanEnvironment || env instanceof PreviewPlanEnvironment) {
        return new StreamPlanEnvironment(env);
    } else {
        return createLocalEnvironment();
    }
}
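The selection logic above can be boiled down to "a context-supplied factory wins; otherwise fall back to a local environment". A minimal pure-JDK sketch of that pattern (the `Env` interface and `contextFactory` field here are our own stand-ins, not Flink's types):

```java
import java.util.function.Supplier;

// Sketch of getExecutionEnvironment()'s selection logic: if a context factory
// has been installed (e.g. by a cluster client), use it; otherwise create the
// equivalent of a local environment.
public class EnvFactoryDemo {
    interface Env { String name(); }

    static Supplier<Env> contextFactory; // installed by a submitting client, null when standalone

    static Env getExecutionEnvironment() {
        if (contextFactory != null) {
            return contextFactory.get(); // stand-in for contextEnvironmentFactory.createExecutionEnvironment()
        }
        return () -> "local";            // stand-in for createLocalEnvironment()
    }

    public static void main(String[] args) {
        System.out.println(getExecutionEnvironment().name()); // prints "local" when standalone
        contextFactory = () -> () -> "context";
        System.out.println(getExecutionEnvironment().name()); // prints "context"
    }
}
```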

Obtaining the Input Data

DataStream<String> text = env.socketTextStream(hostname, port, "\n");

The source data in this example comes from a socket. A socket connection is created from the given configuration, and a new data stream is created containing the strings received, unbounded, from that socket, decoded with the system's default character set. When the socket connection is closed, reading terminates immediately. Looking at the source shows that a SocketTextStreamFunction instance is constructed from the given socket configuration, and the data stream is created by continuously reading input from the socket connection.

// source file: org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@PublicEvolving
public DataStreamSource<String> socketTextStream(String hostname, int port, String delimiter, long maxRetry) {
    return addSource(new SocketTextStreamFunction(hostname, port, delimiter, maxRetry),
            "Socket Stream");
}
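The essence of what SocketTextStreamFunction does at runtime, read from the socket until it closes and split the stream on the delimiter, can be approximated with plain JDK sockets. This is a rough sketch under our own names and simplifications (no retry/backoff, and a trailing partial record is dropped), not Flink's implementation:

```java
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.Reader;
import java.io.Writer;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Connect to host:port, read characters, split on the delimiter, and hand each
// record "downstream" (here: into a list). Reading stops when the socket closes.
public class SocketReadDemo {

    static List<String> readRecords(String host, int port, char delimiter) throws IOException {
        List<String> records = new ArrayList<>();
        StringBuilder buf = new StringBuilder();
        try (Socket socket = new Socket(host, port);
             Reader in = new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8)) {
            int c;
            while ((c = in.read()) != -1) {      // -1 means the peer closed the socket
                if (c == delimiter) {
                    records.add(buf.toString()); // emit one record
                    buf.setLength(0);
                } else {
                    buf.append((char) c);
                }
            }
        }
        return records; // a trailing record without delimiter is dropped in this sketch
    }

    public static void main(String[] args) throws Exception {
        // Throwaway local text server standing in for `netcat -l <port>`.
        try (ServerSocket server = new ServerSocket(0)) {
            Thread writer = new Thread(() -> {
                try (Socket s = server.accept();
                     Writer out = new OutputStreamWriter(s.getOutputStream(), StandardCharsets.UTF_8)) {
                    out.write("hello\nworld\n");
                } catch (IOException ignored) { }
            });
            writer.start();
            System.out.println(readRecords("localhost", server.getLocalPort(), '\n')); // [hello, world]
        }
    }
}
```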

The class hierarchy of SocketTextStreamFunction is as follows:
(figure: SocketTextStreamFunction class hierarchy)

As you can see, SocketTextStreamFunction is a subclass of SourceFunction, the basic interface for all streaming data sources in Flink. SourceFunction is defined as follows:

// source file: org/apache/flink/streaming/api/functions/source/SourceFunction.java
@Public
public interface SourceFunction<T> extends Function, Serializable {
    void run(SourceContext<T> ctx) throws Exception;
    void cancel();
    @Public
    interface SourceContext<T> {
        void collect(T element);
        @PublicEvolving
        void collectWithTimestamp(T element, long timestamp);
        @PublicEvolving
        void emitWatermark(Watermark mark);
        @PublicEvolving
        void markAsTemporarilyIdle();
        Object getCheckpointLock();
        void close();
    }
}

SourceFunction defines two methods, run and cancel, plus the inner SourceContext interface.

  • run(SourceContext): implements the data acquisition logic and can forward data to downstream nodes via the ctx parameter.
  • cancel(): cancels the source. run usually contains a loop that keeps producing data; cancel makes that loop terminate.
  • SourceContext: the interface a source function uses to emit elements and, possibly, watermarks; it is typed by the elements the source produces.
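The run/cancel contract described above, a producing loop stopped by flipping a volatile flag from another thread, can be illustrated without Flink at all. A minimal sketch with our own names (not Flink's classes):

```java
import java.util.concurrent.atomic.AtomicLong;

// run() loops while a volatile flag is set; cancel(), called from another
// thread, clears the flag and the loop exits.
public class RunCancelDemo {
    private volatile boolean running = true;
    final AtomicLong emitted = new AtomicLong();

    void run() {                       // the data-producing loop
        while (running) {
            emitted.incrementAndGet(); // stand-in for ctx.collect(...)
        }
    }

    void cancel() {                    // called concurrently to stop the source
        running = false;
    }

    public static void main(String[] args) throws InterruptedException {
        RunCancelDemo source = new RunCancelDemo();
        Thread worker = new Thread(source::run);
        worker.start();
        Thread.sleep(50);              // let it produce for a moment
        source.cancel();
        worker.join(1000);
        System.out.println("emitted " + source.emitted.get() + " records, alive=" + worker.isAlive());
    }
}
```

The `volatile` modifier is essential here: without it, the worker thread is not guaranteed to ever observe the write made by cancel().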

With the SourceFunction interface understood, the concrete implementation of SocketTextStreamFunction (mainly its run method) is straightforward: it keeps reading data from the given hostname and port, splits it into strings on the newline delimiter, and forwards the records downstream. Now back to the socketTextStream method of StreamExecutionEnvironment: it calls addSource and returns a DataStreamSource instance. Why is the text variable in the example of type DataStream when the return type in the source is DataStreamSource? Because DataStream is the parent class of DataStreamSource, as the class diagram below shows; this is plain Java polymorphism at work.
(figure: DataStream/DataStreamSource class hierarchy)

Data Stream Operations

The DataStreamSource obtained above is transformed with flatMap, keyBy, timeWindow, and reduce.

DataStream<WordWithCount> windowCounts = text
        .flatMap(new FlatMapFunction<String, WordWithCount>() {
            @Override
            public void flatMap(String value, Collector<WordWithCount> out) {
                for (String word : value.split("\\s")) {
                    out.collect(new WordWithCount(word, 1L));
                }
            }
        })
        .keyBy("word")
        .timeWindow(Time.seconds(5))
        .reduce(new ReduceFunction<WordWithCount>() {
            @Override
            public WordWithCount reduce(WordWithCount a, WordWithCount b) {
                return new WordWithCount(a.word, a.count + b.count);
            }
        });

This logic applies four transformations to the DataStreamSource obtained above: flatMap, keyBy, timeWindow, and reduce. The flatMap transformation is discussed below; readers can explore the source of the other three themselves.

First, the source of the flatMap method:

// source file: org/apache/flink/streaming/api/datastream/DataStream.java
public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper) {
    TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes(clean(flatMapper),
            getType(), Utils.getCallLocationName(), true);
    return transform("Flat Map", outType, new StreamFlatMap<>(clean(flatMapper)));
}

Two things happen here: reflection is used to determine the flatMap operator's output type, and an operator is created. The core idea of Flink's streaming model is passing data from the input stream through a chain of operators to the output stream; logically, each processing step applied to the data is an operator. The transform call on the last line returns a SingleOutputStreamOperator, which extends DataStream and adds helper methods that make stream operations convenient. Before returning, transform also registers the operator with the execution environment. The figure below shows how a Flink program maps to a streaming dataflow:
(figure: mapping of a Flink program to a streaming dataflow)
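The contract that flatMap builds on, one input element emitting zero or more output elements through a Collector, can be sketched with plain Java. The two interfaces below are our own stripped-down stand-ins for Flink's FlatMapFunction and Collector, and the "stream" is just a list:

```java
import java.util.ArrayList;
import java.util.List;

// One input element may emit any number of output elements via a Collector.
public class FlatMapDemo {
    interface Collector<T> { void collect(T record); }
    interface FlatMapFunction<T, R> { void flatMap(T value, Collector<R> out); }

    static <T, R> List<R> applyFlatMap(List<T> input, FlatMapFunction<T, R> fn) {
        List<R> output = new ArrayList<>();
        for (T element : input) {
            fn.flatMap(element, output::add); // the collector just appends "downstream"
        }
        return output;
    }

    public static void main(String[] args) {
        // Split each line into words, like the example program's flatMap does.
        List<String> words = applyFlatMap(
            List.of("to be", "or not"),
            (line, out) -> { for (String w : line.split("\\s")) out.collect(w); });
        System.out.println(words); // [to, be, or, not]
    }
}
```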

Emitting the Results

windowCounts.print().setParallelism(1);

Every Flink program begins with a source and ends with a sink. The print call here sinks the computed results to standard output. In real-world development, results are usually sunk to their destination (Kafka, HBase, a filesystem, Elasticsearch, and so on) through the connectors provided on the official site or through custom connectors. setParallelism sets the parallelism of this sink; the value must be greater than zero.

Executing the Program

env.execute("Socket Window WordCount");

Flink has two execution modes, remote and local, which differ slightly; the analysis below follows the local mode. First, the source of the execute method:

// source file: org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
@Override
public JobExecutionResult execute(String jobName) throws Exception {
    // transform the streaming program into a JobGraph
    StreamGraph streamGraph = getStreamGraph();
    streamGraph.setJobName(jobName);
    JobGraph jobGraph = streamGraph.getJobGraph();
    jobGraph.setAllowQueuedScheduling(true);
    Configuration configuration = new Configuration();
    configuration.addAll(jobGraph.getJobConfiguration());
    configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "0");
    // add (and override) the settings with what the user defined
    configuration.addAll(this.configuration);
    if (!configuration.contains(RestOptions.BIND_PORT)) {
        configuration.setString(RestOptions.BIND_PORT, "0");
    }
    int numSlotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());
    MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
        .setConfiguration(configuration)
        .setNumSlotsPerTaskManager(numSlotsPerTaskManager)
        .build();
    if (LOG.isInfoEnabled()) {
        LOG.info("Running job on local embedded Flink mini cluster");
    }
    MiniCluster miniCluster = new MiniCluster(cfg);
    try {
        miniCluster.start();
        configuration.setInteger(RestOptions.PORT, miniCluster.getRestAddress().get().getPort());
        return miniCluster.executeJobBlocking(jobGraph);
    }
    finally {
        transformations.clear();
        miniCluster.close();
    }
}

This method has three parts: converting the streaming program into a JobGraph, adding (or overriding) settings with user-defined ones, and starting a MiniCluster to execute the job. Setting the JobGraph aside for now and looking only at job execution, follow the line `return miniCluster.executeJobBlocking(jobGraph);` into its source:

// source file: org/apache/flink/runtime/minicluster/MiniCluster.java
@Override
public JobExecutionResult executeJobBlocking(JobGraph job) throws JobExecutionException, InterruptedException {
    checkNotNull(job, "job is null");
    final CompletableFuture<JobSubmissionResult> submissionFuture = submitJob(job);
    final CompletableFuture<JobResult> jobResultFuture = submissionFuture.thenCompose(
        (JobSubmissionResult ignored) -> requestJobResult(job.getJobID()));
    final JobResult jobResult;
    try {
        jobResult = jobResultFuture.get();
    } catch (ExecutionException e) {
        throw new JobExecutionException(job.getJobID(), "Could not retrieve JobResult.", ExceptionUtils.stripExecutionException(e));
    }
    try {
        return jobResult.toJobExecutionResult(Thread.currentThread().getContextClassLoader());
    } catch (IOException | ClassNotFoundException e) {
        throw new JobExecutionException(job.getJobID(), e);
    }
}

The core of this code is `final CompletableFuture<JobSubmissionResult> submissionFuture = submitJob(job);`, which calls the submitJob method of the MiniCluster class. Next, that method:

// source file: org/apache/flink/runtime/minicluster/MiniCluster.java
public CompletableFuture<JobSubmissionResult> submitJob(JobGraph jobGraph) {
    final CompletableFuture<DispatcherGateway> dispatcherGatewayFuture = getDispatcherGatewayFuture();
    // we have to allow queued scheduling in FLIP-6 mode because we need to request slots
    // from the ResourceManager
    jobGraph.setAllowQueuedScheduling(true);
    final CompletableFuture<InetSocketAddress> blobServerAddressFuture = createBlobServerAddress(dispatcherGatewayFuture);
    final CompletableFuture<Void> jarUploadFuture = uploadAndSetJobFiles(blobServerAddressFuture, jobGraph);
    final CompletableFuture<Acknowledge> acknowledgeCompletableFuture = jarUploadFuture
        .thenCombine(
            dispatcherGatewayFuture,
            (Void ack, DispatcherGateway dispatcherGateway) -> dispatcherGateway.submitJob(jobGraph, rpcTimeout))
        .thenCompose(Function.identity());
    return acknowledgeCompletableFuture.thenApply(
        (Acknowledge ignored) -> new JobSubmissionResult(jobGraph.getJobID()));
}
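The shape of this async pipeline, combine two prerequisite futures, compose the nested future returned by the gateway call, then map the acknowledgement to a result object, can be reproduced with plain CompletableFuture. A sketch under our own names (the stages and strings below are illustrative, not Flink's):

```java
import java.util.concurrent.CompletableFuture;

// Mirror of the submitJob() pipeline shape: thenCombine waits for both
// prerequisites, thenCompose flattens the nested future (Function.identity()
// in the real code), and thenApply maps the ack to a result object.
public class SubmitPipelineDemo {

    static CompletableFuture<String> submit(String jobId) {
        CompletableFuture<Void> jarUpload = CompletableFuture.completedFuture(null);
        CompletableFuture<String> gateway = CompletableFuture.completedFuture("dispatcher");

        return jarUpload
            .thenCombine(gateway,            // wait for upload AND gateway
                (ack, gw) -> CompletableFuture.completedFuture(gw + " accepted " + jobId))
            .thenCompose(f -> f)             // flatten CompletableFuture<CompletableFuture<String>>
            .thenApply(ack -> "JobSubmissionResult(" + jobId + ")");
    }

    public static void main(String[] args) {
        System.out.println(submit("job-42").join()); // prints JobSubmissionResult(job-42)
    }
}
```

The thenCombine lambda returns another future (because the remote gateway call is itself asynchronous), which is why the extra thenCompose(Function.identity()) flattening step is needed.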

The Dispatcher component here receives job submissions, persists them, spawns JobManagers to execute the jobs, and recovers them in case of a master failure. Dispatcher has two implementations: in a local environment a MiniDispatcher is started, while on a cluster it is a StandaloneDispatcher. The class hierarchy:
(figure: Dispatcher class hierarchy)

The Dispatcher starts a JobManagerRunner and delegates to it the job of starting the JobMaster for this job. The corresponding code:

// source file: org/apache/flink/runtime/jobmaster/JobManagerRunner.java
private CompletableFuture<Void> verifyJobSchedulingStatusAndStartJobManager(UUID leaderSessionId) {
    final CompletableFuture<JobSchedulingStatus> jobSchedulingStatusFuture = getJobSchedulingStatus();
    return jobSchedulingStatusFuture.thenCompose(
        jobSchedulingStatus -> {
            if (jobSchedulingStatus == JobSchedulingStatus.DONE) {
                return jobAlreadyDone();
            } else {
                return startJobMaster(leaderSessionId);
            }
        });
}

After a series of nested method calls, the JobMaster eventually reaches this logic:

// source file: org/apache/flink/runtime/jobmaster/JobMaster.java
private void scheduleExecutionGraph() {
    checkState(jobStatusListener == null);
    // register self as job status change listener
    jobStatusListener = new JobManagerJobStatusListener();
    executionGraph.registerJobStatusListener(jobStatusListener);
    try {
        executionGraph.scheduleForExecution();
    }
    catch (Throwable t) {
        executionGraph.failGlobal(t);
    }
}

Here `executionGraph.scheduleForExecution();` invokes the ExecutionGraph's scheduling method. In Flink's graph hierarchy, the ExecutionGraph is where execution actually happens, so at this point the journey of a job from submission to execution is complete. To recap the flow in the local environment:

  1. The client calls the execute method;
  2. After doing most of the setup work, MiniCluster delegates the job directly to the MiniDispatcher;
  3. On receiving the job, the Dispatcher instantiates a JobManagerRunner and uses it to start the job;
  4. The JobManagerRunner then hands the job over to the JobMaster;
  5. The JobMaster starts the whole ExecutionGraph, and the job is up and running.