欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Flink Source、Sink

程序员文章站 2022-06-17 10:07:23
...

程序部署

本地执行
//1.创建流计算执行环境
    val env = StreamExecutionEnvironment.createLocalEnvironment(3)

    //2.创建DataStream
    val text = env.socketTextStream("train",9999)

    //3.执行DataStream的转换算子
    val counts = text.flatMap(line=>line.split(" "))
      .map(word=>(word,1))
      .keyBy(0)
      .sum(1)

    //4.将计算的结果在控制台打印
    counts.print()

    //5.执行流计算任务
    env.execute("Window Stream WordCount")
远程部署
//1.创建流计算执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //2.创建DataStream
    val text = env.socketTextStream("train",9999)

    //3.执行DataStream的转换算子
    val counts = text.flatMap(line=>line.split(" "))
      .map(word=>(word,1))
      .keyBy(0)
      .sum(1)

    //4.将计算的结果在控制台打印
    counts.print()

    //5.执行流计算任务
    env.execute("Window Stream WordCount")

StreamExecutionEnvironment.getExecutionEnvironment自动识别运行环境,如果运行环境是idea,系统会自动切换成本地模式,默认系统的并行度使用系统最大线程数,等价于Spark中设置的local[*],如果是生产环境,需要用户在提交任务的时候指定并行度--parallelism

  • 部署方式
    • WEB UI部署(略)
    • 通过脚本部署
[aaa@qq.com ~]# cd /usr/soft/flink-1.10.0/
[aaa@qq.com flink-1.10.0]# ./bin/flink run 
						--class com.baizhi.quickstart.FlinkWordCountTestLocal 
						--detached --parallelism 4 
						--jobmanager train:8081 /root/flink-1.0-SNAPSHOT.jar
Job has been submitted with JobID 808021820a80d008e8fcd7e72bba1029

查看现有任务

[aaa@qq.com flink-1.10.0]# ./bin/flink list --running --jobmanager train:8081
Waiting for response...
------------------ Running/Restarting Jobs -------------------
29.02.2020 05:06:41 : 808021820a80d008e8fcd7e72bba1029 : Window Stream WordCount (RUNNING)
--------------------------------------------------------------

[aaa@qq.com flink-1.10.0]# ./bin/flink list --all --jobmanager train:8081
Waiting for response...
------------------ Running/Restarting Jobs -------------------
29.02.2020 05:06:41 : 808021820a80d008e8fcd7e72bba1029 : Window Stream WordCount (RUNNING)
--------------------------------------------------------------
No scheduled jobs.

取消指定任务

[aaa@qq.com flink-1.10.0]# ./bin/flink cancel --jobmanager train:8081 808021820a80d008e8fcd7e72bba1029
Cancelling job 808021820a80d008e8fcd7e72bba1029.
Cancelled job 808021820a80d008e8fcd7e72bba1029.

查看程序执行计划

[aaa@qq.com flink-1.10.0]# ./bin/flink info --class com.baizhi.quickstart.FlinkWordCountTestLocal --parallelism 4 /root/flink-1.0-SNAPSHOT.jar
----------------------- Execution Plan -----------------------
{"nodes":[{"id":1,"type":"Source: Socket Stream","pact":"Data Source","contents":"Source: Socket Stream","parallelism":1},{"id":2,"type":"Flat Map","pact":"Operator","contents":"Flat Map","parallelism":4,"predecessors":[{"id":1,"ship_strategy":"REBALANCE","side":"second"}]},{"id":3,"type":"Map","pact":"Operator","contents":"Map","parallelism":4,"predecessors":[{"id":2,"ship_strategy":"FORWARD","side":"second"}]},{"id":5,"type":"aggregation","pact":"Operator","contents":"aggregation","parallelism":4,"predecessors":[{"id":3,"ship_strategy":"HASH","side":"second"}]},{"id":6,"type":"Sink: Print to Std. Out","pact":"Data Sink","contents":"Sink: Print to Std. Out","parallelism":4,"predecessors":[{"id":5,"ship_strategy":"FORWARD","side":"second"}]}]}
--------------------------------------------------------------

