Flink入门(五)——DataSet Api编程指南
apache flink
apache flink 是一个兼顾高吞吐、低延迟、高性能的分布式处理框架。在实时计算崛起的今天,flink正在飞速发展。由于性能的优势和兼顾批处理,流处理的特性,flink可能正在颠覆整个大数据的生态。
dataset api
我们可以选择flink与scala结合版本,这里我们选择最新的1.9版本apache flink 1.9.0 for scala 2.12进行下载。
flink的编程模型,flink提供了不同的抽象级别以开发流式或者批处理应用,本文我们来介绍dataset api ,flink最常用的批处理编程模型。
public class wordcountexample { public static void main(string[] args) throws exception { final executionenvironment env = executionenvironment.getexecutionenvironment(); dataset<string> text = env.fromelements( "who's there?", "i think i hear them. stand, ho! who's there?"); dataset<tuple2<string, integer>> wordcounts = text .flatmap(new linesplitter()) .groupby(0) .sum(1); wordcounts.print(); } public static class linesplitter implements flatmapfunction<string, tuple2<string, integer>> { @override public void flatmap(string line, collector<tuple2<string, integer>> out) { for (string word : line.split(" ")) { out.collect(new tuple2<string, integer>(word, 1)); } } } }
import org.apache.flink.api.scala._ object wordcount { def main(args: array[string]) { val env = executionenvironment.getexecutionenvironment val text = env.fromelements( "who's there?", "i think i hear them. stand, ho! who's there?") val counts = text.flatmap { _.tolowercase.split("\\w+") filter { _.nonempty } } .map { (_, 1) } .groupby(0) .sum(1) counts.print() } }
dataset api 中最重要的就是这些算子,我们将数据接入后,通过这些算子对数据进行处理,得到我们想要的结果。
转换 | 描述 |
map | 采用一个数据元并生成一个数据元。data.map(new mapfunction<string, integer>() { public integer map(string value) { return integer.parseint(value); } });
flatmap | 采用一个数据元并生成零个,一个或多个数据元。data.flatmap(new flatmapfunction<string, string>() { public void flatmap(string value, collector<string> out) { for (string s : value.split(" ")) { out.collect(s); } } });
mappartition | 在单个函数调用中转换并行分区。该函数将分区作为iterable 流来获取,并且可以生成任意数量的结果值。每个分区中的数据元数量取决于并行度和先前的 算子操作。data.mappartition(new mappartitionfunction<string, long>() { public void mappartition(iterable<string> values, collector<long> out) { long c = 0; for (string s : values) { c++; } out.collect(c); } });
filter | 计算每个数据元的布尔函数,并保存函数返回true的数据元。 重要信息:系统假定该函数不会修改应用谓词的数据元。违反此假设可能会导致错误的结果。data.filter(new filterfunction<integer>() { public boolean filter(integer value) { return value > 1000; } });
reduce | 通过将两个数据元重复组合成一个数据元,将一组数据元组合成一个数据元。reduce可以应用于完整数据集或分组数据集。data.reduce(new reducefunction<integer> { public integer reduce(integer a, integer b) { return a + b; } }); 如果将reduce应用于分组数据集,则可以通过提供combinehint to 来指定运行时执行reduce的组合阶段的方式 setcombinehint 。在大多数情况下,基于散列的策略应该更快,特别是如果不同键的数量与输入数据元的数量相比较小(例如1/10)。 |
reducegroup | 将一组数据元组合成一个或多个数据元。reducegroup可以应用于完整数据集或分组数据集。data.reducegroup(new groupreducefunction<integer, integer> { public void reduce(iterable<integer> values, collector<integer> out) { int prefixsum = 0; for (integer i : values) { prefixsum += i; out.collect(prefixsum); } } });
aggregate | 将一组值聚合为单个值。聚合函数可以被认为是内置的reduce函数。聚合可以应用于完整数据集或分组数据集。dataset<tuple3<integer, string, double>> input = // [...] dataset<tuple3<integer, string, double>> output = input.aggregate(sum, 0).and(min, 2); 您还可以使用简写语法进行最小,最大和总和聚合。dataset<tuple3<integer, string, double>> input = // [...] dataset<tuple3<integer, string, double>> output = input.sum(0).andmin(2);
distinct | 返回数据集的不同数据元。它相对于数据元的所有字段或字段子集从输入dataset中删除重复条目。data.distinct(); 使用reduce函数实现distinct。您可以通过提供combinehint to 来指定运行时执行reduce的组合阶段的方式 setcombinehint 。在大多数情况下,基于散列的策略应该更快,特别是如果不同键的数量与输入数据元的数量相比较小(例如1/10)。 |
join | 通过创建在其键上相等的所有数据元对来连接两个数据集。可选地使用joinfunction将数据元对转换为单个数据元,或使用flatjoinfunction将数据元对转换为任意多个(包括无)数据元。请参阅以了解如何定义连接键。result = input1.join(input2) .where(0) // key of the first input (tuple field 0) .equalto(1); // key of the second input (tuple field 1) 您可以通过join hints指定运行时执行连接的方式。提示描述了通过分区或广播进行连接,以及它是使用基于排序还是基于散列的算法。有关可能的提示和示例的列表,请参阅“ 。 如果未指定提示,系统将尝试估算输入大小,并根据这些估计选择最佳策略。// this executes a join by broadcasting the first data set // using a hash table for the broadcast data result = input1.join(input2, joinhint.broadcast_hash_first) .where(0).equalto(1); 请注意,连接转换仅适用于等连接。其他连接类型需要使用outerjoin或cogroup表示。 |
outerjoin | 在两个数据集上执行左,右或全外连接。外连接类似于常规(内部)连接,并创建在其键上相等的所有数据元对。此外,如果在另一侧没有找到匹配的keys,则保存“外部”侧(左侧,右侧或两者都满)的记录。匹配数据元对(或一个数据元和null 另一个输入的值)被赋予joinfunction以将数据元对转换为单个数据元,或者转换为flatjoinfunction以将数据元对转换为任意多个(包括无)数据元。请参阅以了解如何定义连接键。input1.leftouterjoin(input2) // rightouterjoin or fullouterjoin for right or full outer joins .where(0) // key of the first input (tuple field 0) .equalto(1) // key of the second input (tuple field 1) .with(new joinfunction<string, string, string>() { public string join(string v1, string v2) { // note: // - v2 might be null for leftouterjoin // - v1 might be null for rightouterjoin // - v1 or v2 might be null for fullouterjoin } });
cogroup | reduce 算子操作的二维变体。将一个或多个字段上的每个输入分组,然后关联组。每对组调用转换函数。请参阅以了解如何定义cogroup键。data1.cogroup(data2) .where(0) .equalto(1) .with(new cogroupfunction<string, string, string>() { public void cogroup(iterable<string> in1, iterable<string> in2, collector<string> out) { out.collect(...); } });
cross | 构建两个输入的笛卡尔积(交叉乘积),创建所有数据元对。可选择使用crossfunction将数据元对转换为单个数据元dataset<integer> data1 = // [...] dataset<string> data2 = // [...] dataset<tuple2<integer, string>> result = data1.cross(data2); 注:交叉是一个潜在的非常计算密集型 算子操作它甚至可以挑战大的计算集群!建议使用crosswithtiny()和crosswithhuge()来提示系统的dataset大小。 |
union | 生成两个数据集的并集。dataset<string> data1 = // [...] dataset<string> data2 = // [...] dataset<string> result = data1.union(data2);
rebalance | 均匀地rebalance 数据集的并行分区以消除数据偏差。只有类似map的转换可能会遵循rebalance 转换。dataset<string> in = // [...] dataset<string> result = in.rebalance() .map(new mapper());
hash-partition | 散列分区给定键上的数据集。键可以指定为位置键,表达键和键选择器函数。dataset<tuple2<string,integer>> in = // [...] dataset<integer> result = in.partitionbyhash(0) .mappartition(new partitionmapper());
range-partition | range-partition给定键上的数据集。键可以指定为位置键,表达键和键选择器函数。dataset<tuple2<string,integer>> in = // [...] dataset<integer> result = in.partitionbyrange(0) .mappartition(new partitionmapper());
custom partitioning | 手动指定数据分区。 注意:此方法仅适用于单个字段键。dataset<tuple2<string,integer>> in = // [...] dataset<integer> result = in.partitioncustom(partitioner<k> partitioner, key)
sort partition | 本地按指定顺序对指定字段上的数据集的所有分区进行排序。可以将字段指定为元组位置或字段表达式。通过链接sortpartition()调用来完成对多个字段的排序。dataset<tuple2<string,integer>> in = // [...] dataset<integer> result = in.sortpartition(1, order.ascending) .mappartition(new partitionmapper());
first-n | 返回数据集的前n个(任意)数据元。first-n可以应用于常规数据集,分组数据集或分组排序数据集。分组键可以指定为键选择器函数或字段位置键。dataset<tuple2<string,integer>> in = // [...] // regular data set dataset<tuple2<string,integer>> result1 = in.first(3); // grouped data set dataset<tuple2<string,integer>> result2 = in.groupby(0) .first(3); // grouped-sorted data set dataset<tuple2<string,integer>> result3 = in.groupby(0) .sortgroup(1, order.ascending) .first(3);
数据源创建初始数据集,例如来自文件或java集合。创建数据集的一般机制是在inputformat后面抽象的 。flink附带了几种内置格式,可以从通用文件格式创建数据集。他们中的许多人在executionenvironment上都有快捷方法。
- 按行读取文件并将其作为字符串返回。 -
- 按行读取文件并将它们作为stringvalues返回。stringvalues是可变字符串。 -
- 解析逗号(或其他字符)分隔字段的文件。返回元组或pojo的dataset。支持基本java类型及其value对应作为字段类型。 -
readfileofprimitives(path, class)
- 解析新行(或其他字符序列)分隔的原始数据类型(如string
。 -
readfileofprimitives(path, delimiter, class)
- 解析新行(或其他字符序列)分隔的原始数据类型的文件,例如string
使用给定的分隔符。 -
readsequencefile(key, value, path)
- 创建一个jobconf并从类型为sequencefileinputformat,key class和value类的指定路径中读取文件,并将它们作为tuple2 <key,value>返回。
- 从java java.util.collection创建数据集。集合中的所有数据元必须属于同一类型。 -
fromcollection(iterator, class)
- 从迭代器创建数据集。该类指定迭代器返回的数据元的数据类型。 -
fromelements(t ...)
- 根据给定的对象序列创建数据集。所有对象必须属于同一类型。 -
fromparallelcollection(splittableiterator, class)
- 并行地从迭代器创建数据集。该类指定迭代器返回的数据元的数据类型。 -
generatesequence(from, to)
- 并行生成给定间隔中的数字序列。
readfile(inputformat, path)
- 接受文件输入格式。 -
- 接受通用输入格式。
executionenvironment env = executionenvironment.getexecutionenvironment(); // read text file from local files system dataset<string> locallines = env.readtextfile("file:///path/to/my/textfile"); // read text file from a hdfs running at nnhost:nnport dataset<string> hdfslines = env.readtextfile("hdfs://nnhost:nnport/path/to/my/textfile"); // read a csv file with three fields dataset<tuple3<integer, string, double>> csvinput = env.readcsvfile("hdfs:///the/csv/file") .types(integer.class, string.class, double.class); // read a csv file with five fields, taking only two of them dataset<tuple2<string, double>> csvinput = env.readcsvfile("hdfs:///the/csv/file") .includefields("10010") // take the first and the fourth field .types(string.class, double.class); // read a csv file with three fields into a pojo (person.class) with corresponding fields dataset<person>> csvinput = env.readcsvfile("hdfs:///the/csv/file") .pojotype(person.class, "name", "age", "zipcode"); // read a file from the specified path of type sequencefileinputformat dataset<tuple2<intwritable, text>> tuples = env.readsequencefile(intwritable.class, text.class, "hdfs://nnhost:nnport/path/to/file"); // creates a set from some given elements dataset<string> value = env.fromelements("foo", "bar", "foobar", "fubar"); // generate a number sequence dataset<long> numbers = env.generatesequence(1, 10000000); // read data from a relational database using the jdbc input format dataset<tuple2<string, integer> dbdata = env.createinput( jdbcinputformat.buildjdbcinputformat() .setdrivername("org.apache.derby.jdbc.embeddeddriver") .setdburl("jdbc:derby:memory:persons") .setquery("select name, age from persons") .setrowtypeinfo(new rowtypeinfo(basictypeinfo.string_type_info, basictypeinfo.int_type_info)) .finish() ); // note: flink's program compiler needs to infer the data types of the data items which are returned // by an inputformat. if this information cannot be automatically inferred, it is necessary to // manually provide the type information as shown in the examples above.
final executionenvironment env = executionenvironment.createlocalenvironment(); // create a dataset from a list of elements dataset<integer> myints = env.fromelements(1, 2, 3, 4, 5); // create a dataset from any java collection list<tuple2<string, integer>> data = ... dataset<tuple2<string, integer>> mytuples = env.fromcollection(data); // create a dataset from an iterator iterator<long> longit = ... dataset<long> mylongs = env.fromcollection(longit, long.class);
除了常规的 算子操作输入之外,广播变量还允许您为 算子操作的所有并行实例提供数据集。这对于辅助数据集或与数据相关的参数化非常有用。然后,算子可以将数据集作为集合访问。
// 1. the dataset to be broadcast dataset<integer> tobroadcast = env.fromelements(1, 2, 3); dataset<string> data = env.fromelements("a", "b"); data.map(new richmapfunction<string, string>() { @override public void open(configuration parameters) throws exception { // 3. access the broadcast dataset as a collection collection<integer> broadcastset = getruntimecontext().getbroadcastvariable("broadcastsetname"); } @override public string map(string value) throws exception { ... } }).withbroadcastset(tobroadcast, "broadcastsetname"); // 2. broadcast the dataset
flink提供了一个分布式缓存,类似于apache hadoop,可以在本地访问用户函数的并行实例。此函数可用于共享包含静态外部数据的文件,如字典或机器学习的回归模型。
executionenvironment env = executionenvironment.getexecutionenvironment(); // register a file from hdfs env.registercachedfile("hdfs:///path/to/your/file", "hdfsfile") // register a local executable file (script, executable, ...) env.registercachedfile("file:///path/to/exec/file", "localexecfile", true) // define your program and execute ... dataset<string> input = ... dataset<integer> result = input.map(new mymapper()); ... env.execute();
以上就是dataset api 的使用,其实和spark非常的相似,我们将数据接入后,可以利用各种算子对数据进行处理。
