欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Flink零基础实战教程:股票价格数据流实时处理

程序员文章站 2022-07-13 14:56:43
...

之前的文章《10行Flink WordCount程序背后的万字深度解析,读懂Flink原理和架构》使用WordCount展示了Flink程序的基本结构,本文将以股票价格来演示如何使用Flink的DataStream API。通过本文,你可以学到:

  1. 定义相关数据结构。
  2. Flink流处理程序的骨架。
  3. Flink的执行环境概念。
  4. 自定义Source、设置时间戳和Watermark。

数据结构

Flink能处理任何可被序列化的数据结构:

  • 基础数据类型,包括 String、Integer、Boolean、Array
  • 复杂数据结构,包括 Scala case class和 Java POJO

此外,Flink也支持Kryo序列化工具。

本例使用Scala case class来定义一个股票类,该对象包括三个字段:股票代号、时间戳和价格。真实的股票交易数据比这个更为复杂,这里只是一个简化的模型。

case class StockPrice(symbol: String, timestamp: Long, price: Double)

当然,如果使用Java,也可以定义一个POJO(Plain Old Java Object),该类中各个字段或者具有public属性,或者有一个对应的getter和setter方法,且该类有一个无参数的构造函数。

public class StockPrice {
  public String symbol;
  public Long timestamp;
  public Double price;
  
  public StockPrice() {};
  public StockPrice(String symbol, Long timestamp, Double price){
    ...
  };
}

相比而言,Scala的类定义更为简洁,因为Scala的编译器在编译阶段帮忙生成了不少代码,Java的代码风格有些臃肿。

Flink对数据类型有以上要求,主要因为在分布式计算过程中,需要将内存中的对象序列化成可多节点传输的数据,并且能够在对应节点被反序列化成对象。

Flink流处理程序的骨架结构

基于上面的数据结构,我们开始开发程序。下面的代码清单使用Flink对股票数据流分析程序,该程序能够统计数据源中每支股票5秒时间窗口内的最大值。

object StockPriceDemo {

  def main(args: Array[String]) {

    // 设置执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // 每5秒生成一个Watermark
    env.getConfig.setAutoWatermarkInterval(5000L)

    // 股票价格数据流
    val stockPriceRawStream: DataStream[StockPrice] = env
      // 该数据流由StockPriceSource类随机生成
      .addSource(new StockPriceSource)
      // 设置 Timestamp 和 Watermark
      .assignTimestampsAndWatermarks(new StockPriceTimeAssigner)

    val stockPriceStream: DataStream[StockPrice] = stockPriceRawStream
      .keyBy(_.symbol)
      // 设置5秒的时间窗口
      .window(TumblingEventTimeWindows.of(Time.seconds(5)))
      // 取5秒内某一只股票的最大值
      .max("price")

    // 打印结果
    stockPriceStream.print()

    // 执行程序
    env.execute("Compute max stock price")
  }
}

Java或Scala的程序入口一般是一个静态(static)的main函数。而在Scala中,object下的变量和方法都是静态的。在main函数中,还需要定义下面几个步骤:

  1. 设置运行环境。

  2. 读取一到多个数据源。

  3. 根据业务逻辑对数据流进行Transformation操作。

  4. 将结果输出。

  5. 调用作业执行函数 execute。

接下来我们对这五个步骤拆解分析。

设置执行环境

val env = StreamExecutionEnvironment.getExecutionEnvironment

这行代码可以获取一个Flink流处理执行环境。Flink一般运行在一个集群上,执行环境是Flink程序运行的上下文,它提供了一系列作业与集群交互的方法,比如作业如何与外部世界交互。当调用getExecutionEnvironment方法时,假如我们是在一个集群上提交作业,则返回集群的上下文,假如我们是在本地执行,则返回本地的上下文。本例中我们是进行流处理,在批处理场景则要获取DataSet API中批处理执行环境。

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)这行代码告知执行环境使用Event-time时间语义来进行后续时间上的计算。Event-time语义下需要依赖Watermark机制,即收到一个Watermark后,开始对这个窗口进行计算,比Watermark更晚到达的事件都被视为延迟数据。env.getConfig.setAutoWatermarkInterval(5000L)设置每5秒生成一个Watermark,默认情况下每200毫秒生成一个Watermark。

此外,我们还可以设置作业的并行度、配置Checkpoint等操作。可见,执行环境是我们与Flink交互的入口。

读取数据源

接着我们需要使用执行环境提供的方法读取数据源,读取数据源的部分统称为Source。数据源一般是消息队列或文件,我们也可以根据业务需求重写数据源,比如定时爬取网络中某处的数据。在本例中,我们使用val stockPriceRawStream: DataStream[StockPrice] = env.addSource(new StockPriceSource)来读取数据源,其中StockPriceSource随机生成了一些股票价格数据。最终生成的stockPriceRawStream是一个由StockPrice组成的DataStream数据流。

下面的代码清单展示了StockPriceSource类继承RichSourceFunction,对run方法重写,不断随机生成股票价格,生成的数据最终写入SourceContext中。

class StockPriceSource extends RichSourceFunction[StockPrice]{

  var isRunning: Boolean = true

  val rand = new Random()
  // 初始化股票价格
  var priceList: List[Double] = List(100.0d, 200.0d, 300.0d, 400.0d, 500.0d)
  var stockId = 0
  var curPrice = 0.0d

  override def run(srcCtx: SourceContext[StockPrice]): Unit = {

    while (isRunning) {
      // 每次从列表中随机选择一只股票
      stockId = rand.nextInt(priceList.size)

      val curPrice =  priceList(stockId) + rand.nextGaussian() * 0.05
      priceList = priceList.updated(stockId, curPrice)
      val curTime = Calendar.getInstance.getTimeInMillis

      // 将数据源收集写入SourceContext
      srcCtx.collect(StockPrice("symbol_" + stockId.toString, curTime, curPrice))
      Thread.sleep(rand.nextInt(10))
    }
  }