No description provided.

⽤户可以访问:https://flink.apache.org/visualizer/将json数据粘贴过去,查看Flink执⾏计划图
Flink Source、Sink

跨平台发布
//1.创建流计算执行环境
    var jars = "D:\\ideacores\\Flink\\flink\\target\\flink-1.0-SNAPSHOT.jar"

    val env = StreamExecutionEnvironment.createRemoteEnvironment("train",8081,jars)
    
    //设置默认并行度
    env.setParallelism(4)

    //2.创建DataStream
    val text = env.socketTextStream("train",9999)

    //3.执行DataStream的转换算子
    val counts = text.flatMap(line=>line.split(" "))
      .map(word=>(word,1))
      .keyBy(0)
      .sum(1)

    //4.将计算的结果在控制台打印
    counts.print()

    //5.执行流计算任务
    env.execute("Window Stream WordCount")

在运行之前需要使用mvn重新打包程序。直接运行main函数即可

Streaming(DataStream API)
DataSource

数据源是程序读取数据的来源,用户可以通过env.addSource(SourceFunction),将SourceFunction添加到程序中。Flink内置许多已知实现的SourceFunction,但是用户可以自定义实现SourceFunction(非并行化的接口)接口或者实现ParallelSourceFunction(并行化)接口,如果需要有状态管理还可以继承RichParallelSourceFunction

File-based

  • readTextFile(path) - 读取文本文件(一次) 并以字符串的形式返回它们
//1.创建流计算执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //2.创建DataStream - 细化
    val text = env.readTextFile("hdfs://train:9000/demo/words")


    //3.执行DataStreaem的转换算子
    val counts = text.flatMap(line=>line.split("\\s+"))
      .map(word=>(word,1))
      .keyBy(0)
      .sum(1)

    //4.将计算的结果在控制台打印
    counts.print()

    //5.执行流计算任务
    env.execute("Window Stream WordCount")
  • readFile(fileInputFormat, path) -根据指定的文件输入读取(一次)文件
    格式
//1.创建流计算执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //2.创建DataStream - 细化
    val inputFormat = new TextInputFormat(null)
    
    val text = env.readFile(inputFormat,"hdfs://train:9000/demo/words")


    //3.执行DataStreaem的转换算子
    val counts = text.flatMap(line=>line.split("\\s+"))
      .map(word=>(word,1))
      .keyBy(0)
      .sum(1)

    //4.将计算的结果在控制台打印
    counts.print()

    //5.执行流计算任务
    env.execute("Window Stream WordCount")
  • readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo) -
    这是前两个方法在内部调用的方。 它读取路径中的文件fileInputFormat . 根据所提供的watchType,此源可以定期调用
    监视(每隔ms)新数据的路径
    (FileProcessingMode.PROCESS_CONTINUOUSLY), 或处理当前路径中的数据
    退出文件处理模式。PROCESS_ONCE)。使用路径过滤器,用户可以进一步
    排除正在处理的文件。
//1.创建流计算执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //2.创建DataStream - 细化
    val inputFormat = new TextInputFormat(null)

    val text = env.readFile(inputFormat,"hdfs://train:9000/demo/words",FileProcessingMode.PROCESS_CONTINUOUSLY,1000)


    //3.执行DataStreaem的转换算子
    val counts = text.flatMap(line=>line.split("\\s+"))
      .map(word=>(word,1))
      .keyBy(0)
      .sum(1)

    //4.将计算的结果在控制台打印
    counts.print()

    //5.执行流计算任务
    env.execute("Window Stream WordCount")

该方法会检查采集目录下的文件,如果文件发生变化系统会重新采集。此时可能会导致文件的重复计算。一般来说不建议修改文件内容,直接上传新文件即可。

Socket Based

  • socketTextStream - Reads from a socket. Elements can be separated by a delimiter.
