欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Flink入门宝典(详细截图版)

程序员文章站 2024-01-19 19:10:58
本文基于java构建Flink1.9版本入门程序,需要Maven 3.0.4 和 Java 8 以上版本。需要安装Netcat进行简单调试。 这里简述安装过程,并使用IDEA进行开发一个简单流处理程序,本地调试或者提交到Flink上运行,Maven与JDK安装这里不做说明。 一、Flink简介 Fl ......

Flink入门宝典(详细截图版)
本文基于java构建flink1.9版本入门程序,需要maven 3.0.4 和 java 8 以上版本。需要安装netcat进行简单调试。

这里简述安装过程,并使用idea进行开发一个简单流处理程序,本地调试或者提交到flink上运行,maven与jdk安装这里不做说明。

一、flink简介

Flink入门宝典(详细截图版)

flink诞生于欧洲的一个大数据研究项目stratosphere。该项目是柏林工业大学的一个研究性项目。早期,flink是做batch计算的,但是在2014年,stratosphere里面的核心成员孵化出flink,同年将flink捐赠apache,并在后来成为apache的*大数据项目,同时flink计算的主流方向被定位为streaming,即用流式计算来做所有大数据的计算,这就是flink技术诞生的背景。

2015开始阿里开始介入flink 负责对资源调度和流式sql的优化,成立了阿里内部版本blink在最近更新的1.9版本中,blink开始合并入flink,

未来flink也将支持java,scala,python等更多语言,并在机器学习领域施展拳脚。

二、flink开发环境搭建

首先要想运行flink,我们需要下载并解压flink的二进制包,下载地址如下:https://flink.apache.org/downloads.html

我们可以选择flink与scala结合版本,这里我们选择最新的1.9版本apache flink 1.9.0 for scala 2.12进行下载。

Flink入门宝典(详细截图版)

flink在windows和linux下的安装与部署可以查看 flink快速入门--安装与示例运行,这里演示windows版。

安装成功后,启动cmd命令行窗口,进入flink文件夹,运行bin目录下的start-cluster.bat

$ cd flink
$ cd bin
$ start-cluster.bat
starting a local cluster with one jobmanager process and one taskmanager process.
you can terminate the processes via ctrl-c in the spawned shell windows.
web interface by default on http://localhost:8081/.

显示启动成功后,我们在浏览器访问 http://localhost:8081/可以看到flink的管理页面。

Flink入门宝典(详细截图版)

三、flink快速体验

请保证安装好了flink,还需要maven 3.0.4 和 java 8 以上版本。这里简述maven构建过程。

其他详细构建方法欢迎查看:快速构建第一个flink工程

1、搭建maven工程

使用flink maven archetype构建一个工程。

 $ mvn archetype:generate                               \
      -darchetypegroupid=org.apache.flink              \
      -darchetypeartifactid=flink-quickstart-java      \
      -darchetypeversion=1.9.0

你可以编辑自己的artifactid groupid

目录结构如下:

$ tree quickstart/
quickstart/
├── pom.xml
└── src
    └── main
        ├── java
        │   └── org
        │       └── myorg
        │           └── quickstart
        │               ├── batchjob.java
        │               └── streamingjob.java
        └── resources
            └── log4j.properties

在pom中核心依赖:

<dependencies>
    <dependency>
        <groupid>org.apache.flink</groupid>
        <artifactid>flink-java</artifactid>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupid>org.apache.flink</groupid>
        <artifactid>flink-streaming-java_2.11</artifactid>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupid>org.apache.flink</groupid>
        <artifactid>flink-clients_2.11</artifactid>
        <version>${flink.version}</version>
    </dependency>
</dependencies>

2、编写代码

streamingjob

import org.apache.flink.api.common.functions.flatmapfunction;
import org.apache.flink.api.java.tuple.tuple2;
import org.apache.flink.streaming.api.datastream.datastream;
import org.apache.flink.streaming.api.environment.streamexecutionenvironment;
import org.apache.flink.streaming.api.windowing.time.time;
import org.apache.flink.util.collector;
public class streamingjob {

    public static void main(string[] args) throws exception {
        final streamexecutionenvironment env = streamexecutionenvironment.getexecutionenvironment();
        datastream<tuple2<string, integer>> datastreaming = env
                .sockettextstream("localhost", 9999)
                .flatmap(new splitter())
                .keyby(0)
                .timewindow(time.seconds(5))
                .sum(1);

        datastreaming.print();

        // execute program
        env.execute("flink streaming java api skeleton");
    }
    public static class splitter implements flatmapfunction<string, tuple2<string, integer>> {

        @override
        public void flatmap(string sentence, collector<tuple2<string, integer>> out) throws exception {
            for(string word : sentence.split(" ")){
                out.collect(new tuple2<string, integer>(word, 1));
            }
        }

    }
}

3、调试程序

安装netcat工具进行简单调试。

启动netcat 输入:

nc -l 9999

启动程序

Flink入门宝典(详细截图版)

在netcat中输入几个单词 逗号分隔

Flink入门宝典(详细截图版)

在程序一端查看结果

Flink入门宝典(详细截图版)

启动flink

windows为 start-cluster.bat    linux为start-cluster.sh

localhost:8081查看管理页面

Flink入门宝典(详细截图版)

通过maven对代码打包

Flink入门宝典(详细截图版)

将打好的包提交到flink上

Flink入门宝典(详细截图版)

查看log

tail -f log/flink-***-jobmanager.out

