DataStream API编程指南之Data Sources(七)
DataStream API编程指南之Data Sources(七)
Sources 是程序读取输入的地方。通过使用StreamExecutionEnvironment.addSource(sourceFunction)
,可以添加一个数据源到你的程序中。Flink附带了许多预先实现的源函数,但是,我们也可以通过实现SourceFunction类添加自定义的不可并行化源,或者通过实现ParallelSourceFunction接口或者继承RichParallelSourceFunction类的方式实现可并行化流。
可以从StreamExecutionEnvironment访问几个预定义的流源:
1、基于文件方式
-
readTextFile(path)
- 读取文本文件,。例如,符合TextInputFormat
规范的文件,可以逐行读取并以字符串形式返回。 -
readFile(fileInputFormat, path)
- 按照指定的文件输入格式读取(一次)文件。 -
readFile(fileInputFormat, path, watchType, interval, pathFilter)
- 前面的两个方法内部都是调用该方法。它根据给定的fileInputFormat读取路径中的文件。根据所提供的watchType,此源可以周期性地(固定间隔ms)监控给定路径下的新数据(FileProcessingMode.PROCESS_CONTINUOUSLY),或者在当前路径下只处理一次数据并退出(FileProcessingMode.PROCESS_ONCE)。使用pathFilter,用户可以进一步排除正在处理的文件。
在底层,Flink将文件读取过程分解为两个子任务,即directory monitoring 和data reading,每一个子任务都由单独的实体实现。directory monitoring由单个、非并行(并行性= 1)任务实现,而data reading由多个并行运行的任务执行。data reading的并行度等于作业并行度。单个monitoring任务的作用是扫描目录(根据watchType定期扫描或仅扫描一次),查找要处理的文件,将它们切分成几个split,并将这些split分配给下游的reader。reader是实际读取数据的。每个split只能由一个reader读取,而一个reader可以逐个读取多个split。
重要提示:
- 如果watchType设置为FileProcessingMode.PROCESS_CONTINUOUSLY,当一个文件被修改时,文件的全部内容将被重新处理。这可能会破坏“只执行一次”的语义,因为将数据添加到文件末尾的行为,将导致对文件的全部内容进行重新处理。
- 如果watchType设置为FileProcessingMode.PROCESS_ONCE,source扫描路径一次并退出,而不会等待reader完成文件内容的读取。当然,reader会继续读取,直至读完文件的所有内容。关闭源将导致在此之后不再有检查点。这可能导致节点故障后恢复速度变慢,因为作业将从最后一个检查点恢复读取。
2、基于Socket方式
socketTextStream
- 从socket读取数据。 元素可以用分隔符进行分隔。
例子:读取socket的数据,利用netcat工具从指定端口的socket读取数据并输出。
(1)首先,在命令行窗口执行如下命令
nc -l -p 9999 # win10
nc -lk 9999 # linux
(2)读取上述socket的数据,并打印
def socketSource(env: StreamExecutionEnvironment) = {
val data: DataStream[String] = env
.socketTextStream("localhost", 9999)
data.print()
env.execute("Socket Source...")
}
3、基于集合方式
-
fromCollection(Seq)
- 从Java.util.Collection创建数据流。集合中的所有元素都必须是相同的类型。 -
fromCollection(Iterator)
- 从迭代器创建数据流。 class指定迭代器返回元素的数据类型。 -
fromElements(elements: _*)
- 根据给定的对象序列创建数据流。所有对象必须具有相同的类型。 -
fromParallelCollection(SplittableIterator)
-从迭代器并行地创建数据流。class指定迭代器返回的元素的数据类型。 -
generateSequence(from, to)
- 在给定的区间内并行地生成数字序列。
4、自定义方式
addSource
- 添加一个新的源函数。例如,从kafka读取数据,可以使用addSource(new FlinkKafkaConsumer08<>(…)). 更多的细节,可以参考connectors。
- 非并行化源,实现SourceFunction接口
- 并行化源,实现ParallelSourceFunction接口或者继承RichParallelSourceFunction类
(1)自定义非并行化源
/**
* 自定义的非并行化源
*/
class CustomNonParallelSourceFunction extends SourceFunction[Long]{
var count = 1L
var isRunning = true
override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
while (isRunning && count < 1000){
ctx.collect(count)
count = count + 1
Thread.sleep(1000)
}
}
override def cancel(): Unit = {
isRunning = false
}
}
使用:
def nonParallelSourceFunction(env:StreamExecutionEnvironment) = {
val data: DataStream[Long] = env
.addSource(new CustomNonParallelSourceFunction)
data.print()
env.execute("DataStreamSourceApp")
}
(2)自定义可并行化源-实现ParallelSourceFunction接口方式
/**
* 自定义可并行化源
*/
class CustomParallelSourceFunction extends ParallelSourceFunction[Long]{
var isRunning = true
var count = 0L
override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
while (isRunning && count < 100){
ctx.collect(count)
count = count + 1
Thread.sleep(1000)
}
}
override def cancel(): Unit = isRunning = false
}
使用:
def parallelSourceFunction(env:StreamExecutionEnvironment) = {
val data: DataStream[Long] = env
.addSource(new CustomParallelSourceFunction).setParallelism(3)
data.print()
env.execute("DataStreamSourceApp")
}
(3)自定义可并行化源-继承RichParallelSourceFunction类方式
class CustomRichParallelSourceFunction extends RichParallelSourceFunction[Long]{
var isRunning = true
var count = 0L
override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
while (isRunning){
ctx.collect(count)
count = count + 1
Thread.sleep(1000)
}
}
override def cancel(): Unit = isRunning = false
}
使用:
def richParallelSourceFunction(env:StreamExecutionEnvironment) = {
val data: DataStream[Long] = env
.addSource(new CustomRichParallelSourceFunction).setParallelism(3)
data.print()
env.execute("DataStreamSourceApp")
}