Flink 1.8 Basic API Concepts 基本API概念
程序员文章站
2022-05-18 21:01:17
参考:https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/api_concepts.html DataSet and DataStream Flink具有特殊类DataSet和DataStream来表示程序中的数据。 你可以 ......
参考:
dataset and datastream
flink具有特殊类dataset和datastream来表示程序中的数据。 你可以将它们视为可以包含重复项的不可变数据集合。
anatomy of a flink program flink程序的剖析
flink程序看起来像是转换数据集合的常规程序。 每个程序包含相同的基本部分:
- 获得执行环境, obtain an execution environment,
- 加载/创建初始数据, load/create the initial data,
- 指定此数据的转换, specify transformations on this data,
- 指定放置计算结果的位置,specify where to put the results of your computations,
- 触发程序执行 trigger the program execution
streamexecutionenvironment是所有flink程序的基础。你可以在streamexecutionenvironment上使用这些静态方法获取一个:
getexecutionenvironment()
createlocalenvironment()
createremoteenvironment(string host, int port, string... jarfiles)
通常,你只需要使用getexecutionenvironment(),因为这将根据上下文执行正确的操作:如果你在ide中执行程序或作为常规java程序,它将创建一个本地环境,将执行你的程序 你的本地机器。 如果你从程序中创建了一个jar文件,并通过命令行调用它,则flink集群管理器将执行你的main方法,getexecutionenvironment()将返回一个执行环境,用于在集群上执行你的程序。
对于指定数据源,执行环境有几种方法可以使用各种方法从文件中读取:你可以逐行读取它们,csv文件或使用完全自定义数据输入格式。 要将文本文件作为一系列行读取,你可以使用:
final streamexecutionenvironment env = streamexecutionenvironment.getexecutionenvironment(); datastream<string> text = env.readtextfile("file:///path/to/file");
这将为你提供一个datastream,然后你可以在其上应用转换来创建新的派生datastream。
你可以通过使用转换函数调用datastream上的方法来应用转换。 例如,map转换如下所示:
datastream<string> input = ...; datastream<integer> parsed = input.map(new mapfunction<string, integer>() { @override public integer map(string value) { return integer.parseint(value); } });
这将通过将原始集合中的每个string转换为integer来创建新的datastream。
一旦有了包含最终结果的datastream,就可以通过创建接收器(sink)将其写入外部系统。 这些只是创建接收器的一些示例方法:
writeastext(string path) print()
一旦指定了完整的程序,就需要通过调用streamexecutionenvironment上的execute()来触发程序执行。 根据executionenvironment的类型,将在本地计算机上触发执行或提交程序以在群集上执行。
execute()方法返回一个jobexecutionresult,它包含执行时间和累加器结果。
lazy evaluation 惰性求值
所有flink程序都是惰性执行:当执行程序的main方法时,数据加载和转换不会直接发生。 而是创建每个操作并将其添加到程序的计划中。 当执行环境上的execute()调用显式触发执行时,实际执行操作。 程序是在本地执行还是在集群上执行取决于执行环境的类型。
惰性求值使你可以构建flink作为一个整体计划单元执行的复杂程序。
specifying keys 指定键
一些转换(join,cogroup,keyby,groupby)要求在元素集合上定义键。 其他转换(reduce,groupreduce,aggregate,windows)允许数据在应用之前在键上分组。
dataset分组:
dataset<...> input = // [...] dataset<...> reduced = input .groupby(/*define key here*/) .reducegroup(/*do something*/);
datastream设置键:
datastream<...> input = // [...] datastream<...> windowed = input .keyby(/*define key here*/) .window(/*window specification*/);
flink的数据模型不基于键值对。 因此,你无需将数据集类型物理打包到键和值中。 键是“虚拟的”:它们被定义为实际数据上的函数,以指导分组运算符。
define keys for tuples 定义元组的键
最简单的情况是在元组的一个或多个字段上对元组进行分组:
datastream<tuple3<integer,string,long>> input = // [...] keyedstream<tuple3<integer,string,long>,tuple> keyed = input.keyby(0)
元组在第一个字段(整数类型)上分组。
datastream<tuple3<integer,string,long>> input = // [...] keyedstream<tuple3<integer,string,long>,tuple> keyed = input.keyby(0,1)
在这里,我们将元组分组在由第一个和第二个字段组成的复合键上。
注意嵌套元组:如果你有一个带有嵌套元组的datastream,例如:
datastream<tuple3<tuple2<integer, float>,string,long>> ds;
指定keyby(0)将使系统使用完整的tuple2作为键(以integer和float为键)。 如果要“导航”到嵌套的tuple2中,则必须使用下面解释的字段表达式键。
define keys using field expressions 使用字段表达式定义键
你可以使用基于字符串的字段表达式来引用嵌套字段,并定义用于grouping, sorting, joining或cogrouping的键。
字段表达式可以非常轻松地选择(嵌套)复合类型中的字段,例如tuple和pojo类型。
在下面的示例中,我们有一个wc pojo,其中包含两个字段“word”和“count”。 要按字段分组,我们只需将其名称传递给keyby()函数。
// some ordinary pojo (plain old java object) public class wc { public string word; public int count; } datastream<wc> words = // [...] datastream<wc> wordcounts = words.keyby("word").window(/*window specification*/);
字段表达式语法:
- 按字段名称选择pojo字段。 例如,“user”指的是pojo类型的“user”字段。
- 按字段名称或0偏移字段索引选择元组字段。 例如,“f0”和“5”分别表示java元组类型的第一和第六字段。
- 你可以在pojo和tuples中选择嵌套字段。 例如,“user.zip”指的是pojo的“zip”字段,其存储在pojo类型的“user”字段中。 支持任意嵌套和混合pojo和元组,例如“f1.user.zip”或“user.f3.1.zip”。
- 你可以使用“*”通配符表达式选择完整类型。 这也适用于非tuple或pojo类型的类型。
字段表达式样例:
public static class wc { public complexnestedclass complex; //nested pojo private int count; // getter / setter for private field (count) public int getcount() { return count; } public void setcount(int c) { this.count = c; } } public static class complexnestedclass { public integer somenumber; public float somefloat; public tuple3<long, long, string> word; public intwritable hadoopcitizen; }
这些是上面示例代码的有效字段表达式:
- “count”:wc类中的count字段。
- “complex”:递归选择pojo类型complexnestedclass的字段复合体的所有字段。
- “complex.word.f2”:选择嵌套tuple3的最后一个字段。
- “complex.hadoopcitizen”:选择hadoop intwritable类型。
define keys using key selector functions 使用键选择器函数定义键
定义键的另一种方法是“键选择器”函数。 键选择器函数将单个元素作为输入并返回元素的键。 键可以是任何类型,并且可以从确定性计算中输出。
以下示例显示了一个键选择器函数,它只返回一个对象的字段:
// some ordinary pojo public class wc {public string word; public int count;} datastream<wc> words = // [...] keyedstream<wc> keyed = words .keyby(new keyselector<wc, string>() { public string getkey(wc wc) { return wc.word; } });
specifying transformation functions 指定转换函数
大多数转换都需要用户定义的函数。 本节列出了如何指定它们的不同方法。
implementing an interface 实现接口
最基本的方法是实现一个提供的接口:
class mymapfunction implements mapfunction<string, integer> { public integer map(string value) { return integer.parseint(value); } }; data.map(new mymapfunction());
anonymous classes 匿名类
你可以将函数作为匿名类传递:
data.map(new mapfunction<string, integer> () { public integer map(string value) { return integer.parseint(value); } });
java 8 lambdas 表达式
flink还支持java api中的java 8 lambdas。
data.filter(s -> s.startswith("http://")); data.reduce((i1,i2) -> i1 + i2);
rich functions 富函数
所有需要用户定义函数的转换都可以将富函数作为参数。 例如,替换
class mymapfunction implements mapfunction<string, integer> { public integer map(string value) { return integer.parseint(value); } };
你可以写
class mymapfunction extends richmapfunction<string, integer> { public integer map(string value) { return integer.parseint(value); } };
并像往常一样将函数传递给map转换:
data.map(new mymapfunction());
富函数也可以定义为匿名类:
data.map (new richmapfunction<string, integer>() { public integer map(string value) { return integer.parseint(value); } });
除了用户定义的函数(map,reduce等)之外,rich函数还提供了四种方法:open,close,getruntimecontext和setruntimecontext。 这些用于参数化函数,创建和完成本地状态,访问广播变量以及访问运行时信息(如累加器和计数器)以及迭代信息。
supported data types 支持的数据类型
flink对dataset或datastream中可以包含的元素类型设置了一些限制。原因是系统分析类型以确定有效的执行策略。
有六种不同类别的数据类型:
- 元组(java tuples and scala case classes)
- java普通对象(java pojos)
- 基本类型(primitive types)
- 常规类(regular classes)
- 值类型(values)
- hadoop可写接口的实现(hadoop writables)
- 特殊类型(special types)
tuples and case classes 元组
元组是包含固定数量的具有各种类型的字段的复合类型。 java api提供从tuple1到tuple25的类。 元组的每个字段都可以是包含更多元组的任意flink类型,从而产生嵌套元组。 可以使用字段名称tuple.f4直接访问元组的字段,也可以使用通用getter方法tuple.getfield(int position)。 字段索引从0开始。请注意,这与scala元组形成对比,但它与java的一般索引更为一致。
datastream<tuple2<string, integer>> wordcounts = env.fromelements( new tuple2<string, integer>("hello", 1), new tuple2<string, integer>("world", 2)); wordcounts.map(new mapfunction<tuple2<string, integer>, integer>() { @override public integer map(tuple2<string, integer> value) throws exception { return value.f1; } }); wordcounts.keyby(0); // also valid .keyby("f0")
pojo java普通对象
如果满足以下要求,则flink将java和scala类视为特殊的pojo数据类型:
- 类必须是公共的。
- 它必须有一个没有参数的公共构造函数(默认构造函数)。
- 所有字段都是公共的,或者必须通过getter和setter函数访问。 对于名为foo的字段,getter和setter方法必须命名为getfoo()和setfoo()。
- flink必须支持字段的类型。 目前,flink使用avro序列化任意对象(例如date)。
flink分析pojo类型的结构,即它了解pojo的字段。 因此,pojo类型比一般类型更容易使用。 此外,flink可以比一般类型更有效地处理pojo。
以下示例显示了一个包含两个公共字段的简单pojo。
public class wordwithcount { public string word; public int count; public wordwithcount() {} public wordwithcount(string word, int count) { this.word = word; this.count = count; } } datastream<wordwithcount> wordcounts = env.fromelements( new wordwithcount("hello", 1), new wordwithcount("world", 2)); wordcounts.keyby("word"); // key by field expression "word"
primitive types 基本类型
flink支持所有java和scala基本类型,如integer,string和double。
general class types 常规类类型
flink支持大多数java和scala类(api和自定义)。 限制适用于包含无法序列化的字段的类,如文件指针,i/o流或其他本机资源。 遵循java beans约定的类通常可以很好地工作。
所有未标识为pojo类型的类(请参阅上面的pojo要求)都由flink作为常规类类型处理。 flink将这些数据类型视为黑盒子,并且无法访问其内容(即,用于高效排序)。 使用序列化框架kryo对常规类型进行序列化和反序列化。
values 值类型
值类型需手动描述其序列化和反序列化。它们不是通过通用序列化框架,而是通过使用读取和写入方法实现org.apache.flinktypes.value接口来为这些操作提供自定义代码。当通用序列化效率非常低时,使用值类型是合理的。一个示例是将元素的稀疏向量实现为数组的数据类型。知道数组大部分为零,可以对非零元素使用特殊编码,而通用序列化只需编写所有数组元素。
org.apache.flinktypes.copyablevalue接口以类似的方式支持手动内部克隆逻辑。
flink带有与基本数据类型对应的预定义值类型。 (bytevalue,shortvalue,intvalue,longvalue,floatvalue,doublevalue,stringvalue,charvalue,booleanvalue)。这些值类型充当基本数据类型的可变变体:它们的值可以被更改,允许程序员重用对象并降低垃圾收集器压力。
hadoop writables hadoop可写接口的实现
你可以使用实现org.apache.hadoop.writable接口的类型。 write()和readfields()方法中定义的序列化逻辑将用于序列化。
special types 特殊类型
你可以使用特殊类型,包括scala的either,option和try。 java api有自己的自定义either实现。 与scala的either类似,它代表两种可能类型的值,左或右。 两者都可用于错误处理或需要输出两种不同类型记录的运算符。
type erasure & type inference 类型擦除和类型推断
注意:本节仅适用于java。
java编译器在编译后抛弃了大部分泛型类型信息。这在java中称为类型擦除。这意味着在运行时,对象的实例不再知道其泛型类型。例如,datastream <string>和datastream <long>的实例在jvm看来是一样的。
flink在准备执行程序时(当调用程序的main方法时)需要类型信息。 flink java api尝试重建以各种方式丢弃的类型信息,并将其显式存储在数据集和运算符中。你可以通过datastream.gettype()检索类型。该方法返回typeinformation的一个实例,这是flink表示类型的内部方式。
类型推断有其局限性,在某些情况下需要程序员的“合作”。这方面的示例是从集合创建数据集的方法,例如executionenvironment.fromcollection(),你可以在其中传递描述类型的参数。但是像mapfunction<i,o>这样的通用函数也可能需要额外的类型信息。
resulttypequeryable接口可以通过输入格式和函数实现,以明确告知api其返回类型。调用函数的输入类型通常可以通过先前操作的结果类型来推断。
accumulators & counters 累加器和计数器
累加器是具有增加操作(add operation)和最终累积结果(final accumulated result)的简单构造,可在作业结束后使用。
最直接的累加器是一个计数器(counter):你可以使用accumulator.add(v value)方法递增它。 在工作结束时,flink将汇总(合并)所有部分结果并将结果发送给客户。 在调试过程中,或者如果你想快速了解有关数据的更多信息,累加器非常有用。
flink目前有以下内置累加器。 它们中的每一个都实现了accumulator接口。
- intcounter,longcounter和doublecounter:请参阅下面的使用计数器的示例。
- 直方图(histogram):离散数量的区间的直方图实现。 在内部,它只是一个从integer到integer的映射。 你可以使用它来计算值的分布,例如 字数统计程序的每行字数分布。
how to use accumulators: 如何使用累加器:
首先,你必须在要使用它的用户定义转换函数中创建累加器对象(此处为计数器)。
private intcounter numlines = new intcounter();
其次,你必须注册累加器对象,通常在富函数的open()方法中。 在这里你还可以定义名称。
getruntimecontext().addaccumulator("num-lines", this.numlines);
你现在可以在运算符函数中的任何位置使用累加器,包括open()和close()方法。
this.numlines.add(1);
整个结果将存储在jobexecutionresult对象中,该对象是从执行环境的execute()方法返回的(当前这仅在执行等待作业完成时才有效)。
myjobexecutionresult.getaccumulatorresult("num-lines")
每个作业的所有累加器共享一个命名空间。 因此,你可以在作业的不同运算符函数中使用相同的累加器。 flink将在内部合并所有具有相同名称的累加器。
关于累加器和迭代的说明:目前累加器的结果仅在整个作业结束后才可用。 我们还计划在下一次迭代中使前一次迭代的结果可用。 你可以使用聚合器来计算每次迭代统计信息,并根据此类统计信息确定迭代的终止。
custom accumulators: 定制累加器:
要实现自己的累加器,只需编写accumulator接口的实现即可。 如果你认为你的自定义累加器应该合入flink主干,请随意创建拉取请求(pull request)。
你可以选择实现accumulator或simpleaccumulator。
accumulator<v, r>最灵活:它为要添加的值定义类型v,为最终结果定义结果类型r. 例如。 对于直方图,v是数字,r是直方图。 simpleaccumulator适用于两种类型相同的情况,例如: 对于计数器。
上一篇: 【db】mongodb的故事
下一篇: laravel批量数据填充