Flink简单应用DataStream开发


DataStream开发

  Flink中的DataStream程序是实现数据流转换(例如,过滤,更新状态,定义窗口,聚合)的常规程序。数据流最初由各种来源(例如,消息队列,套接字流,文件)创建。结果通过接收器返回,例如可以将数据写入文件,或者写入标准输出(例如命令行终端)。Flink程序可以在各种情况下运行,可以独立运行,也可以嵌入其他程序中。执行可以发生在本地JVM或许多机器的集群中。

例子:WordCount

object WordCount {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val textStream: DataStream[String] = env.socketTextStream("localhost", 9999)
    val lowerData: DataStream[String] = textStream.flatMap(line => line.toLowerCase().split("\\W+"))
    val nonEmpty_data: DataStream[String] = lowerData.filter(line => line.nonEmpty)
    val mapData: DataStream[(String, Int)] = nonEmpty_data.map(line => (line, 1))
    //基于指定的key进行数据分组
    val keybyData = mapData.keyBy(0)
    val sum: DataStream[(String, Int)] = keybyData.sum(1)
    sum.print()
    env.execute("start streaming window wordCount")
  }
}

 

 

DataStream的Transformation

和dataset一样,dataStream也包括一系列的Transformation操作:

https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/operators/index.html

Transformation

Description

Map
DataStream → DataStream

Takes one element and produces one element. A map function that doubles the values of the input stream:

dataStream.map{x=>x*2}

FlatMap
DataStream → DataStream

Takes one element and produces zero, one, or more elements. A flatmap function that splits sentences to words:

dataStream.flatMap{str=>str.split(" ")}

Filter
DataStream → DataStream

Evaluates a boolean function for each element and retains those for which the function returns true. A filter that filters out zero values:

dataStream.filter{_!=0}

KeyBy
DataStream → KeyedStream

Logically partitions a stream into disjoint partitions, each partition containing elements of the same key. Internally, this is implemented with hash partitioning. See keys on how to specify keys. This transformation returns a KeyedStream.

dataStream.keyBy("someKey") // Key by field "someKey"
dataStream.keyBy(0) // Key by the first element of a Tuple

Reduce
KeyedStream → DataStream

A "rolling" reduce on a keyed data stream. Combines the current element with the last reduced value and emits the new value. 

A reduce function that creates a stream of partial sums:

keyedStream.reduce{_+_}


Fold
KeyedStream → DataStream

A "rolling" fold on a keyed data stream with an initial value. Combines the current element with the last folded value and emits the new value. 
 

A fold function that, when applied on the sequence (1,2,3,4,5), emits the sequence "start-1", "start-1-2", "start-1-2-3", ...

val result:DataStream[String]=
keyedStream.fold("start")((str,i)=>{str+"-"+i})

Aggregations
KeyedStream → DataStream

Rolling aggregations on a keyed data stream. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in this field (same for max and maxBy).

keyedStream.sum(0)
keyedStream.sum("key")
keyedStream.min(0)
keyedStream.min("key")
keyedStream.max(0)
keyedStream.max("key")
keyedStream.minBy(0)
keyedStream.minBy("key")
keyedStream.maxBy(0)
keyedStream.maxBy("key")

Window
KeyedStream → WindowedStream

Windows can be defined on already partitioned KeyedStreams. Windows group the data in each key according to some characteristic (e.g., the data that arrived within the last 5 seconds). See windows for a description of windows.

dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))) // Last 5 seconds of data

WindowAll
DataStream → AllWindowedStream

Windows can be defined on regular DataStreams. Windows group all the stream events according to some characteristic (e.g., the data that arrived within the last 5 seconds). See windows for a complete description of windows.

WARNING: This is in many cases a non-parallel transformation. All records will be gathered in one task for the windowAll operator.

dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))) // Last 5 seconds of data

Window Apply
WindowedStream → DataStream
AllWindowedStream → DataStream

Applies a general function to the window as a whole. Below is a function that manually sums the elements of a window.

Note: If you are using a windowAll transformation, you need to use an AllWindowFunction instead.

windowedStream.apply { WindowFunction }
// applying an AllWindowFunction on non-keyed window stream
allWindowedStream.apply { AllWindowFunction }

Window Reduce
WindowedStream → DataStream

Applies a functional reduce function to the window and returns the reduced value.

windowedStream.reduce{_+_}

Window Fold
WindowedStream → DataStream

Applies a functional fold function to the window and returns the folded value. The example function, when applied on the sequence (1,2,3,4,5), folds the sequence into the string "start-1-2-3-4-5":

val result:DataStream[String]=windowedStream.fold("start",(str,i)=>{str+"-"+i})

Aggregations on windows
WindowedStream → DataStream

Aggregates the contents of a window. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in this field (same for max and maxBy).

windowedStream.sum(0)
windowedStream.sum("key")
windowedStream.min(0)
windowedStream.min("key")
windowedStream.max(0)
windowedStream.max("key")
windowedStream.minBy(0)
windowedStream.minBy("key")
windowedStream.maxBy(0)
windowedStream.maxBy("key")

Union
DataStream* → DataStream

Union of two or more data streams creating a new stream containing all the elements from all the streams. Note: If you union a data stream with itself you will get each element twice in the resulting stream.

dataStream.union(otherStream1, otherStream2, ...)

Window Join
DataStream,DataStream → DataStream

Join two data streams on a given key and a common window.

dataStream.join(otherStream).where(<key selector>).equalTo(<key selector>).window(TumblingEventTimeWindows.of(Time.seconds(3))).apply{
...
}

Window CoGroup
DataStream,DataStream → DataStream

Cogroups two data streams on a given key and a common window.

dataStream.coGroup(otherStream).where(0).equalTo(1).window(TumblingEventTimeWindows.of(Time.seconds(3))).apply
{}

Connect
DataStream,DataStream → ConnectedStreams

"Connects" two data streams retaining their types, allowing for shared state between the two streams.

someStream:DataStream[Int]=...
otherStream:DataStream[String]=...
val connectedStreams=someStream.connect(otherStream)

CoMap, CoFlatMap
ConnectedStreams → DataStream

Similar to map and flatMap on a connected data stream

connectedStreams.map((_:Int)=>true,(_:String)=>false)
connectedStreams.flatMap((_:Int)=>true,(_:String)=>false)

Split
DataStream → SplitStream

Split the stream into two or more streams according to some criterion.

val split = someDataStream.split(
  (num: Int) =>
    (num % 2) match {
      case 0 => List("even")
      case 1 => List("odd")
    }
)

Select
SplitStream → DataStream

Select one or more streams from a split stream.

val even=split select "even"
val odd=split select "odd"
val all=split.select("even","odd")

Iterate
DataStream → IterativeStream → DataStream

Creates a "feedback" loop in the flow, by redirecting the output of one operator to some previous operator. This is especially useful for defining algorithms that continuously update a model. The following code starts with a stream and applies the iteration body continuously. Elements that are greater than 0 are sent back to the feedback channel, and the rest of the elements are forwarded downstream. See iterations for a complete description.

initialStream.iterate {
  iteration => {
    val iterationBody = iteration.map { /*do something*/ }
    (iterationBody.filter(_ > 0), iterationBody.filter(_ <= 0))
  }
}

Extract Timestamps
DataStream → DataStream

Extracts timestamps from records in order to work with windows that use event time semantics. See Event Time.

stream.assignTimestamps { timestampExtractor }

 

KeyBy

逻辑上将一个流分成不相交的分区,每个分区包含相同键的元素。在内部,这是通过散列分区来实现的

object Keyby {

  def main(args: Array[String]): Unit = {

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    env.setParallelism(3)

    val textStream: DataStream[String] = env.socketTextStream("localhost" , 12345)

    val flatMap_data: DataStream[String] = textStream.flatMap(line => line.split("\t"))

    val map_data: DataStream[(String, Int)] = flatMap_data.map(line => (line , 1))

    //TODO 逻辑上将一个流分成不相交的分区,每个分区包含相同键的元素。在内部,这是通过散列分区来实现的

    val keyByData: KeyedStream[(String, Int), String] = map_data.keyBy(line => line._1)

    keyByData.writeAsText("keyByData")

    env.execute()

  }

}

 

Windows

在讲解windows的众多操作之前,需要讲解一个概念:

源源不断的数据流是无法进行统计工作的,因为数据流没有边界,就无法统计到底有多少数据经过了这个流。也无法统计数据流中的最大值,最小值,平均值,累加值等信息。

如果在数据流上截取固定大小的一部分,这部分数据是可以进行统计的。截取方式主要有两种(两种方式的示意代码见列表之后):


1.根据时间进行截取(time-driven-window),比如每1分钟统计一次或每10分钟统计一次。

2.根据数据进行截取(data-driven-window),比如每5个数据统计一次或每50个数据统计一次。
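
下面是这两种截取方式的一个最小示意(主机、端口、数据格式均为假设,完整的业务例子见后文):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object TwoKindsOfWindow {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //假设socket中每行是一个整数
    val nums: DataStream[(String, Int)] = env
      .socketTextStream("localhost", 9999)
      .map(line => ("all", line.trim.toInt))

    //1.time-driven-window:每1分钟截取一次,统计这1分钟内的累加值
    nums.keyBy(0).timeWindow(Time.minutes(1)).sum(1).print()

    //2.data-driven-window:每5个数据截取一次,统计这5个数据的累加值
    nums.keyBy(0).countWindow(5).sum(1).print()

    env.execute("time-driven vs data-driven window")
  }
}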

 

基于流的操作图:

 

(原文此处为配图,图略)

关于window的理论+实践

一:tumbling-time-window (无重叠数据)

1.红绿灯路口会有汽车通过,一共会有多少汽车通过,无法计算。因为车流源源不断,计算没有边界。

2.统计每15秒钟通过红路灯的汽车数量,第一个15秒为2辆,第二个15秒为3辆,第三个15秒为1辆。。。

(原文此处为配图,图略)

1.tumbling-time-window (无重叠数据)实战

1.发送命令

nc -lk 9999


2.发送内容

9,3

9,2

9,7

4,9

2,6

1,5

2,3

5,7

5,4

程序:

object Window {

  def main(args: Array[String]): Unit = {

    //TODO time-window

    //1.创建运行环境

    val env = StreamExecutionEnvironment.getExecutionEnvironment

  

    //2.定义数据流来源

    val text = env.socketTextStream("localhost", 9999)

  

    //3.转换数据格式,text->CarWc

    case class CarWc(sensorId: Int, carCnt: Int)

    val ds1: DataStream[CarWc] = text.map {

      line => {

        val tokens = line.split(",")

        CarWc(tokens(0).trim.toInt, tokens(1).trim.toInt)

      }

    }

  

    //4.执行统计操作,每个sensorId一个tumbling窗口,窗口的大小为5秒

    //也就是说,每5秒钟统计一次,在这过去的5秒钟内,各个路口通过红绿灯汽车的数量。

    val ds2: DataStream[CarWc] = ds1

      .keyBy("sensorId")

      .timeWindow(Time.seconds(5))

      .sum("carCnt")

  

    //5.显示统计结果

    ds2.print()

  

    //6.触发流计算

    env.execute(this.getClass.getName)

  

  }

}

 

二:sliding-time-window (有重叠数据)

//TODO 2.sliding-time-window(有重叠)

  //1.创建运行环境

  val env = StreamExecutionEnvironment.getExecutionEnvironment

  

  //2.定义数据流来源

  val text = env.socketTextStream("localhost", 9999)

  

  //3.转换数据格式,text->CarWc

  case class CarWc(sensorId: Int, carCnt: Int)

  val ds1: DataStream[CarWc] = text.map {

  line => {

    val tokens = line.split(",")

    CarWc(tokens(0).trim.toInt, tokens(1).trim.toInt)

  }

}

  //4.执行统计操作,每个sensorId一个sliding窗口,窗口时间10秒,滑动时间5秒

//也就是说,每5秒钟统计一次,在这过去的10秒钟内,各个路口通过红绿灯汽车的数量。

  val ds2: DataStream[CarWc] = ds1

  .keyBy("sensorId")

  .timeWindow(Time.seconds(10), Time.seconds(5))

  .sum("carCnt")

  

  //5.显示统计结果

  ds2.print()

  

  //6.触发流计算

  env.execute(this.getClass.getName)

 

三:tumbling-count-window (无重叠数据)

按照个数进行统计,比如:

每个路口分别统计,每收到关于它的5条消息,就统计这5条消息中通过该路口的汽车数量


1.发送命令

nc -lk 9999

2.发送内容

9,3

9,2

9,7

4,9

2,6

1,5

2,3

5,7

5,4

程序:

//TODO tumbling-count-window (无重叠数据)

  //1.创建运行环境

  val env = StreamExecutionEnvironment.getExecutionEnvironment

  

  //2.定义数据流来源

  val text = env.socketTextStream("localhost", 9999)

  

  //3.转换数据格式,text->CarWc

  case class CarWc(sensorId: Int, carCnt: Int)

  val ds1: DataStream[CarWc] = text.map {

  (f) => {

    val tokens = f.split(",")

    CarWc(tokens(0).trim.toInt, tokens(1).trim.toInt)

  }

}

  //4.执行统计操作,每个sensorId一个tumbling窗口,窗口的大小为5

//按照key进行收集,对应的key出现的次数达到5次作为一个结果

  val ds2: DataStream[CarWc] = ds1

  .keyBy("sensorId")

  .countWindow(5)

  .sum("carCnt")

  

  //5.显示统计结果

  ds2.print()

  

  //6.触发流计算

  env.execute(this.getClass.getName)

 

四:sliding-count-window (有重叠数据)

同样是设置窗口长度和滑动步长的操作:窗口长度是5,滑动步长是3

//TODO sliding-count-window(有重叠)

  //1.创建运行环境

  val env = StreamExecutionEnvironment.getExecutionEnvironment

  

  //2.定义数据流来源

  val text = env.socketTextStream("localhost", 9999)

  

  //3.转换数据格式,text->CarWc

  case class CarWc(sensorId: Int, carCnt: Int)

  val ds1: DataStream[CarWc] = text.map {

  (f) => {

    val tokens = f.split(",")

    CarWc(tokens(0).trim.toInt, tokens(1).trim.toInt)

  }

}

  //4.执行统计操作,每个sensorId一个sliding窗口,窗口大小5条数据,滑动步长为3条数据

//也就是说,每个路口分别统计,每收到关于它的3条消息,就统计最近5条消息中通过该路口的汽车数量

  val ds2: DataStream[CarWc] = ds1

  .keyBy("sensorId")

  .countWindow(5, 3)

  .sum("carCnt")

  

  //5.显示统计结果

  ds2.print()

  

  //6.触发流计算

  env.execute(this.getClass.getName)

window总结

 

1.flink支持两种划分窗口的方式(time和count)

    如果根据时间划分窗口,那么它就是一个time-window

    如果根据数据划分窗口,那么它就是一个count-window

2.flink支持窗口的两个重要属性(size和interval)

    如果size=interval,那么就会形成tumbling-window(无重叠数据)

    如果size>interval,那么就会形成sliding-window(有重叠数据)

    如果size<interval,那么这种窗口将会丢失数据。比如每5秒钟,统计过去3秒的通过路口汽车的数据,将会漏掉2秒钟的数据。

    3.通过组合可以得出四种基本窗口(四种设置方式的完整示例见列表后的代码片段)

    time-tumbling-window 无重叠数据的时间窗口,设置方式举例:timeWindow(Time.seconds(5))

    time-sliding-window  有重叠数据的时间窗口,设置方式举例:timeWindow(Time.seconds(5), Time.seconds(3))

    count-tumbling-window无重叠数据的数量窗口,设置方式举例:countWindow(5)

    count-sliding-window 有重叠数据的数量窗口,设置方式举例:countWindow(5,3)
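
把上面四种窗口的设置方式放到同一段代码里,大致如下(沿用前文CarWc的例子,仅作示意):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object FourWindows {
  case class CarWc(sensorId: Int, carCnt: Int)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val ds1: DataStream[CarWc] = env
      .socketTextStream("localhost", 9999)
      .map { line =>
        val tokens = line.split(",")
        CarWc(tokens(0).trim.toInt, tokens(1).trim.toInt)
      }
    val keyed = ds1.keyBy("sensorId")

    //time-tumbling-window:窗口大小5秒,无重叠
    keyed.timeWindow(Time.seconds(5)).sum("carCnt").print()
    //time-sliding-window:窗口大小5秒,滑动步长3秒,有重叠
    keyed.timeWindow(Time.seconds(5), Time.seconds(3)).sum("carCnt").print()
    //count-tumbling-window:每5条数据一个窗口,无重叠
    keyed.countWindow(5).sum("carCnt").print()
    //count-sliding-window:窗口大小5条,滑动步长3条,有重叠
    keyed.countWindow(5, 3).sum("carCnt").print()

    env.execute("four basic windows")
  }
}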

 

time-window的高级用法

 

1.现实世界中的时间是不一致的,在flink中被划分为事件时间(EventTime)、摄入时间(IngestionTime)、处理时间(ProcessingTime)三种(设置方式见下方代码片段)。

2.如果以EventTime为基准来定义时间窗口,那将形成EventTimeWindow,要求消息本身就应该携带EventTime。

3.如果以IngestionTime为基准来定义时间窗口,那将形成IngestionTimeWindow,以source所在机器的系统时间为准。

4.如果以ProcessingTime为基准来定义时间窗口,那将形成ProcessingTimeWindow,以operator所在机器的系统时间为准。
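
下面是设置这三种时间语义的简单示意(基于文中使用的Flink 1.5 API,仅作参考):

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._

object TimeCharacteristicDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //以事件自带的时间为准(要求消息携带EventTime,通常配合水印使用)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    //以数据进入Flink source时的系统时间为准
    //env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
    //以operator处理数据时的系统时间为准(Flink 1.5的默认值)
    //env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
  }
}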

 

EventTime

1.要求消息本身就应该携带EventTime

2.时间对应关系如下

2018-06-02 11:45:55  ->  1527911155000

2018-06-02 11:45:56  ->  1527911156000

2018-06-02 11:45:57  ->  1527911157000

 

需求:

EventTime划分窗口,计算3秒钟内出价最高的产品

数据:

1527911155000,boos1,pc1,100.0
1527911156000,boos2,pc1,200.0
1527911157000,boos1,pc1,300.0
1527911158000,boos2,pc1,500.0
1527911159000,boos1,pc1,600.0
1527911160000,boos1,pc1,700.0
1527911161000,boos2,pc2,700.0
1527911162000,boos2,pc2,900.0
1527911163000,boos2,pc2,1000.0
1527911164000,boos2,pc2,1100.0
1527911165000,boos1,pc2,1100.0
1527911166000,boos2,pc2,1300.0
1527911167000,boos2,pc2,1400.0
1527911168000,boos2,pc2,1600.0
1527911169000,boos1,pc2,1300.0

代码实现:

object EventTimeExample {

  def main(args: Array[String]) {

  

    //1.创建执行环境,并设置为使用EventTime

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //置为使用EventTime

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

  

    //2.创建数据流,并进行数据转化

    val source = env.socketTextStream("localhost", 9999)

    case class SalePrice(time: Long, boosName: String, productName: String, price: Double)

    val dst1: DataStream[SalePrice] = source.map(value => {

      val columns = value.split(",")

      SalePrice(columns(0).toLong, columns(1), columns(2), columns(3).toDouble)

    })

  

    //3.使用EventTime进行求最值操作

    val dst2: DataStream[SalePrice] = dst1

      //提取消息中的时间戳属性

      .assignAscendingTimestamps(_.time)

      .keyBy(_.productName)

      .timeWindow(Time.seconds(3))//设置window方法一

      .max("price")

  

    //4.显示结果

    dst2.print()

  

    //5.触发流计算

    env.execute()

  }

}

上面的代码理论上没有任何问题,但在实际使用时会出现很多问题,甚至接收不到数据或者接收到的数据不准确;这是因为Flink在最初设计时就考虑到了网络延迟、网络乱序等问题,所以提出了一个抽象概念:水印(Watermark)。

水印分成两种形式:

第一种:周期性水印(AssignerWithPeriodicWatermarks),按固定的时间间隔周期性地生成水印;

第二种:间断式水印(AssignerWithPunctuatedWatermarks),针对每一条数据判断是否生成水印(示意代码见本节代码之后)。

 

所以,我们需要考虑到网络延迟的状况,那么代码中就需要添加水印操作:

下面结合代码讲解(使用周期性水印,原文配图略):

object EventTimeOperator {

  def main(args: Array[String]): Unit = {

    //创建执行环境,并设置为使用EventTime

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.setParallelism(1)//注意控制并发数

    //置为使用EventTime

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val source = env.socketTextStream("localhost", 9999)

    val dst1: DataStream[SalePrice] = source.map(value => {

      val columns = value.split(",")

      SalePrice(columns(0).toLong, columns(1), columns(2), columns(3).toDouble)

    })

    //todo 水印时间  assignTimestampsAndWatermarks

    val timestamps_data = dst1.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[SalePrice]{

  

      var currentMaxTimestamp:Long = 0

      val maxOutOfOrderness = 2000L //最大允许的乱序时间是2s

      var wm : Watermark = null

      val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")

      override def getCurrentWatermark: Watermark = {

        wm = new Watermark(currentMaxTimestamp - maxOutOfOrderness)

        wm

      }

  

      override def extractTimestamp(element: SalePrice, previousElementTimestamp: Long): Long = {

        val timestamp = element.time

        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp)

        timestamp //返回该条数据的事件时间作为时间戳

      }

    })

    val data: KeyedStream[SalePrice, String] = timestamps_data.keyBy(line => line.productName)

    val window_data: WindowedStream[SalePrice, String, TimeWindow] = data.timeWindow(Time.seconds(3))

    val apply: DataStream[SalePrice] = window_data.apply(new MyWindowFunc)

    apply.print()

    env.execute()

  

  }

}

  case class SalePrice(time: Long, boosName: String, productName: String, price: Double)

  class MyWindowFunc extends WindowFunction[SalePrice , SalePrice , String, TimeWindow]{

  override def apply(key: String, window: TimeWindow, input: Iterable[SalePrice], out: Collector[SalePrice]): Unit = {

    val seq = input.toArray

    val take: Array[SalePrice] = seq.sortBy(line => line.price).reverse.take(1)

    for(info <- take){

      out.collect(info)

    }

  }

}
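
上面的例子使用的是第一种周期性水印(AssignerWithPeriodicWatermarks);作为对照,下面给出间断式水印的一个简单示意(沿用上文的SalePrice,直接以事件时间作为水印,仅作参考):

import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

//间断式水印:每来一条数据都判断一次是否需要发出新的水印
class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[SalePrice] {

  //先提取该条数据的事件时间戳
  override def extractTimestamp(element: SalePrice, previousElementTimestamp: Long): Long =
    element.time

  //再根据该条数据决定是否发出水印,这里简单地以事件时间作为水印
  override def checkAndGetNextWatermark(lastElement: SalePrice, extractedTimestamp: Long): Watermark =
    new Watermark(extractedTimestamp)
}

//使用方式与周期性水印一致:
//val timestamps_data = dst1.assignTimestampsAndWatermarks(new PunctuatedAssigner)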

 

ProcessingTime

对于ProcessingTime而言,窗口以Flink处理数据时的系统时间为准,不关心发过来的数据本身是否延迟、乱序,只关心数据具体的处理时间,所以不需要水印处理,操作相对来说简单了很多。

object ProcessingTimeExample {

  def main(args: Array[String]) {

    //创建执行环境,并设置为使用ProcessingTime

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.setParallelism(2)//注意控制并发数

    //置为使用ProcessingTime

    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

  

    val source = env.socketTextStream("localhost", 9999)

    case class SalePrice(time: Long, boosName: String, productName: String, price: Double)

  

    val dst1: DataStream[SalePrice] = source.map(value => {

      val columns = value.split(",")

      SalePrice(columns(0).toLong, columns(1), columns(2), columns(3).toDouble)

    })

    //processTime不需要提取消息中的时间

//    val timestamps_data: DataStream[SalePrice] = dst1.assignAscendingTimestamps(line => line.time)

    val keyby_data: KeyedStream[SalePrice, String] = dst1.keyBy(line => line.productName)

    //TODO 窗口事件是:TumblingProcessingTimeWindows

    val window_data: WindowedStream[SalePrice, String, TimeWindow] = keyby_data.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))

    val max_price: DataStream[SalePrice] = window_data.max("price")

    max_price.print()

    env.execute()

  }

}

Window apply

和前面的window操作类似,只不过更加灵活:具体的处理逻辑需要在匿名内部类(WindowFunction)的apply方法中实现;当有比较复杂的需求时可以使用。

object WindowApply {

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val textStream: DataStream[String] = env.socketTextStream("localhost" , 9999)

    val flatmapdata: DataStream[String] = textStream.flatMap(x => x.split(","))

    val mapdata: DataStream[(String, Int)] = flatmapdata.map(line => (line,1))

    val keybyStream: KeyedStream[(String, Int), String] = mapdata.keyBy(line => line._1)

    val window: WindowedStream[(String, Int), String, TimeWindow] = keybyStream.timeWindow(Time.of(1 , TimeUnit.SECONDS) ,Time.of(100,TimeUnit.MILLISECONDS))

    val data = window.apply(new WindowFunction[(String, Int) , (String, Int) , String , TimeWindow] {

      override def apply(key: String,

                         window: TimeWindow,

                         input: Iterable[(String, Int)],

                         out: Collector[(String, Int)]): Unit = {

        var output = ""

        var index = 0

        for(in <- input){

            output += "key :" + in._1 + "   value:"+in._2

            index = index + 1

          out.collect((output, index))

          }

      }

    })

    data.print()

    env.execute()

  

  }

}
注意,例子中使用的是window,所以对应的匿名内部类是:WindowFunction

如果使用的是windowAll,则需要使用的内部类是:AllWindowFunction
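
下面给出一个windowAll + AllWindowFunction的简单示意(沿用上例main方法中的mapdata,窗口大小仅为假设):

import org.apache.flink.streaming.api.scala.AllWindowedStream
import org.apache.flink.streaming.api.scala.function.AllWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

//不做keyBy,直接在整个流上开1秒的滚动窗口(注意:windowAll通常是非并行操作)
val allWindow: AllWindowedStream[(String, Int), TimeWindow] =
  mapdata.timeWindowAll(Time.seconds(1))

val allData = allWindow.apply(new AllWindowFunction[(String, Int), (String, Int), TimeWindow] {
  override def apply(window: TimeWindow,
                     input: Iterable[(String, Int)],
                     out: Collector[(String, Int)]): Unit = {
    //与WindowFunction相比没有key参数:窗口内是所有分区汇聚到一起的数据
    var count = 0
    for (in <- input) count += in._2
    out.collect(("all", count))
  }
})

allData.print()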

connect

用来将两个DataStream组装成一个ConnectedStreams。

ConnectedStreams会分别保留原有两个DataStream的数据类型;这样我们就可以在同一个算子里,把不同类型的数据处理成同一种结构。

val env = StreamExecutionEnvironment.getExecutionEnvironment

  env.setParallelism(1)

  val src: DataStream[Int] = env.fromElements(1, 3, 5)

  val stringMap: DataStream[String] = src.map(line => "x "+line)

  val result = stringMap.connect(src).map(new CoMapFunction[String , Int , String] {

  override def map2(value: Int): String = {

    "x "+ (value + 1)

  }

  

  override def map1(value: String): String = {

    value

  }

})

result.print()

env.execute()

 

Split和select

(原文此处为配图,图略)

Split就是将一个DataStream分成两个或者多个DataStream

Select就是获取分流后对应的数据

val env = StreamExecutionEnvironment.getExecutionEnvironment

  env.setParallelism(1)

  val elements: DataStream[Int] = env.fromElements(1,2,3,4,5,6)

  //数据分流

  val split_data = elements.split(

  (num: Int) => (num % 2) match {

    case 0 => List("even")

    case 1 => List("odd")

  }

)

  //获取分流后的数据

  val select: DataStream[Int] = split_data.select("even")

select.print()

env.execute()

 

 

Flink在流处理上常见的Source和sink操作

flink在流处理上的source和在批处理上的source基本一致。大致有4大类

1.基于本地集合的source(Collection-based-source)

2.基于文件的source(File-based-source)

3.基于网络套接字的source(Socket-based-source)

4.自定义的source(Custom-source)

1:基于集合的source

import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}



import scala.collection.immutable.{Queue, Stack}

import scala.collection.mutable

import scala.collection.mutable.{ArrayBuffer, ListBuffer}



object DataSource001 {

  def main(args: Array[String]): Unit = {

    val senv = StreamExecutionEnvironment.getExecutionEnvironment

    //0.用element创建DataStream(fromElements)

    val ds0: DataStream[String] = senv.fromElements("spark", "flink")

    ds0.print()



    //1.用Tuple创建DataStream(fromElements)

    val ds1: DataStream[(Int, String)] = senv.fromElements((1, "spark"), (2, "flink"))

    ds1.print()



    //2.用Array创建DataStream

    val ds2: DataStream[String] = senv.fromCollection(Array("spark", "flink"))

    ds2.print()



    //3.用ArrayBuffer创建DataStream

    val ds3: DataStream[String] = senv.fromCollection(ArrayBuffer("spark", "flink"))

    ds3.print()



    //4.用List创建DataStream

    val ds4: DataStream[String] = senv.fromCollection(List("spark", "flink"))

    ds4.print()



    //5.用List创建DataStream

    val ds5: DataStream[String] = senv.fromCollection(ListBuffer("spark", "flink"))

    ds5.print()



    //6.用Vector创建DataStream

    val ds6: DataStream[String] = senv.fromCollection(Vector("spark", "flink"))

    ds6.print()



    //7.用Queue创建DataStream

    val ds7: DataStream[String] = senv.fromCollection(Queue("spark", "flink"))

    ds7.print()



    //8.用Stack创建DataStream

    val ds8: DataStream[String] = senv.fromCollection(Stack("spark", "flink"))

    ds8.print()



    //9.用Stream创建DataStream(Stream相当于lazy List,避免在中间过程中生成不必要的集合)

    val ds9: DataStream[String] = senv.fromCollection(Stream("spark", "flink"))

    ds9.print()



    //10.用Seq创建DataStream

    val ds10: DataStream[String] = senv.fromCollection(Seq("spark", "flink"))

    ds10.print()



    //11.用Set创建DataStream(不支持)

    //val ds11: DataStream[String] = senv.fromCollection(Set("spark", "flink"))

    //ds11.print()



    //12.用Iterable创建DataStream(不支持)

    //val ds12: DataStream[String] = senv.fromCollection(Iterable("spark", "flink"))

    //ds12.print()



    //13.用ArraySeq创建DataStream

    val ds13: DataStream[String] = senv.fromCollection(mutable.ArraySeq("spark", "flink"))

    ds13.print()



    //14.用ArrayStack创建DataStream

    val ds14: DataStream[String] = senv.fromCollection(mutable.ArrayStack("spark", "flink"))

    ds14.print()



    //15.用Map创建DataStream(不支持)

    //val ds15: DataStream[(Int, String)] = senv.fromCollection(Map(1 -> "spark", 2 -> "flink"))

    //ds15.print()



    //16.用Range创建DataStream

    val ds16: DataStream[Int] = senv.fromCollection(Range(1, 9))

    ds16.print()



    //17.用fromElements创建DataStream

    val ds17: DataStream[Long] = senv.generateSequence(1, 9)

    ds17.print()

   

    senv.execute(this.getClass.getName)

  }

}

2.基于文件的source(File-based-source)

//TODO 2.基于文件的source(File-based-source)

  //0.创建运行环境

  val env = StreamExecutionEnvironment.getExecutionEnvironment

  //TODO 1.读取本地文件

  val text1 = env.readTextFile("data2.csv")

text1.print()

  //TODO 2.读取hdfs文件

  val text2 = env.readTextFile("hdfs://hadoop01:9000/input/flink/README.txt")

text2.print()

env.execute()

3.基于网络套接字的source(Socket-based-source)


val source = env.socketTextStream("IP", PORT)

4.自定义的source(Custom-source,以kafka为例)

Kafka基本命令:


  ● 查看当前服务器中的所有topic

bin/kafka-topics.sh --list --zookeeper  hadoop01:2181

  ● 创建topic

bin/kafka-topics.sh --create --zookeeper hadoop01:2181 --replication-factor 1 --partitions 1 --topic test

  ● 删除topic

sh bin/kafka-topics.sh --delete --zookeeper zk01:2181 --topic test

需要在server.properties中设置delete.topic.enable=true,否则只是标记删除,或者设置后直接重启。

  ● 通过shell命令发送消息

sh bin/kafka-console-producer.sh --broker-list hadoop01:9092 --topic test

  ● 通过shell消费消息

bin/kafka-console-consumer.sh --zookeeper hadoop01:2181 --from-beginning --topic test1

  ● 查看消费位置

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper zk01:2181 --group testGroup

  ● 查看某个Topic的详情

bin/kafka-topics.sh --topic test --describe --zookeeper zk01:2181

  ● 对分区数进行修改

kafka-topics.sh --zookeeper  zk01 --alter --partitions 15 --topic   utopic

 

使用flink消费kafka的消息

import java.util.Properties

  

  import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

  import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09

  import org.apache.flink.streaming.util.serialization.SimpleStringSchema

  import org.apache.flink.api.scala._

  /**

  * Created by angel;

  */

  object DataSource_kafka {

  def main(args: Array[String]): Unit = {

    //1指定kafka数据流的相关信息

    val zkCluster = "hadoop01,hadoop02,hadoop03:2181"

    val kafkaCluster = "hadoop01:9092,hadoop02:9092,hadoop03:9092"

    val kafkaTopicName = "test"

    //2.创建流处理环境

    val env = StreamExecutionEnvironment.getExecutionEnvironment

  

    //3.创建kafka数据流

    val properties = new Properties()

    properties.setProperty("bootstrap.servers", kafkaCluster)

    properties.setProperty("zookeeper.connect", zkCluster)

    properties.setProperty("group.id", kafkaTopicName)

  

    val kafka09 = new FlinkKafkaConsumer09[String](kafkaTopicName,

      new SimpleStringSchema(), properties)

    //4.添加数据源addSource(kafka09)

    val text = env.addSource(kafka09).setParallelism(4)

  

    /**

      * test#CS#request http://b2c.csair.com/B2C40/query/jaxb/direct/query.ao?t=S&c1=HLN&c2=CTU&d1=2018-07-12&at=2&ct=2&inf=1#CS#POST#CS#application/x-www-form-urlencoded#CS#t=S&json={'adultnum':'1','arrcity':'NAY','childnum':'0','depcity':'KHH','flightdate':'2018-07-12','infantnum':'2'}#CS#http://b2c.csair.com/B2C40/modules/bookingnew/main/flightSelectDirect.html?t=R&c1=LZJ&c2=MZG&d1=2018-07-12&at=1&ct=2&inf=2#CS#123.235.193.25#CS#Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.1 (KHTML, like Gecko) Chrome/21.0.1180.89 Safari/537.1#CS#2018-01-19T10:45:13:578+08:00#CS#106.86.65.18#CS#cookie

      * */

    val values: DataStream[ProcessedData] = text.map{

      line =>

        var encrypted = line

        val values = encrypted.split("#CS#")

        val valuesLength = values.length

        var regionalRequest =  if(valuesLength > 1) values(1) else ""

        val requestMethod = if (valuesLength > 2) values(2) else ""

        val contentType = if (valuesLength > 3) values(3) else ""

        //Post提交的数据体

        val requestBody = if (valuesLength > 4) values(4) else ""

        //http_referrer

        val httpReferrer = if (valuesLength > 5) values(5) else ""

        //客户端IP

        val remoteAddr = if (valuesLength > 6) values(6) else ""

        //客户端UA

        val httpUserAgent = if (valuesLength > 7) values(7) else ""

        //服务器时间的ISO8610格式

        val timeIso8601 = if (valuesLength > 8) values(8) else ""

        //服务器地址

        val serverAddr = if (valuesLength > 9) values(9) else ""

        //获取原始信息中的cookie字符串

        val cookiesStr = if (valuesLength > 10) values(10) else ""

        ProcessedData(regionalRequest,

          requestMethod,

          contentType,

          requestBody,

          httpReferrer,

          remoteAddr,

          httpUserAgent,

          timeIso8601,

          serverAddr,

          cookiesStr)

    }

    values.print()

    val remoteAddr: DataStream[String] = values.map(line => line.remoteAddr)

    remoteAddr.print()
    //5.触发运算

    env.execute("flink-kafka-wordcunt")

  }

}

  

  //保存结构化数据

  case class ProcessedData(regionalRequest: String,

                         requestMethod: String,

                         contentType: String,

                         requestBody: String,

                         httpReferrer: String,

                         remoteAddr: String,

                         httpUserAgent: String,

                         timeIso8601: String,

                         serverAddr: String,

                         cookiesStr: String

                         )

 

 

Flink在流处理上常见的sink

1:将数据sink到本地文件(参考批处理)

2:Sink到本地集合(参考批处理)

3:Sink到HDFS(参考批处理;前三种sink的简单示意见下方代码)

4:sink到kafka
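
在看第4种(kafka)的完整例子之前,先给出前三种sink的最小示意(API与批处理基本一致,路径仅为假设):

import org.apache.flink.streaming.api.scala._

object SimpleSinks {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream: DataStream[String] = env.fromElements("spark", "flink")

    //1:sink到本地文件
    stream.writeAsText("file:///tmp/flink_out")

    //2:sink到本地集合/控制台,调试时常用print
    stream.print()

    //3:sink到HDFS,只需把路径换成hdfs地址
    stream.writeAsText("hdfs://hadoop01:9000/output/flink_out")

    env.execute("simple sinks")
  }
}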

package com.flink.DataStream

  

  import java.util.Properties

  

  import org.apache.flink.api.common.serialization.SimpleStringSchema

  import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

  import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer09, FlinkKafkaProducer09}

  import org.apache.flink.streaming.api.functions.source.SourceFunction

  import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext

  import org.apache.flink.api.scala._

  import org.apache.kafka.common.serialization.ByteArraySerializer

  /**

  * Created by angel;

  */

  object DataSource_kafka {

  def main(args: Array[String]): Unit = {

    //1指定kafka数据流的相关信息

    val zkCluster = "hadoop01,hadoop02,hadoop03:2181"

    val kafkaCluster = "hadoop01:9092,hadoop02:9092,hadoop03:9092"

    val kafkaTopicName = "test"

    val sinkKafka = "test2"

    //2.创建流处理环境

    val env = StreamExecutionEnvironment.getExecutionEnvironment

  

    //3.创建kafka数据流

    val properties = new Properties()

    properties.setProperty("bootstrap.servers", kafkaCluster)

    properties.setProperty("zookeeper.connect", zkCluster)

    properties.setProperty("group.id", kafkaTopicName)

  

    val kafka09 = new FlinkKafkaConsumer09[String](kafkaTopicName, new SimpleStringSchema(), properties)

    //4.添加数据源addSource(kafka09)

    val text = env.addSource(kafka09).setParallelism(4)

  

    /**

      * test#CS#request http://b2c.csair.com/B2C40/query/jaxb/direct/query.ao?t=S&c1=HLN&c2=CTU&d1=2018-07-12&at=2&ct=2&inf=1#CS#POST#CS#application/x-www-form-urlencoded#CS#t=S&json={'adultnum':'1','arrcity':'NAY','childnum':'0','depcity':'KHH','flightdate':'2018-07-12','infantnum':'2'}#CS#http://b2c.csair.com/B2C40/modules/bookingnew/main/flightSelectDirect.html?t=R&c1=LZJ&c2=MZG&d1=2018-07-12&at=1&ct=2&inf=2#CS#123.235.193.25#CS#Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.1 (KHTML, like Gecko) Chrome/21.0.1180.89 Safari/537.1#CS#2018-01-19T10:45:13:578+08:00#CS#106.86.65.18#CS#cookie

      * */

    val values: DataStream[ProcessedData] = text.map{

      line =>

        var encrypted = line

        val values = encrypted.split("#CS#")

        val valuesLength = values.length

        var regionalRequest =  if(valuesLength > 1) values(1) else ""

        val requestMethod = if (valuesLength > 2) values(2) else ""

        val contentType = if (valuesLength > 3) values(3) else ""

        //Post提交的数据体

        val requestBody = if (valuesLength > 4) values(4) else ""

        //http_referrer

        val httpReferrer = if (valuesLength > 5) values(5) else ""

        //客户端IP

        val remoteAddr = if (valuesLength > 6) values(6) else ""

        //客户端UA

        val httpUserAgent = if (valuesLength > 7) values(7) else ""

        //服务器时间的ISO8610格式

        val timeIso8601 = if (valuesLength > 8) values(8) else ""

        //服务器地址

        val serverAddr = if (valuesLength > 9) values(9) else ""

        //获取原始信息中的cookie字符串

        val cookiesStr = if (valuesLength > 10) values(10) else ""

        ProcessedData(regionalRequest,

          requestMethod,

          contentType,

          requestBody,

          httpReferrer,

          remoteAddr,

          httpUserAgent,

          timeIso8601,

          serverAddr,

          cookiesStr)

  

    }

    values.print()

    val remoteAddr: DataStream[String] = values.map(line => line.remoteAddr)

    remoteAddr.print()

      //TODO sink到kafka

    val p: Properties = new Properties

    p.setProperty("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092")

    p.setProperty("key.serializer", classOf[ByteArraySerializer].getName)

    p.setProperty("value.serializer", classOf[ByteArraySerializer].getName)

    val sink = new FlinkKafkaProducer09[String](sinkKafka, new SimpleStringSchema(), p)

    remoteAddr.addSink(sink)

    //5.触发运算

    env.execute("flink-kafka-wordcunt")

  }

}

  //保存结构化数据

  case class ProcessedData(regionalRequest: String,

                         requestMethod: String,

                         contentType: String,

                         requestBody: String,

                         httpReferrer: String,

                         remoteAddr: String,

                         httpUserAgent: String,

                         timeIso8601: String,

                         serverAddr: String,

                         cookiesStr: String

                         )

 

 

基于mysql的sink和source

1:基于mysql的source操作:

object MysqlSource {

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val source: DataStream[Student] = env.addSource(new SQL_source)

    source.print()

    env.execute()

  }

}

  class SQL_source extends RichSourceFunction[Student]{

  private var connection: Connection = null

  private var ps: PreparedStatement = null

  

  override def open(parameters: Configuration): Unit = {

    val driver = "com.mysql.jdbc.Driver"

    val url = "jdbc:mysql://hadoop01:3306/test"

    val username = "root"

    val password = "root"

    Class.forName(driver)

    connection = DriverManager.getConnection(url, username, password)

    val sql = "select stuid , stuname , stuaddr , stusex from Student"

    ps = connection.prepareStatement(sql)

  }

  

  override def close(): Unit = {

    if(connection != null){

      connection.close()

    }

    if(ps != null){

      ps.close()

    }

  }

  

  

  override def run(sourceContext: SourceContext[Student]): Unit = {

    val queryRequest = ps.executeQuery()

    while (queryRequest.next()){

      val stuid = queryRequest.getInt("stuid")

      val stuname = queryRequest.getString("stuname")

      val stuaddr = queryRequest.getString("stuaddr")

      val stusex = queryRequest.getString("stusex")

      val stu = new Student(stuid , stuname , stuaddr , stusex)

      sourceContext.collect(stu)

    }

  }

  override def cancel(): Unit = {}

}

  

  

  case class Student(stuid:Int , stuname:String , stuaddr:String , stusex:String){

  override def toString: String = {

    "stuid:"+stuid+"  stuname:"+stuname+"   stuaddr:"+stuaddr+"   stusex:"+stusex

  }

}

2:基于mysql的sink操作

object MysqlSink {

  def main(args: Array[String]): Unit = {

    //1.创建流执行环境

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //2.准备数据

    val dataStream:DataStream[Student] = env.fromElements(

      Student(8, "xiaoming", "beijing biejing", "female")

  //      Student(6, "daming", "tainjing tianjin", "male "),

//      Student(7, "daqiang ", "shanghai shanghai", "female")

    )

  

    //3.将数据写入到自定义的sink中(这里是mysql)

    dataStream.addSink(new StudentSinkToMysql)

    //4.触发流执行

    env.execute()

  }

}

  

  class StudentSinkToMysql extends RichSinkFunction[Student]{

  private var connection:Connection = null

  private var ps:PreparedStatement = null

  

  override def open(parameters: Configuration): Unit = {

    val driver = "com.mysql.jdbc.Driver"

    val url = "jdbc:mysql://hadoop01:3306/test"

    val username = "root"

    val password = "root"

    //1:加载驱动

    Class.forName(driver)

    //2:创建连接

    connection = DriverManager.getConnection(url , username , password)

    val sql = "insert into Student(stuid , stuname , stuaddr , stusex) values(?,?,?,?);"

    //3:获得执行语句

    ps = connection.prepareStatement(sql)

  }

  

  //关闭连接操作

  override def close(): Unit = {

    if(connection != null){

      connection.close()

    }

    if(ps != null){

      ps.close()

    }

  }

  //每个元素的插入,都要触发一次invoke,这里主要进行invoke插入

  override def invoke(stu: Student): Unit = {

    try{

      //4.组装数据,执行插入操作

      ps.setInt(1, stu.stuid)

      ps.setString(2, stu.stuname)

      ps.setString(3, stu.stuaddr)

      ps.setString(4, stu.stusex)

      ps.executeUpdate()

    }catch {

      case e:Exception => println(e.getMessage)

    }

  }

}