//1.创建流计算执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //2.创建DataStream - 细化
    val text = env.socketTextStream("train",9999,'\n',3)


    //3.执行DataStreaem的转换算子
    val counts = text.flatMap(line=>line.split("\\s+"))
      .map(word=>(word,1))
      .keyBy(0)
      .sum(1)

    //4.将计算的结果在控制台打印
    counts.print()

    //5.执行流计算任务
    env.execute("Window Stream WordCount")

Collection-based

//1.创建流计算执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //2.创建DataStream - 细化
    val inputFormat = new TextInputFormat(null)

    val text = env.fromCollection(List("this is a demo","hello word"))


    //3.执行DataStreaem的转换算子
    val counts = text.flatMap(line=>line.split("\\s+"))
      .map(word=>(word,1))
      .keyBy(0)
      .sum(1)

    //4.将计算的结果在控制台打印
    counts.print()

    //5.执行流计算任务
    env.execute("Window Stream WordCount")
UserDefinedSource
  • SourceFunction(非并行化)
import java.util.Random

import org.apache.flink.streaming.api.functions.source.SourceFunction

class UserDefinedNonParallelSourceFunction extends SourceFunction[String]{
  @volatile //防止线程拷贝变量
  var isRunning = true
  var lines = Array("this is a demo","hello world","ni hao ma")
  //在该方法中启动线程,通过sourceContext的collect方法发送数据
  override def run(sourceContext: SourceFunction.SourceContext[String]): Unit = {
    while (isRunning){
      Thread.sleep(100)
      //输送数据给下游
      sourceContext.collect(lines(new Random().nextInt(lines.size)))
    }
  }
  //释放资源
  override def cancel(): Unit = {
    isRunning = false
  }
}

测试

 //1.创建流计算执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //2.创建DataStream - 细化
    val inputFormat = new TextInputFormat(null)

    val text = env.addSource(new UserDefinedNonParallelSourceFunction)


    //3.执行DataStreaem的转换算子
    val counts = text.flatMap(line=>line.split("\\s+"))
      .map(word=>(word,1))
      .keyBy(0)
      .sum(1)

    //4.将计算的结果在控制台打印
    counts.print()

    //5.执行流计算任务
    env.execute("Window Stream WordCount")
  • ParallelSourceFunction(并行化)
import java.util.Random

import org.apache.flink.streaming.api.functions.source.{ParallelSourceFunction, SourceFunction}

class UserDefinedNonParallelSourceFunction extends ParallelSourceFunction[String]{
  @volatile //防止线程拷贝变量
  var isRunning = true
  var lines = Array("this is a demo","hello world","ni hao ma")
  //在该方法中启动线程,通过sourceContext的collect方法发送数据
  override def run(sourceContext: SourceFunction.SourceContext[String]): Unit = {
    while (isRunning){
      Thread.sleep(100)
      //输送数据给下游
      sourceContext.collect(lines(new Random().nextInt(lines.size)))
    }
  }
  //释放资源
  override def cancel(): Unit = {
    isRunning = false
  }
}

测试

//1.创建流计算执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //2.创建DataStream - 细化
    val inputFormat = new TextInputFormat(null)

    val text = env.addSource(new UserDefinedNonParallelSourceFunction)


    //3.执行DataStreaem的转换算子
    val counts = text.flatMap(line=>line.split("\\s+"))
      .map(word=>(word,1))
      .keyBy(0)
      .sum(1)

    //4.将计算的结果在控制台打印
    counts.print()

    //5.执行流计算任务
    env.execute("Window Stream WordCount")
Kafka Source
  • 引入maven
<dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-connector-kafka_2.11</artifactId>
 <version>1.10.0</version>
</dependency>
  • SimpleStringSchema
    该SimpleStringSchema方案只会反序列化kafka中的value
 //1.创建流计算执⾏环境
 val env = StreamExecutionEnvironment.getExecutionEnvironment
 //2.创建DataStream - 细化
 val props = new Properties()
 props.setProperty("bootstrap.servers", "CentOS:9092")
 props.setProperty("group.id", "g1")
 val text = env.addSource(new FlinkKafkaConsumer[String]("topic01",new
SimpleStringSchema(),props))
 //3.执⾏DataStream的转换算⼦
 val counts = text.flatMap(line=>line.split("\\s+"))
 .map(word=>(word,1))
 .keyBy(0)
 .sum(1)
 //4.将计算的结果在控制打印
 counts.print()
 //5.执⾏流计算任务
 env.execute("Window Stream WordCount")
  • KafkaDeserializationSchema
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.flink.api.scala._
class UserKafkaDeserialization extends KafkaDeserializationSchema[(String,String,Int,Long)]{
  override def isEndOfStream(t: (String, String, Int, Long)): Boolean = false

