Learning Apache Flink(window API)
Windows are an important feature in Flink. This article only demonstrates a simple window usage; follow-up posts will study the other window features in more depth together with real business cases.
Functionality
Each record in Kafka carries three fields: imsi, lac, cell. The goal is to count how many people fall into the same (lac, cell) pair.
Code walkthrough
- Flink consumes the Kafka data through CsvRowDeserializationSchema, which deserializes each record according to the given schema
- keyBy specifies multiple fields as the key
- an implementation of AggregateFunction counts the records for each key
- the second argument of aggregate is a window function, which makes it possible to emit the computed result together with its key (see the signature sketch after this list)
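For reference, the aggregate overload used here on the Scala WindowedStream[T, K, W] looks roughly like this (signature paraphrased from the Flink Scala API, not copied verbatim):

def aggregate[ACC: TypeInformation, V: TypeInformation, R: TypeInformation](
    preAggregator: AggregateFunction[T, ACC, V],
    windowFunction: (K, W, Iterable[V], Collector[R]) => Unit): DataStream[R]

The AggregateFunction condenses each window incrementally to a value of type V; the window function then receives the key, the window metadata and that value, which is how the key ends up in the output.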
Complete code
KafkaWindowDemo.scala
import java.util.Properties

import com.woople.flink.streaming.connectors.kafka.CsvRowDeserializationSchema
import org.apache.flink.api.common.accumulators.IntCounter
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.types.Row
import org.apache.flink.util.Collector

object KafkaWindowDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "hostA:6667")
    properties.setProperty("group.id", "vip")

    // Read rows (imsi, lac, cell) from the Kafka topic "foo" using the CSV schema.
    val s = env.addSource(new FlinkKafkaConsumer010[Row](
      "foo",
      new CsvRowDeserializationSchema(
        Types.ROW_NAMED(Array("imsi", "lac", "cell"), Types.STRING, Types.STRING, Types.STRING)),
      properties))

    val result = s
      // Key by the (lac, cell) pair.
      .keyBy(k => (k.getField(1).toString, k.getField(2).toString))
      .timeWindow(Time.seconds(20))
      .aggregate(
        // Incrementally count the rows of each key within the window.
        new AggregateFunction[Row, IntCounter, Int]() {
          override def createAccumulator(): IntCounter = new IntCounter()
          override def add(in: Row, acc: IntCounter): Unit = acc.add(1)
          override def getResult(acc: IntCounter): Int = acc.getLocalValue
          override def merge(acc: IntCounter, acc1: IntCounter): IntCounter = {
            acc.merge(acc1)
            acc
          }
        },
        // Window function: emit the key, the count and the window end time together.
        (key: (String, String), window: TimeWindow, counts: Iterable[Int], out: Collector[Row]) => {
          val count = counts.iterator.next
          val row = new Row(4)
          row.setField(0, key._1)
          row.setField(1, key._2)
          row.setField(2, count)
          row.setField(3, window.getEnd)
          out.collect(row)
        })

    result.print()
    env.execute("Kafka Window Stream")
  }
}
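Note that this code targets the older AggregateFunction interface, where add mutates the accumulator in place and returns Unit. In later Flink versions the interface was changed so that add returns the updated accumulator; under such a version the same counter would look roughly like this (a sketch, not tied to a specific release):

// Sketch for newer Flink versions where AggregateFunction.add
// returns the updated accumulator instead of mutating it.
new AggregateFunction[Row, Long, Int]() {
  override def createAccumulator(): Long = 0L
  override def add(in: Row, acc: Long): Long = acc + 1
  override def getResult(acc: Long): Int = acc.toInt
  override def merge(a: Long, b: Long): Long = a + b
}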
For CsvRowDeserializationSchema.scala, see Learning Apache Flink(API).
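If that article is not at hand, here is a minimal sketch of what such a deserializer could look like. It assumes the messages are plain comma-separated UTF-8 strings; the class body is an illustration, not the actual implementation from that article.

import org.apache.flink.api.common.typeinfo.TypeInformation
// Note: in later Flink versions this interface lives in
// org.apache.flink.api.common.serialization instead.
import org.apache.flink.streaming.util.serialization.DeserializationSchema
import org.apache.flink.types.Row

class CsvRowDeserializationSchema(typeInfo: TypeInformation[Row])
  extends DeserializationSchema[Row] {

  // Split each UTF-8 message on commas and copy the fields into a Row.
  override def deserialize(message: Array[Byte]): Row = {
    val fields = new String(message, "UTF-8").split(",")
    val row = new Row(fields.length)
    for (i <- fields.indices) row.setField(i, fields(i))
    row
  }

  // The Kafka stream is unbounded, so no element marks its end.
  override def isEndOfStream(nextElement: Row): Boolean = false

  override def getProducedType: TypeInformation[Row] = typeInfo
}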
Test case
Write the following records to the foo topic:
40012345,x,y
40011111,x,y
40099999,x,y
The job prints:
x,y,3,1512626920000
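All three records share the key (x, y), so the count for that 20-second window is 3; the last field, 1512626920000, is the window's end timestamp in epoch milliseconds, as returned by window.getEnd.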