Flink入门宝典(详细截图版)
本文基于java构建flink1.9版本入门程序,需要maven 3.0.4 和 java 8 以上版本。需要安装netcat进行简单调试。
这里简述安装过程,并使用idea进行开发一个简单流处理程序,本地调试或者提交到flink上运行,maven与jdk安装这里不做说明。
一、flink简介
flink诞生于欧洲的一个大数据研究项目stratosphere。该项目是柏林工业大学的一个研究性项目。早期,flink是做batch计算的,但是在2014年,stratosphere里面的核心成员孵化出flink,同年将flink捐赠apache,并在后来成为apache的*大数据项目,同时flink计算的主流方向被定位为streaming,即用流式计算来做所有大数据的计算,这就是flink技术诞生的背景。
2015开始阿里开始介入flink 负责对资源调度和流式sql的优化,成立了阿里内部版本blink在最近更新的1.9版本中,blink开始合并入flink,
未来flink也将支持java,scala,python等更多语言,并在机器学习领域施展拳脚。
二、flink开发环境搭建
首先要想运行flink,我们需要下载并解压flink的二进制包,下载地址如下:https://flink.apache.org/downloads.html
我们可以选择flink与scala结合版本,这里我们选择最新的1.9版本apache flink 1.9.0 for scala 2.12进行下载。
flink在windows和linux下的安装与部署可以查看 flink快速入门--安装与示例运行,这里演示windows版。
安装成功后,启动cmd命令行窗口,进入flink文件夹,运行bin目录下的start-cluster.bat
$ cd flink $ cd bin $ start-cluster.bat starting a local cluster with one jobmanager process and one taskmanager process. you can terminate the processes via ctrl-c in the spawned shell windows. web interface by default on http://localhost:8081/.
显示启动成功后,我们在浏览器访问 http://localhost:8081/可以看到flink的管理页面。
三、flink快速体验
请保证安装好了flink,还需要maven 3.0.4 和 java 8 以上版本。这里简述maven构建过程。
其他详细构建方法欢迎查看:快速构建第一个flink工程
1、搭建maven工程
使用flink maven archetype构建一个工程。
$ mvn archetype:generate \ -darchetypegroupid=org.apache.flink \ -darchetypeartifactid=flink-quickstart-java \ -darchetypeversion=1.9.0
你可以编辑自己的artifactid groupid
目录结构如下:
$ tree quickstart/ quickstart/ ├── pom.xml └── src └── main ├── java │ └── org │ └── myorg │ └── quickstart │ ├── batchjob.java │ └── streamingjob.java └── resources └── log4j.properties
在pom中核心依赖:
<dependencies> <dependency> <groupid>org.apache.flink</groupid> <artifactid>flink-java</artifactid> <version>${flink.version}</version> </dependency> <dependency> <groupid>org.apache.flink</groupid> <artifactid>flink-streaming-java_2.11</artifactid> <version>${flink.version}</version> </dependency> <dependency> <groupid>org.apache.flink</groupid> <artifactid>flink-clients_2.11</artifactid> <version>${flink.version}</version> </dependency> </dependencies>
2、编写代码
streamingjob
import org.apache.flink.api.common.functions.flatmapfunction; import org.apache.flink.api.java.tuple.tuple2; import org.apache.flink.streaming.api.datastream.datastream; import org.apache.flink.streaming.api.environment.streamexecutionenvironment; import org.apache.flink.streaming.api.windowing.time.time; import org.apache.flink.util.collector; public class streamingjob { public static void main(string[] args) throws exception { final streamexecutionenvironment env = streamexecutionenvironment.getexecutionenvironment(); datastream<tuple2<string, integer>> datastreaming = env .sockettextstream("localhost", 9999) .flatmap(new splitter()) .keyby(0) .timewindow(time.seconds(5)) .sum(1); datastreaming.print(); // execute program env.execute("flink streaming java api skeleton"); } public static class splitter implements flatmapfunction<string, tuple2<string, integer>> { @override public void flatmap(string sentence, collector<tuple2<string, integer>> out) throws exception { for(string word : sentence.split(" ")){ out.collect(new tuple2<string, integer>(word, 1)); } } } }
3、调试程序
安装netcat工具进行简单调试。
启动netcat 输入:
nc -l 9999
启动程序
在netcat中输入几个单词 逗号分隔
在程序一端查看结果
4、程序提交到flink
启动flink
windows为 start-cluster.bat linux为start-cluster.sh
localhost:8081查看管理页面
通过maven对代码打包
将打好的包提交到flink上
查看log
tail -f log/flink-***-jobmanager.out
在netcat中继续输入单词,在running jobs中查看作业状态,在log中查看输出。
四、flink 编程模型
flink提供不同级别的抽象来开发流/批处理应用程序。
最低级抽象只提供有状态流。
在实践中,大多数应用程序不需要上述低级抽象,而是针对core api编程, 如datastream api(有界/*流)和dataset api(有界数据集)。
table api声明了一个表,遵循关系模型。
*抽象是sql。
我们这里只用到了datastream api。
flink程序的基本构建块是流和转换。
一个程序的基本构成:
l 获取execution environment
l 加载/创建原始数据
l 指定这些数据的转化方法
l 指定计算结果的存放位置
l 触发程序执行
五、datastreaming api使用
1、获取execution environment
streamexecutionenvironment是所有flink程序的基础,获取方法有:
getexecutionenvironment()
createlocalenvironment()
createremoteenvironment(string host, int port, string ... jarfiles)
一般情况下使用getexecutionenvironment。如果你在ide或者常规java程序中执行可以通过createlocalenvironment创建基于本地机器的streamexecutionenvironment。如果你已经创建jar程序希望通过invoke方式获取里面的getexecutionenvironment方法可以使用createremoteenvironment方式。
2、加载/创建原始数据
streamexecutionenvironment提供的一些访问数据源的接口
(1)基于文件的数据源
readtextfile(path) readfile(fileinputformat, path) readfile(fileinputformat, path, watchtype, interval, pathfilter, typeinfo)
(2)基于socket的数据源(本文使用的)
l sockettextstream
(3)基于collection的数据源
fromcollection(collection) fromcollection(iterator, class) fromelements(t ...) fromparallelcollection(splittableiterator, class) generatesequence(from, to)
3、转化方法
(1)map方式:datastream -> datastream
功能:拿到一个element并输出一个element,类似hive中的udf函数
举例:
datastream<integer> datastream = //... datastream.map(new mapfunction<integer, integer>() { @override public integer map(integer value) throws exception { return 2 * value; } });
(2)flatmap方式:datastream -> datastream
功能:拿到一个element,输出多个值,类似hive中的udtf函数
举例:
datastream.flatmap(new flatmapfunction<string, string>() { @override public void flatmap(string value, collector<string> out) throws exception { for(string word: value.split(" ")){ out.collect(word); } } });
(3)filter方式:datastream -> datastream
功能:针对每个element判断函数是否返回true,最后只保留返回true的element
举例:
datastream.filter(new filterfunction<integer>() { @override public boolean filter(integer value) throws exception { return value != 0; } });
(4)keyby方式:datastream -> keyedstream
功能:逻辑上将流分割成不相交的分区,每个分区都是相同key的元素
举例:
datastream.keyby("somekey") // key by field "somekey" datastream.keyby(0) // key by the first element of a tuple
(5)reduce方式:keyedstream -> datastream
功能:在keyed data stream中进行轮训reduce。
举例:
keyedstream.reduce(new reducefunction<integer>() { @override public integer reduce(integer value1, integer value2) throws exception { return value1 + value2; } });
(6)aggregations方式:keyedstream -> datastream
功能:在keyed data stream中进行聚合操作
举例:
keyedstream.sum(0); keyedstream.sum("key"); keyedstream.min(0); keyedstream.min("key"); keyedstream.max(0); keyedstream.max("key"); keyedstream.minby(0); keyedstream.minby("key"); keyedstream.maxby(0); keyedstream.maxby("key");
(7)window方式:keyedstream -> windowedstream
功能:在keyedstream中进行使用,根据某个特征针对每个key用windows进行分组。
举例:
datastream.keyby(0).window(tumblingeventtimewindows.of(time.seconds(5))); // last 5 seconds of data
(8)windowall方式:datastream -> allwindowedstream
功能:在datastream中根据某个特征进行分组。
举例:
datastream.windowall(tumblingeventtimewindows.of(time.seconds(5))); // last 5 seconds of data
(9)union方式:datastream* -> datastream
功能:合并多个数据流成一个新的数据流
举例:
datastream.union(otherstream1, otherstream2, ...);
(10)split方式:datastream -> splitstream
功能:将流分割成多个流
举例:
splitstream<integer> split = somedatastream.split(new outputselector<integer>() { @override public iterable<string> select(integer value) { list<string> output = new arraylist<string>(); if (value % 2 == 0) { output.add("even"); } else { output.add("odd"); } return output; } });
(11)select方式:splitstream -> datastream
功能:从split stream中选择一个流
举例:
splitstream<integer> split; datastream<integer> even = split.select("even"); datastream<integer> odd = split.select("odd"); datastream<integer> all = split.select("even","odd");
4、输出数据
writeastext() writeascsv(...) print() / printtoerr() writeusingoutputformat() / fileoutputformat writetosocket addsink
更多flink相关原理:
更多实时计算,flink,kafka等相关技术博文,欢迎关注实时流式计算:
上一篇: 升级遇难题:IDE接口不够用怎么办?
下一篇: 大数据技术原理与运用知识