  override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): (String, String, Int, Long) = {
    if(consumerRecord.key()!=null){
      (new String(consumerRecord.key()),new String(consumerRecord.value()),consumerRecord.partition(),consumerRecord.offset())
    }else{
      (null,new String(consumerRecord.value()),consumerRecord.partition(),consumerRecord.offset())
    }
  }

  override def getProducedType: TypeInformation[(String, String, Int, Long)] = {
    createTypeInformation[(String,String,Int,Long)]
  }
}
 //1.创建流计算执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val prop = new Properties()
    prop.setProperty("bootstrap.servers","train:9092")
    prop.setProperty("group.id", "g1")

   

    val text = env.addSource(new FlinkKafkaConsumer[(String,String,Int,Long)]("topic01",new UserKafkaDeserialization,prop))

    text.flatMap(line=>line._2.split(" "))
      .map(word=>(word,1))
      .keyBy(0)
      .sum(1)
      .print()


    env.execute("Window Stream")
  • JSONKeyValueNodeDeserializationSchema
  • 要求Kafka中的topic的key和value都必须是json格式,也可以在使⽤的时候,指定是否读取元数据
    (topic、分区、o!set等)
//1.创建流计算执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val prop = new Properties()
    prop.setProperty("bootstrap.servers","train:9092")
    prop.setProperty("group.id", "g1")



    val text = env.addSource(new FlinkKafkaConsumer[ObjectNode]("topic01",new JSONKeyValueDeserializationSchema(true),prop))
      //t:{"value":{"id":1,"name":"zhangsan"},"metadata":{"offset":0,"topic":"topic01","partition":13}}
    text.map(word=>(word.get("value").get("id").asInt(),word.get("value").get("name").asText()))
      .print()


    env.execute("Window Stream")

参考:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/kafka.html

Data Sinks

Data Sink使用DataStreams并将其转发到文件,Socket,外部系统或打印它们。Flink带有多种内置输出格式,这些格式封装在DataStreams的操作后面。
File-based

  • writeAsText()/TextOutputFormat:将元素按行写入为字符串。这些字符串是通过调用每个元素的toString()方法获得的。
  • writeAsCsv(…) / CsvOutputFormat:将元组写入逗号分隔的值文件。行和字段分隔符是可配置的。每个字段的值来自对象的toString()方法
  • writeUsingOutputFormat/ FileOutputFormat:方法和自定义文件输出的基类。支持自定义对象到字节的转换。

注意DataStream上的write*()方法主要用于调试目的。

//1.创建流计算执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //2.创建DataStream - 细化
    val text = env.fromCollection(List("this is a demo","hello word"))


    //3.执行DataStreaem的转换算子
    val counts = text.flatMap(line=>line.split("\\s+"))
      .map(word=>(word,1))
      .keyBy(0)
      .sum(1)

    //4.将计算的结果在控制台打印
    counts.writeUsingOutputFormat(new TextOutputFormat[(String, Int)](new Path("hdfs://train:9000/flink-results")))

    //5.执行流计算任务
    env.execute("Window Stream WordCount")

注意事项:如果改成HDFS,需要用户自己产生大量数据,才能看到测试效果,原因是因为HDFS文件系统写入时的缓冲区比较大、以上写入文件系统的Sink不能够参与系统检查点,如果在生产环境下通常使用flink-connector-filesystem写入到外围系统。

<dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-connector-filesystem_2.11</artifactId>
 <version>1.10.0</version>
