Flink零基础实战教程:股票价格数据流实时处理
之前的文章《10行Flink WordCount程序背后的万字深度解析,读懂Flink原理和架构》使用WordCount展示了Flink程序的基本结构,本文将以股票价格来演示如何使用Flink的DataStream API。通过本文,你可以学到:
- 定义相关数据结构。
- Flink流处理程序的骨架。
- Flink的执行环境概念。
- 自定义Source、设置时间戳和Watermark。
数据结构
Flink能处理任何可被序列化的数据结构:
- 基础数据类型,包括 String、Integer、Boolean、Array
- 复杂数据结构,包括 Scala case class和 Java POJO
此外,Flink也支持Kryo序列化工具。
本例使用Scala case class来定义一个股票类,该对象包括三个字段:股票代号、时间戳和价格。真实的股票交易数据比这个更为复杂,这里只是一个简化的模型。
case class StockPrice(symbol: String, timestamp: Long, price: Double)
当然,如果使用Java,也可以定义一个POJO(Plain Old Java Object),该类中各个字段或者具有public属性,或者有一个对应的getter和setter方法,且该类有一个无参数的构造函数。
public class StockPrice {
public String symbol;
public Long timestamp;
public Double price;
public StockPrice() {};
public StockPrice(String symbol, Long timestamp, Double price){
...
};
}
相比而言,Scala的类定义更为简洁,因为Scala的编译器在编译阶段帮忙生成了不少代码,Java的代码风格有些臃肿。
Flink对数据类型有以上要求,主要因为在分布式计算过程中,需要将内存中的对象序列化成可多节点传输的数据,并且能够在对应节点被反序列化成对象。
Flink流处理程序的骨架结构
基于上面的数据结构,我们开始开发程序。下面的代码清单使用Flink对股票数据流分析程序,该程序能够统计数据源中每支股票5秒时间窗口内的最大值。
object StockPriceDemo {
def main(args: Array[String]) {
// 设置执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// 每5秒生成一个Watermark
env.getConfig.setAutoWatermarkInterval(5000L)
// 股票价格数据流
val stockPriceRawStream: DataStream[StockPrice] = env
// 该数据流由StockPriceSource类随机生成
.addSource(new StockPriceSource)
// 设置 Timestamp 和 Watermark
.assignTimestampsAndWatermarks(new StockPriceTimeAssigner)
val stockPriceStream: DataStream[StockPrice] = stockPriceRawStream
.keyBy(_.symbol)
// 设置5秒的时间窗口
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
// 取5秒内某一只股票的最大值
.max("price")
// 打印结果
stockPriceStream.print()
// 执行程序
env.execute("Compute max stock price")
}
}
Java或Scala的程序入口一般是一个静态(static)的main函数。而在Scala中,object下的变量和方法都是静态的。在main函数中,还需要定义下面几个步骤:
-
设置运行环境。
-
读取一到多个数据源。
-
根据业务逻辑对数据流进行Transformation操作。
-
将结果输出。
-
调用作业执行函数 execute。
接下来我们对这五个步骤拆解分析。
设置执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
这行代码可以获取一个Flink流处理执行环境。Flink一般运行在一个集群上,执行环境是Flink程序运行的上下文,它提供了一系列作业与集群交互的方法,比如作业如何与外部世界交互。当调用getExecutionEnvironment方法时,假如我们是在一个集群上提交作业,则返回集群的上下文,假如我们是在本地执行,则返回本地的上下文。本例中我们是进行流处理,在批处理场景则要获取DataSet API中批处理执行环境。
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
这行代码告知执行环境使用Event-time时间语义来进行后续时间上的计算。Event-time语义下需要依赖Watermark机制,即收到一个Watermark后,开始对这个窗口进行计算,比Watermark更晚到达的事件都被视为延迟数据。env.getConfig.setAutoWatermarkInterval(5000L)
设置每5秒生成一个Watermark,默认情况下每200毫秒生成一个Watermark。
此外,我们还可以设置作业的并行度、配置Checkpoint等操作。可见,执行环境是我们与Flink交互的入口。
读取数据源
接着我们需要使用执行环境提供的方法读取数据源,读取数据源的部分统称为Source。数据源一般是消息队列或文件,我们也可以根据业务需求重写数据源,比如定时爬取网络中某处的数据。在本例中,我们使用val stockPriceRawStream: DataStream[StockPrice] = env.addSource(new StockPriceSource)
来读取数据源,其中StockPriceSource
随机生成了一些股票价格数据。最终生成的stockPriceRawStream
是一个由StockPrice
组成的DataStream数据流。
下面的代码清单展示了StockPriceSource
类继承RichSourceFunction
,对run
方法重写,不断随机生成股票价格,生成的数据最终写入SourceContext
中。
class StockPriceSource extends RichSourceFunction[StockPrice]{
var isRunning: Boolean = true
val rand = new Random()
// 初始化股票价格
var priceList: List[Double] = List(100.0d, 200.0d, 300.0d, 400.0d, 500.0d)
var stockId = 0
var curPrice = 0.0d
override def run(srcCtx: SourceContext[StockPrice]): Unit = {
while (isRunning) {
// 每次从列表中随机选择一只股票
stockId = rand.nextInt(priceList.size)
val curPrice = priceList(stockId) + rand.nextGaussian() * 0.05
priceList = priceList.updated(stockId, curPrice)
val curTime = Calendar.getInstance.getTimeInMillis
// 将数据源收集写入SourceContext
srcCtx.collect(StockPrice("symbol_" + stockId.toString, curTime, curPrice))
Thread.sleep(rand.nextInt(10))
}
}
override def cancel(): Unit = {
isRunning = false
}
}
尽管StockPrice
的数据结构中有时间戳的字段,但是Flink并不知道哪个字段是时间戳,所以还需要手动设置。assignTimestampsAndWatermarks(new StockPriceTimeAssigner)
方法允许我们设置时间戳和Watermark,这样Flink就可以知道本程序的时间戳。Flink Watermark相关的内容将在后续文章中介绍。
下面的代码清单抽取数据源中StockPrice
的timestamp
字段作为该事件的时间戳。
class StockPriceTimeAssigner extends BoundedOutOfOrdernessTimestampExtractor[StockPrice](Time.seconds(5)) {
override def extractTimestamp(t: StockPrice): Long = t.timestamp
}
Transformation
此时,我们已经获取了一个股票价格数据流,接下来我们就可以在数据流上进行有状态的计算了。我们一般使用Flink提供的各类算子,使用链式调用的方式,对一个数据流进行操作。经过各Transformation算子的处理,DataStream
可能被转换为KeyedStream
、JoinedStream
等不同的数据流结构。相比Spark RDD的数据结构,Flink的数据流结构确实更加复杂。
本例中,我们按照股票代号对数据进行分组,并开启一个5秒的时间窗口,统计该窗口下某支股票的5秒内的最大值。
val stockPriceStream: DataStream[StockPrice] = stockPriceRawStream
.keyBy(_.symbol)
// 设置5秒的时间窗口
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
// 取5秒内某一只股票的最大值
.max("price")
keyBy
算子将数据流按照股票的symbol分组,相同symbol的股票数据会被归结到一起;window
算子开启了一个5秒的滚动窗口;max
算子统计这个5秒窗口内的最大值。最终我们能够得到每支股票5秒内的最大值。
输出结果
然后我们需要将前面的计算结果输出到外部系统,可能是一个消息队列、文件系统或数据库,也可以自定义输出方式,输出结果的部分统称为Sink。
本例中,5秒窗口内每支股票的最大值是计算结果,是一个DataStream[StockPrice]
结构的数据流。我们调用print
函数将这个数据流打印到标准输出(standard output)。
执行
当定义好程序的Source、Transformation和Sink的业务逻辑后,程序并不会立即执行这些算子对应的任何计算,还需要调用执行环境execute()
方法来执行前面的业务逻辑。Flink是延迟执行(lazy evaluation)的,即当程序明确调用execute()
方法时,Flink会将数据流图转化为一个JobGraph,提交给JobManager,JobManager根据当前的执行环境来执行这个作业。
总结
一个Flink程序的核心业务逻辑主要包括:Source、Transformation和Sink三部分。程序的开始前要设置执行环境,最后要调用execute()
方法。
整个程序的完整代码如下所示,完整程序和更多案例参见我的GitHub:https://github.com/luweizheng/flink-tutorials。
package com.flink.tutorials.demos.stock
import java.util.Calendar
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.functions.source.RichSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.{TumblingEventTimeWindows, TumblingProcessingTimeWindows}
import scala.util.Random
object StockPriceDemo {
/**
* Case Class StockPrice
* symbol 股票代号
* timestamp 时间戳
* price 价格
*/
case class StockPrice(symbol: String, timestamp: Long, price: Double)
def main(args: Array[String]) {
// 设置执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// 每5秒生成一个Watermark
env.getConfig.setAutoWatermarkInterval(5000L)
// 股票价格数据流
val stockPriceRawStream: DataStream[StockPrice] = env
// 该数据流由StockPriceSource类随机生成
.addSource(new StockPriceSource)
// 设置 Timestamp 和 Watermark
.assignTimestampsAndWatermarks(new StockPriceTimeAssigner)
val stockPriceStream: DataStream[StockPrice] = stockPriceRawStream
.keyBy(_.symbol)
// 设置5秒的时间窗口
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
// 取5秒内某一只股票的最大值
.max("price")
// 打印结果
stockPriceStream.print()
// 执行程序
env.execute("Compute max stock price")
}
class StockPriceSource extends RichSourceFunction[StockPrice]{
var isRunning: Boolean = true
val rand = new Random()
// 初始化股票价格
var priceList: List[Double] = List(100.0d, 200.0d, 300.0d, 400.0d, 500.0d)
var stockId = 0
var curPrice = 0.0d
override def run(srcCtx: SourceContext[StockPrice]): Unit = {
while (isRunning) {
// 每次从列表中随机选择一只股票
stockId = rand.nextInt(priceList.size)
val curPrice = priceList(stockId) + rand.nextGaussian() * 0.05
priceList = priceList.updated(stockId, curPrice)
val curTime = Calendar.getInstance.getTimeInMillis
// 将数据源收集写入SourceContext
srcCtx.collect(StockPrice("symbol_" + stockId.toString, curTime, curPrice))
Thread.sleep(rand.nextInt(10))
}
}
override def cancel(): Unit = {
isRunning = false
}
}
class StockPriceTimeAssigner extends BoundedOutOfOrdernessTimestampExtractor[StockPrice](Time.seconds(5)) {
override def extractTimestamp(t: StockPrice): Long = t.timestamp
}
}
上一篇: Tensorflow 基础
下一篇: 实时通信 socketio nio 总结