  override def cancel(): Unit = {
    isRunning = false
  }
}

尽管StockPrice的数据结构中有时间戳的字段,但是Flink并不知道哪个字段是时间戳,所以还需要手动设置。assignTimestampsAndWatermarks(new StockPriceTimeAssigner)方法允许我们设置时间戳和Watermark,这样Flink就可以知道本程序的时间戳。Flink Watermark相关的内容将在后续文章中介绍。

下面的代码清单抽取数据源中StockPricetimestamp字段作为该事件的时间戳。

class StockPriceTimeAssigner extends BoundedOutOfOrdernessTimestampExtractor[StockPrice](Time.seconds(5)) {
    override def extractTimestamp(t: StockPrice): Long = t.timestamp
}

Transformation

此时,我们已经获取了一个股票价格数据流,接下来我们就可以在数据流上进行有状态的计算了。我们一般使用Flink提供的各类算子,使用链式调用的方式,对一个数据流进行操作。经过各Transformation算子的处理,DataStream可能被转换为KeyedStreamJoinedStream等不同的数据流结构。相比Spark RDD的数据结构,Flink的数据流结构确实更加复杂。

本例中,我们按照股票代号对数据进行分组,并开启一个5秒的时间窗口,统计该窗口下某支股票的5秒内的最大值。

val stockPriceStream: DataStream[StockPrice] = stockPriceRawStream
      .keyBy(_.symbol)
      // 设置5秒的时间窗口
      .window(TumblingEventTimeWindows.of(Time.seconds(5)))
      // 取5秒内某一只股票的最大值
      .max("price")

keyBy算子将数据流按照股票的symbol分组,相同symbol的股票数据会被归结到一起;window算子开启了一个5秒的滚动窗口;max算子统计这个5秒窗口内的最大值。最终我们能够得到每支股票5秒内的最大值。

输出结果

然后我们需要将前面的计算结果输出到外部系统,可能是一个消息队列、文件系统或数据库,也可以自定义输出方式,输出结果的部分统称为Sink。

本例中,5秒窗口内每支股票的最大值是计算结果,是一个DataStream[StockPrice]结构的数据流。我们调用print函数将这个数据流打印到标准输出(standard output)。

执行

当定义好程序的Source、Transformation和Sink的业务逻辑后,程序并不会立即执行这些算子对应的任何计算,还需要调用执行环境execute()方法来执行前面的业务逻辑。Flink是延迟执行(lazy evaluation)的,即当程序明确调用execute()方法时,Flink会将数据流图转化为一个JobGraph,提交给JobManager,JobManager根据当前的执行环境来执行这个作业。

总结

一个Flink程序的核心业务逻辑主要包括:Source、Transformation和Sink三部分。程序的开始前要设置执行环境,最后要调用execute()方法。
Flink零基础实战教程:股票价格数据流实时处理
整个程序的完整代码如下所示,完整程序和更多案例参见我的GitHub:https://github.com/luweizheng/flink-tutorials。

package com.flink.tutorials.demos.stock

import java.util.Calendar

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.functions.source.RichSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.{TumblingEventTimeWindows, TumblingProcessingTimeWindows}

import scala.util.Random

object StockPriceDemo {

  /**
    * Case Class StockPrice
    * symbol 股票代号
    * timestamp 时间戳
    * price 价格
    */
  case class StockPrice(symbol: String, timestamp: Long, price: Double)

  def main(args: Array[String]) {

    // 设置执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // 每5秒生成一个Watermark
    env.getConfig.setAutoWatermarkInterval(5000L)

    // 股票价格数据流
    val stockPriceRawStream: DataStream[StockPrice] = env
      // 该数据流由StockPriceSource类随机生成
      .addSource(new StockPriceSource)
      // 设置 Timestamp 和 Watermark
      .assignTimestampsAndWatermarks(new StockPriceTimeAssigner)

    val stockPriceStream: DataStream[StockPrice] = stockPriceRawStream
      .keyBy(_.symbol)
      // 设置5秒的时间窗口
      .window(TumblingEventTimeWindows.of(Time.seconds(5)))
      // 取5秒内某一只股票的最大值
      .max("price")

    // 打印结果
    stockPriceStream.print()

    // 执行程序
    env.execute("Compute max stock price")
  }

  class StockPriceSource extends RichSourceFunction[StockPrice]{

    var isRunning: Boolean = true

    val rand = new Random()
    // 初始化股票价格
    var priceList: List[Double] = List(100.0d, 200.0d, 300.0d, 400.0d, 500.0d)
    var stockId = 0
    var curPrice = 0.0d

    override def run(srcCtx: SourceContext[StockPrice]): Unit = {

      while (isRunning) {
        // 每次从列表中随机选择一只股票
        stockId = rand.nextInt(priceList.size)

        val curPrice =  priceList(stockId) + rand.nextGaussian() * 0.05
        priceList = priceList.updated(stockId, curPrice)
        val curTime = Calendar.getInstance.getTimeInMillis

        // 将数据源收集写入SourceContext
        srcCtx.collect(StockPrice("symbol_" + stockId.toString, curTime, curPrice))
        Thread.sleep(rand.nextInt(10))
      }
    }

    override def cancel(): Unit = {
      isRunning = false
    }
  }

  class StockPriceTimeAssigner extends BoundedOutOfOrdernessTimestampExtractor[StockPrice](Time.seconds(5)) {
    override def extractTimestamp(t: StockPrice): Long = t.timestamp
  }

}

Flink零基础实战教程:股票价格数据流实时处理