</dependency>
val env = StreamExecutionEnvironment.getExecutionEnvironment

    val text = env.readTextFile("hdfs://train:9000/demo/words")

    val bucketingSink = StreamingFileSink.forRowFormat(new Path("hdfs://train:9000/bucket-results"),
      new SimpleStringEncoder[(String, Int)]("UTF-8")
    ).withBucketAssigner(new DateTimeBucketAssigner[(String, Int)]("yyyy-MM-dd")) //动态产生写入路径
      .build()

    val counts = text.flatMap(line=>line.split(" "))
      .map(word=>(word,1))
      .keyBy(0)
      .sum(1)

    counts.addSink(bucketingSink)

老版本写法

val env = StreamExecutionEnvironment.getExecutionEnvironment

    val text = env.readTextFile("hdfs://train:9000/demo/words")

    val bucketingSink = new BucketingSink[(String,Int)]("hdfs://train:9000/bucket-results")
    bucketingSink.setBucketer(new DateTimeBucketer[(String,Int)]("yyyy-MM-dd"))
    bucketingSink.setBatchSize(1024)

    val counts = text.flatMap(line=>line.split(" "))
      .map(word=>(word,1))
      .keyBy(0)
      .sum(1)

    counts.addSink(bucketingSink)
print()/printToErr()

Prints the toString() value of each element on the standard out / standard error stream. Optionally, aprefix (msg) can be provided which is prepended to the output. This can help to distinguish between different calls to print . If the parallelism is greater than 1, the output will also be prepended with theidentifier of the task which produced the output.

打印标准输出/标准错误流中每个元素的toString()值。可选地,可以提供aprefix (msg),它预先写入输出。这有助于区分不同的打印调用。如果并行度大于1,输出也将以产生输出的任务的标识符作为前缀。

val env = StreamExecutionEnvironment.getExecutionEnvironment

    val text = env.readTextFile("hdfs://train:9000/demo/words")


    val counts = text.flatMap(line=>line.split(" "))
      .map(word=>(word,1))
      .keyBy(0)
      .sum(1)

    counts.setParallelism(2).print("测试")

    env.execute("Window Stream")
UserDefinedSinkFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}

class UserDefinedSinkFunction extends RichSinkFunction[(String,Int)]{
  override def open(parameters: Configuration): Unit = {
    println("打开链接...")
  }

  override def invoke(value: (String, Int), context: SinkFunction.Context[_]): Unit = {
    println("输出:"+value)
  }

  override def close(): Unit = {
    println("释放链接")
  }

}
val env = StreamExecutionEnvironment.getExecutionEnvironment

    val text = env.readTextFile("hdfs://train:9000/demo/words")


    val counts = text.flatMap(line=>line.split(" "))
      .map(word=>(word,1))
      .keyBy(0)
      .sum(1)

    counts.addSink(new UserDefinedSinkFunction)

    env.execute("Window Stream WordCount")
RedisSink

参考:https://bahir.apache.org/docs/flink/current/flink-streaming-redis/

<dependency>
 <groupId>org.apache.bahir</groupId>
 <artifactId>flink-connector-redis_2.11</artifactId>
 <version>1.0</version>
</dependency>
//1.创建流计算执⾏环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    //2.创建DataStream - 细化
    val text = env.readTextFile("hdfs://train:9000/demo/words")

    var flinkJeidsConf = new FlinkJedisPoolConfig.Builder()
      .setHost("train")
      .setPort(6379)
      .build()

    //3.执⾏DataStream的转换算⼦
    val counts = text.flatMap(line=>line.split("\\s+"))
      .map(word=>(word,1))
      .keyBy(0)
      .sum(1)

    counts.addSink(new RedisSink(flinkJeidsConf,new UserDefinedSinkFunction()))

    env.execute("Window Stream")
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

class UserDefinedSinkFunction extends RedisMapper[(String,Int)]{
  override def getCommandDescription: RedisCommandDescription = {
    new RedisCommandDescription(RedisCommand.HSET,"wordcounts")
  }

  override def getKeyFromData(t: (String, Int)): String = t._1

  override def getValueFromData(t: (String, Int)): String = t._2+""
}