
Learning Apache Flink (window API)


Windows are an important feature of Flink. This article demonstrates only a simple window use case; follow-up posts will dig into other window features in the context of real business scenarios.

Functional description

Each Kafka record consists of the fields imsi, lac, cell; the task is to count the number of people (imsi values) that share the same lac and cell.

Code flow

  • Flink reads the Kafka data through CsvRowDeserializationSchema, deserializing each record according to a given schema
  • keyBy uses multiple fields (lac and cell) as the key
  • an AggregateFunction implementation counts the records for each key
  • the second argument of aggregate is a window function that emits the computed count together with its key

Complete code

KafkaWindowDemo.scala

import java.util.Properties

import com.woople.flink.streaming.connectors.kafka.CsvRowDeserializationSchema
import org.apache.flink.api.common.accumulators.IntCounter
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.types.Row
import org.apache.flink.util.Collector
import org.apache.flink.streaming.api.scala._

object KafkaWindowDemo {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "hostA:6667")
    properties.setProperty("group.id", "vip")

    // Consume topic "foo", deserializing each CSV record into a Row(imsi, lac, cell)
    val s = env.addSource(new FlinkKafkaConsumer010[Row]("foo",
      new CsvRowDeserializationSchema(
        Types.ROW_NAMED(Array("imsi", "lac", "cell"), Types.STRING, Types.STRING, Types.STRING)),
      properties))

    // Key by (lac, cell) and count records per key in 20-second tumbling windows
    val result = s.keyBy(k => (k.getField(1).toString, k.getField(2).toString))
      .timeWindow(Time.seconds(20))
      .aggregate(
        // Incremental aggregate: counts the rows for each key within a window.
        // Note: add returning Unit matches Flink 1.3/1.4; since Flink 1.5,
        // AggregateFunction.add must return the accumulator instead.
        new AggregateFunction[Row, IntCounter, Int]() {
          override def createAccumulator(): IntCounter = new IntCounter()

          override def add(in: Row, acc: IntCounter): Unit = acc.add(1)

          override def getResult(acc: IntCounter): Int = acc.getLocalValue

          override def merge(acc: IntCounter, acc1: IntCounter): IntCounter = {
            acc.merge(acc1)
            acc
          }
        },
        // Window function: emits the pre-aggregated count together with the key
        // and the end timestamp of the window it belongs to
        (key: (String, String), window: TimeWindow, counts: Iterable[Int], out: Collector[Row]) => {
          val count = counts.iterator.next
          val row = new Row(4)

          row.setField(0, key._1)        // lac
          row.setField(1, key._2)        // cell
          row.setField(2, count)         // number of records in this window
          row.setField(3, window.getEnd) // window end in epoch milliseconds

          out.collect(row)
        })

    result.print()
    env.execute("Kafka Window Stream")
  }
}
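
Note that timeWindow(Time.seconds(20)) builds its tumbling windows on the environment's time characteristic, which defaults to processing time, so the counts above reflect when records arrive at the operator. If the windows should be driven by event time instead, the environment has to be switched explicitly and the stream needs timestamps and watermarks assigned; a minimal sketch (not part of the original demo):

import org.apache.flink.streaming.api.TimeCharacteristic

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// ...followed by s.assignTimestampsAndWatermarks(...) before the keyBy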

For CsvRowDeserializationSchema.scala, see Learning Apache Flink (API).
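
The actual implementation lives in that article; as a rough idea of what the call site above expects, here is a minimal sketch that simply splits each message on commas. It is an assumption for illustration only: no quoting or escaping is handled, and the field count is not validated against the schema.

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.util.serialization.DeserializationSchema
import org.apache.flink.types.Row

class CsvRowDeserializationSchema(typeInfo: TypeInformation[Row]) extends DeserializationSchema[Row] {
  // Split one Kafka message into its comma-separated fields and wrap them in a Row
  override def deserialize(message: Array[Byte]): Row = {
    val fields = new String(message, "UTF-8").split(",")
    val row = new Row(fields.length)
    for (i <- fields.indices) {
      row.setField(i, fields(i))
    }
    row
  }

  // The stream never ends on its own
  override def isEndOfStream(nextElement: Row): Boolean = false

  // Report the row type the constructor was given, e.g. Types.ROW_NAMED(...)
  override def getProducedType: TypeInformation[Row] = typeInfo
}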

Test case

Write the following records to the foo topic, e.g. with kafka-console-producer.sh --broker-list hostA:6667 --topic foo:

40012345,x,y
40011111,x,y
40099999,x,y

The job prints:

x,y,3,1512626920000

That is, the key (lac=x, cell=y) counted 3 records in the window ending at 1512626920000 (the window end timestamp in epoch milliseconds).