Flink1.4HDFSConnector
此连接器提供一个Sink,将分区文件写入
Hadoop FileSystem支持的任何文件系统。要使用此连接器,添加以下依赖项:
org.apache.flink flink-connector-filesystem_2.10 1.4-SNAPSHOT |
文件分桶的Sink(Bucketing File Sink)
文件分桶的Sink(Bucketing File Sink)
分桶(Bucketing)行为以及写入数据操作都可以配置,我们稍后会讲到。下面展示了如何通过默认配置创建分桶的Sink,输出到按时间切分的滚动文件中:
Java版本:
DataStream input = ...; input.addSink(new BucketingSink("/base/path")); |
Scala版本:
val input: DataStream[String] = ... input.addSink(new BucketingSink[String]("/base/path")) |
这里唯一必需的参数是这些分桶文件存储的基本路径
/base/path。可以通过指定自定义
bucketer,
writer和
batch大小来进一步配置
sink。
默认情况下,分桶 sink 根据元素到达时当前系统时间来进行切分,并使用
yyyy-MM-dd--HH时间格式来命名这些分桶。这个时间格式传递给当前的系统时间的
SimpleDateFormat来命名桶的路径。每当遇到一个新的时间就会创建一个新的桶。例如,如果你有一个包含分钟的最细粒度时间格式,那么你将会每分钟获得一个新桶。每个桶本身就是一个包含 part 文件的目录:Sink的每个并行实例都将创建自己的 part 文件,当 part 文件变得太大时,会紧挨着其他文件创建一个新的 part 文件。当一个桶在最近没有被写入数据时被视为非活跃的。当桶变得不活跃时,打开的 part 文件将被刷新(flush)并关闭。默认情况下,sink 每分钟都会检查非活跃的桶,并关闭一分钟内没有写入数据的桶。可以在
BucketingSink上使用
setInactiveBucketCheckInterval()和
setInactiveBucketThreshold()配置这些行为。
你还可以使用
BucketingSink上的
setBucketer()指定自定义
bucketer。如果需要,
bucketer可以使用元素或元组的属性来确定
bucket目录。
默认的
writer是
StringWriter。对传入的元素调用
toString(),并将它们写入 part 文件,用换行符分隔。要在
BucketingSink上指定一个自定义的
writer,使用
setWriter()方法即可。如果要写入
Hadoop SequenceFiles文件中,可以使用提供的
SequenceFileWriter,并且可以配置使用压缩格式。
最后一个配置选项是 batch 大小。这指定何时关闭 part 文件,并开启一个新文件。(默认part文件大小为384MB)。
Java版本:
DataStream> input = ...; BucketingSink sink = new BucketingSink("/base/path"); sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm")); sink.setWriter(new SequenceFileWriter()); sink.setBatchSize(1024 * 1024 * 400); // this is 400 MB, input.addSink(sink); |
Scala版本:
val input: DataStream[Tuple2[IntWritable, Text]] = ... val sink = new BucketingSink[String]("/base/path") sink.setBucketer(new DateTimeBucketer[String]("yyyy-MM-dd--HHmm")) sink.setWriter(new SequenceFileWriter[IntWritable, Text]()) sink.setBatchSize(1024 * 1024 * 400) // this is 400 MB, input.addSink(sink) |
上面例子将创建一个sink,写入遵循下面格式的分桶文件中:
/base/path/{date-time}/part-{parallel-task}-{count} |
其中
date-time是从日期/时间格式获得的字符串,
parallel-task是并行
sink实例的索引,
count是由于
batch大小而创建的part文件的运行编号。
备注:
Sink版本:1.4
上一篇: 实战node静态文件服务器的示例代码
下一篇: 邮件发送简单例子-html文件