在netcat中继续输入单词,在running jobs中查看作业状态,在log中查看输出。

Flink入门宝典(详细截图版)

flink提供不同级别的抽象来开发流/批处理应用程序。

Flink入门宝典(详细截图版)

最低级抽象只提供有状态流

在实践中,大多数应用程序不需要上述低级抽象,而是针对core api编程, 如datastream api(有界/*流)和dataset api(有界数据集)。

table api声明了一个表,遵循关系模型。

*抽象是sql

我们这里只用到了datastream api。

flink程序的基本构建块是转换

一个程序的基本构成:

l 获取execution environment

l 加载/创建原始数据

l 指定这些数据的转化方法

l 指定计算结果的存放位置

l 触发程序执行

Flink入门宝典(详细截图版)

五、datastreaming api使用

1、获取execution environment

streamexecutionenvironment是所有flink程序的基础,获取方法有:

getexecutionenvironment()

createlocalenvironment()

createremoteenvironment(string host, int port, string ... jarfiles)

一般情况下使用getexecutionenvironment。如果你在ide或者常规java程序中执行可以通过createlocalenvironment创建基于本地机器的streamexecutionenvironment。如果你已经创建jar程序希望通过invoke方式获取里面的getexecutionenvironment方法可以使用createremoteenvironment方式。

2、加载/创建原始数据

streamexecutionenvironment提供的一些访问数据源的接口

(1)基于文件的数据源

readtextfile(path)
readfile(fileinputformat, path)
readfile(fileinputformat, path, watchtype, interval, pathfilter, typeinfo)

(2)基于socket的数据源(本文使用的)

sockettextstream

 

(3)基于collection的数据源

fromcollection(collection)
fromcollection(iterator, class)
fromelements(t ...)
fromparallelcollection(splittableiterator, class)
generatesequence(from, to)

3、转化方法

(1)map方式:datastream -> datastream

功能:拿到一个element并输出一个element,类似hive中的udf函数

举例:

datastream<integer> datastream = //...
datastream.map(new mapfunction<integer, integer>() {
    @override
    public integer map(integer value) throws exception {
        return 2 * value;
    }
});

(2)flatmap方式:datastream -> datastream

功能:拿到一个element,输出多个值,类似hive中的udtf函数

举例:

datastream.flatmap(new flatmapfunction<string, string>() {
    @override
    public void flatmap(string value, collector<string> out)
        throws exception {
        for(string word: value.split(" ")){
            out.collect(word);
        }
    }
});

(3)filter方式:datastream -> datastream

功能:针对每个element判断函数是否返回true,最后只保留返回true的element

举例:

datastream.filter(new filterfunction<integer>() {
    @override
    public boolean filter(integer value) throws exception {
        return value != 0;
    }
});

(4)keyby方式:datastream -> keyedstream

功能:逻辑上将流分割成不相交的分区,每个分区都是相同key的元素

举例:

datastream.keyby("somekey") // key by field "somekey"
datastream.keyby(0) // key by the first element of a tuple

(5)reduce方式:keyedstream -> datastream

功能:在keyed data stream中进行轮训reduce。

举例:

keyedstream.reduce(new reducefunction<integer>() {
    @override
    public integer reduce(integer value1, integer value2)
    throws exception {
        return value1 + value2;
    }
});

(6)aggregations方式:keyedstream -> datastream

功能:在keyed data stream中进行聚合操作

举例:

keyedstream.sum(0);
keyedstream.sum("key");
keyedstream.min(0);
keyedstream.min("key");
keyedstream.max(0);
keyedstream.max("key");
keyedstream.minby(0);
keyedstream.minby("key");
keyedstream.maxby(0);
keyedstream.maxby("key");

(7)window方式:keyedstream -> windowedstream

功能:在keyedstream中进行使用,根据某个特征针对每个key用windows进行分组。

举例:

datastream.keyby(0).window(tumblingeventtimewindows.of(time.seconds(5))); // last 5 seconds of data

(8)windowall方式:datastream -> allwindowedstream

功能:在datastream中根据某个特征进行分组。

举例:

datastream.windowall(tumblingeventtimewindows.of(time.seconds(5))); // last 5 seconds of data

(9)union方式:datastream* -> datastream

功能:合并多个数据流成一个新的数据流

举例:

datastream.union(otherstream1, otherstream2, ...);

(10)split方式:datastream -> splitstream

功能:将流分割成多个流

举例:

splitstream<integer> split = somedatastream.split(new outputselector<integer>() {
    @override
    public iterable<string> select(integer value) {
        list<string> output = new arraylist<string>();
        if (value % 2 == 0) {
            output.add("even");
        }
        else {
            output.add("odd");
        }
        return output;
    }
});

(11)select方式:splitstream -> datastream

功能:从split stream中选择一个流

举例:

splitstream<integer> split;
datastream<integer> even = split.select("even");
datastream<integer> odd = split.select("odd");
datastream<integer> all = split.select("even","odd");

4、输出数据

writeastext()
writeascsv(...)
print() / printtoerr() 
writeusingoutputformat() / fileoutputformat
writetosocket
addsink

更多flink相关原理:

穿梭时空的实时计算框架——flink对时间的处理

大数据实时处理的王者-flink

统一批处理流处理——flink批流一体实现原理

flink快速入门--安装与示例运行

快速构建第一个flink工程

更多实时计算,flink,kafka等相关技术博文,欢迎关注实时流式计算:

Flink入门宝典(详细截图版)