Flink Source、Sink
程序部署
本地执行
//1.创建流计算执行环境
val env = StreamExecutionEnvironment.createLocalEnvironment(3)
//2.创建DataStream
val text = env.socketTextStream("train",9999)
//3.执行DataStream的转换算子
val counts = text.flatMap(line=>line.split(" "))
.map(word=>(word,1))
.keyBy(0)
.sum(1)
//4.将计算的结果在控制台打印
counts.print()
//5.执行流计算任务
env.execute("Window Stream WordCount")
远程部署
//1.创建流计算执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//2.创建DataStream
val text = env.socketTextStream("train",9999)
//3.执行DataStream的转换算子
val counts = text.flatMap(line=>line.split(" "))
.map(word=>(word,1))
.keyBy(0)
.sum(1)
//4.将计算的结果在控制台打印
counts.print()
//5.执行流计算任务
env.execute("Window Stream WordCount")
StreamExecutionEnvironment.getExecutionEnvironment自动识别运行环境,如果运行环境是idea,系统会自动切换成本地模式,默认系统的并行度使用系统最大线程数,等价于Spark中设置的
local[*]
,如果是生产环境,需要用户在提交任务的时候指定并行度--parallelism
- 部署方式
- WEB UI部署(略)
- 通过脚本部署
[aaa@qq.com ~]# cd /usr/soft/flink-1.10.0/
[aaa@qq.com flink-1.10.0]# ./bin/flink run
--class com.baizhi.quickstart.FlinkWordCountTestLocal
--detached --parallelism 4
--jobmanager train:8081 /root/flink-1.0-SNAPSHOT.jar
Job has been submitted with JobID 808021820a80d008e8fcd7e72bba1029
查看现有任务
[aaa@qq.com flink-1.10.0]# ./bin/flink list --running --jobmanager train:8081
Waiting for response...
------------------ Running/Restarting Jobs -------------------
29.02.2020 05:06:41 : 808021820a80d008e8fcd7e72bba1029 : Window Stream WordCount (RUNNING)
--------------------------------------------------------------
[aaa@qq.com flink-1.10.0]# ./bin/flink list --all --jobmanager train:8081
Waiting for response...
------------------ Running/Restarting Jobs -------------------
29.02.2020 05:06:41 : 808021820a80d008e8fcd7e72bba1029 : Window Stream WordCount (RUNNING)
--------------------------------------------------------------
No scheduled jobs.
取消指定任务
[aaa@qq.com flink-1.10.0]# ./bin/flink cancel --jobmanager train:8081 808021820a80d008e8fcd7e72bba1029
Cancelling job 808021820a80d008e8fcd7e72bba1029.
Cancelled job 808021820a80d008e8fcd7e72bba1029.
查看程序执行计划
[aaa@qq.com flink-1.10.0]# ./bin/flink info --class com.baizhi.quickstart.FlinkWordCountTestLocal --parallelism 4 /root/flink-1.0-SNAPSHOT.jar
----------------------- Execution Plan -----------------------
{"nodes":[{"id":1,"type":"Source: Socket Stream","pact":"Data Source","contents":"Source: Socket Stream","parallelism":1},{"id":2,"type":"Flat Map","pact":"Operator","contents":"Flat Map","parallelism":4,"predecessors":[{"id":1,"ship_strategy":"REBALANCE","side":"second"}]},{"id":3,"type":"Map","pact":"Operator","contents":"Map","parallelism":4,"predecessors":[{"id":2,"ship_strategy":"FORWARD","side":"second"}]},{"id":5,"type":"aggregation","pact":"Operator","contents":"aggregation","parallelism":4,"predecessors":[{"id":3,"ship_strategy":"HASH","side":"second"}]},{"id":6,"type":"Sink: Print to Std. Out","pact":"Data Sink","contents":"Sink: Print to Std. Out","parallelism":4,"predecessors":[{"id":5,"ship_strategy":"FORWARD","side":"second"}]}]}
--------------------------------------------------------------
No description provided.
⽤户可以访问:https://flink.apache.org/visualizer/
将json数据粘贴过去,查看Flink执⾏计划图
跨平台发布
//1.创建流计算执行环境
var jars = "D:\\ideacores\\Flink\\flink\\target\\flink-1.0-SNAPSHOT.jar"
val env = StreamExecutionEnvironment.createRemoteEnvironment("train",8081,jars)
//设置默认并行度
env.setParallelism(4)
//2.创建DataStream
val text = env.socketTextStream("train",9999)
//3.执行DataStream的转换算子
val counts = text.flatMap(line=>line.split(" "))
.map(word=>(word,1))
.keyBy(0)
.sum(1)
//4.将计算的结果在控制台打印
counts.print()
//5.执行流计算任务
env.execute("Window Stream WordCount")
在运行之前需要使用mvn重新打包程序。直接运行main函数即可
Streaming(DataStream API)
DataSource
数据源是程序读取数据的来源,用户可以通过env.addSource(SourceFunction)
,将SourceFunction添加到程序中。Flink内置许多已知实现的SourceFunction,但是用户可以自定义实现SourceFunction
(非并行化的接口)接口或者实现ParallelSourceFunction
(并行化)接口,如果需要有状态管理还可以继承RichParallelSourceFunction
File-based
-
readTextFile(path)
- 读取文本文件(一次) 并以字符串的形式返回它们
//1.创建流计算执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//2.创建DataStream - 细化
val text = env.readTextFile("hdfs://train:9000/demo/words")
//3.执行DataStreaem的转换算子
val counts = text.flatMap(line=>line.split("\\s+"))
.map(word=>(word,1))
.keyBy(0)
.sum(1)
//4.将计算的结果在控制台打印
counts.print()
//5.执行流计算任务
env.execute("Window Stream WordCount")
-
readFile(fileInputFormat, path)
-根据指定的文件输入读取(一次)文件
格式
//1.创建流计算执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//2.创建DataStream - 细化
val inputFormat = new TextInputFormat(null)
val text = env.readFile(inputFormat,"hdfs://train:9000/demo/words")
//3.执行DataStreaem的转换算子
val counts = text.flatMap(line=>line.split("\\s+"))
.map(word=>(word,1))
.keyBy(0)
.sum(1)
//4.将计算的结果在控制台打印
counts.print()
//5.执行流计算任务
env.execute("Window Stream WordCount")
-
readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo)
-
这是前两个方法在内部调用的方。 它读取路径中的文件fileInputFormat
. 根据所提供的watchType,此源可以定期调用
监视(每隔ms)新数据的路径
(FileProcessingMode.PROCESS_CONTINUOUSLY
), 或处理当前路径中的数据
退出文件处理模式。PROCESS_ONCE
)。使用路径过滤器,用户可以进一步
排除正在处理的文件。
//1.创建流计算执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//2.创建DataStream - 细化
val inputFormat = new TextInputFormat(null)
val text = env.readFile(inputFormat,"hdfs://train:9000/demo/words",FileProcessingMode.PROCESS_CONTINUOUSLY,1000)
//3.执行DataStreaem的转换算子
val counts = text.flatMap(line=>line.split("\\s+"))
.map(word=>(word,1))
.keyBy(0)
.sum(1)
//4.将计算的结果在控制台打印
counts.print()
//5.执行流计算任务
env.execute("Window Stream WordCount")
该方法会检查采集目录下的文件,如果文件发生变化系统会重新采集。此时可能会导致文件的重复计算。一般来说不建议修改文件内容,直接上传新文件即可。
Socket Based
-
socketTextStream
- Reads from a socket. Elements can be separated by a delimiter.
//1.创建流计算执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//2.创建DataStream - 细化
val text = env.socketTextStream("train",9999,'\n',3)
//3.执行DataStreaem的转换算子
val counts = text.flatMap(line=>line.split("\\s+"))
.map(word=>(word,1))
.keyBy(0)
.sum(1)
//4.将计算的结果在控制台打印
counts.print()
//5.执行流计算任务
env.execute("Window Stream WordCount")
Collection-based
//1.创建流计算执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//2.创建DataStream - 细化
val inputFormat = new TextInputFormat(null)
val text = env.fromCollection(List("this is a demo","hello word"))
//3.执行DataStreaem的转换算子
val counts = text.flatMap(line=>line.split("\\s+"))
.map(word=>(word,1))
.keyBy(0)
.sum(1)
//4.将计算的结果在控制台打印
counts.print()
//5.执行流计算任务
env.execute("Window Stream WordCount")
UserDefinedSource
- SourceFunction(非并行化)
import java.util.Random
import org.apache.flink.streaming.api.functions.source.SourceFunction
class UserDefinedNonParallelSourceFunction extends SourceFunction[String]{
@volatile //防止线程拷贝变量
var isRunning = true
var lines = Array("this is a demo","hello world","ni hao ma")
//在该方法中启动线程,通过sourceContext的collect方法发送数据
override def run(sourceContext: SourceFunction.SourceContext[String]): Unit = {
while (isRunning){
Thread.sleep(100)
//输送数据给下游
sourceContext.collect(lines(new Random().nextInt(lines.size)))
}
}
//释放资源
override def cancel(): Unit = {
isRunning = false
}
}
测试
//1.创建流计算执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//2.创建DataStream - 细化
val inputFormat = new TextInputFormat(null)
val text = env.addSource(new UserDefinedNonParallelSourceFunction)
//3.执行DataStreaem的转换算子
val counts = text.flatMap(line=>line.split("\\s+"))
.map(word=>(word,1))
.keyBy(0)
.sum(1)
//4.将计算的结果在控制台打印
counts.print()
//5.执行流计算任务
env.execute("Window Stream WordCount")
- ParallelSourceFunction(并行化)
import java.util.Random
import org.apache.flink.streaming.api.functions.source.{ParallelSourceFunction, SourceFunction}
class UserDefinedNonParallelSourceFunction extends ParallelSourceFunction[String]{
@volatile //防止线程拷贝变量
var isRunning = true
var lines = Array("this is a demo","hello world","ni hao ma")
//在该方法中启动线程,通过sourceContext的collect方法发送数据
override def run(sourceContext: SourceFunction.SourceContext[String]): Unit = {
while (isRunning){
Thread.sleep(100)
//输送数据给下游
sourceContext.collect(lines(new Random().nextInt(lines.size)))
}
}
//释放资源
override def cancel(): Unit = {
isRunning = false
}
}
测试
//1.创建流计算执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//2.创建DataStream - 细化
val inputFormat = new TextInputFormat(null)
val text = env.addSource(new UserDefinedNonParallelSourceFunction)
//3.执行DataStreaem的转换算子
val counts = text.flatMap(line=>line.split("\\s+"))
.map(word=>(word,1))
.keyBy(0)
.sum(1)
//4.将计算的结果在控制台打印
counts.print()
//5.执行流计算任务
env.execute("Window Stream WordCount")
Kafka Source
- 引入maven
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.10.0</version>
</dependency>
- SimpleStringSchema
该SimpleStringSchema方案只会反序列化kafka中的value
//1.创建流计算执⾏环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//2.创建DataStream - 细化
val props = new Properties()
props.setProperty("bootstrap.servers", "CentOS:9092")
props.setProperty("group.id", "g1")
val text = env.addSource(new FlinkKafkaConsumer[String]("topic01",new
SimpleStringSchema(),props))
//3.执⾏DataStream的转换算⼦
val counts = text.flatMap(line=>line.split("\\s+"))
.map(word=>(word,1))
.keyBy(0)
.sum(1)
//4.将计算的结果在控制打印
counts.print()
//5.执⾏流计算任务
env.execute("Window Stream WordCount")
- KafkaDeserializationSchema
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.flink.api.scala._
class UserKafkaDeserialization extends KafkaDeserializationSchema[(String,String,Int,Long)]{
override def isEndOfStream(t: (String, String, Int, Long)): Boolean = false
override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): (String, String, Int, Long) = {
if(consumerRecord.key()!=null){
(new String(consumerRecord.key()),new String(consumerRecord.value()),consumerRecord.partition(),consumerRecord.offset())
}else{
(null,new String(consumerRecord.value()),consumerRecord.partition(),consumerRecord.offset())
}
}
override def getProducedType: TypeInformation[(String, String, Int, Long)] = {
createTypeInformation[(String,String,Int,Long)]
}
}
//1.创建流计算执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
val prop = new Properties()
prop.setProperty("bootstrap.servers","train:9092")
prop.setProperty("group.id", "g1")
val text = env.addSource(new FlinkKafkaConsumer[(String,String,Int,Long)]("topic01",new UserKafkaDeserialization,prop))
text.flatMap(line=>line._2.split(" "))
.map(word=>(word,1))
.keyBy(0)
.sum(1)
.print()
env.execute("Window Stream")
- JSONKeyValueNodeDeserializationSchema
- 要求Kafka中的topic的key和value都必须是json格式,也可以在使⽤的时候,指定是否读取元数据
(topic、分区、o!set等)
//1.创建流计算执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
val prop = new Properties()
prop.setProperty("bootstrap.servers","train:9092")
prop.setProperty("group.id", "g1")
val text = env.addSource(new FlinkKafkaConsumer[ObjectNode]("topic01",new JSONKeyValueDeserializationSchema(true),prop))
//t:{"value":{"id":1,"name":"zhangsan"},"metadata":{"offset":0,"topic":"topic01","partition":13}}
text.map(word=>(word.get("value").get("id").asInt(),word.get("value").get("name").asText()))
.print()
env.execute("Window Stream")
参考:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/kafka.html
Data Sinks
Data Sink使用DataStreams并将其转发到文件,Socket,外部系统或打印它们。Flink带有多种内置输出格式,这些格式封装在DataStreams的操作后面。
File-based
- writeAsText()/
TextOutputFormat
:将元素按行写入为字符串。这些字符串是通过调用每个元素的toString()方法获得的。 - writeAsCsv(…) /
CsvOutputFormat
:将元组写入逗号分隔的值文件。行和字段分隔符是可配置的。每个字段的值来自对象的toString()方法 - writeUsingOutputFormat/
FileOutputFormat
:方法和自定义文件输出的基类。支持自定义对象到字节的转换。
注意DataStream上的write*()方法主要用于调试目的。
//1.创建流计算执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//2.创建DataStream - 细化
val text = env.fromCollection(List("this is a demo","hello word"))
//3.执行DataStreaem的转换算子
val counts = text.flatMap(line=>line.split("\\s+"))
.map(word=>(word,1))
.keyBy(0)
.sum(1)
//4.将计算的结果在控制台打印
counts.writeUsingOutputFormat(new TextOutputFormat[(String, Int)](new Path("hdfs://train:9000/flink-results")))
//5.执行流计算任务
env.execute("Window Stream WordCount")
注意事项:如果改成HDFS,需要用户自己产生大量数据,才能看到测试效果,原因是因为HDFS文件系统写入时的缓冲区比较大、以上写入文件系统的Sink不能够参与系统检查点,如果在生产环境下通常使用flink-connector-filesystem写入到外围系统。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-filesystem_2.11</artifactId>
<version>1.10.0</version>
</dependency>
val env = StreamExecutionEnvironment.getExecutionEnvironment
val text = env.readTextFile("hdfs://train:9000/demo/words")
val bucketingSink = StreamingFileSink.forRowFormat(new Path("hdfs://train:9000/bucket-results"),
new SimpleStringEncoder[(String, Int)]("UTF-8")
).withBucketAssigner(new DateTimeBucketAssigner[(String, Int)]("yyyy-MM-dd")) //动态产生写入路径
.build()
val counts = text.flatMap(line=>line.split(" "))
.map(word=>(word,1))
.keyBy(0)
.sum(1)
counts.addSink(bucketingSink)
老版本写法
val env = StreamExecutionEnvironment.getExecutionEnvironment
val text = env.readTextFile("hdfs://train:9000/demo/words")
val bucketingSink = new BucketingSink[(String,Int)]("hdfs://train:9000/bucket-results")
bucketingSink.setBucketer(new DateTimeBucketer[(String,Int)]("yyyy-MM-dd"))
bucketingSink.setBatchSize(1024)
val counts = text.flatMap(line=>line.split(" "))
.map(word=>(word,1))
.keyBy(0)
.sum(1)
counts.addSink(bucketingSink)
print()/printToErr()
Prints the toString() value of each element on the standard out / standard error stream. Optionally, aprefix (msg) can be provided which is prepended to the output. This can help to distinguish between different calls to print . If the parallelism is greater than 1, the output will also be prepended with theidentifier of the task which produced the output.
打印标准输出/标准错误流中每个元素的toString()值。可选地,可以提供aprefix (msg),它预先写入输出。这有助于区分不同的打印调用。如果并行度大于1,输出也将以产生输出的任务的标识符作为前缀。
val env = StreamExecutionEnvironment.getExecutionEnvironment
val text = env.readTextFile("hdfs://train:9000/demo/words")
val counts = text.flatMap(line=>line.split(" "))
.map(word=>(word,1))
.keyBy(0)
.sum(1)
counts.setParallelism(2).print("测试")
env.execute("Window Stream")
UserDefinedSinkFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
class UserDefinedSinkFunction extends RichSinkFunction[(String,Int)]{
override def open(parameters: Configuration): Unit = {
println("打开链接...")
}
override def invoke(value: (String, Int), context: SinkFunction.Context[_]): Unit = {
println("输出:"+value)
}
override def close(): Unit = {
println("释放链接")
}
}
val env = StreamExecutionEnvironment.getExecutionEnvironment
val text = env.readTextFile("hdfs://train:9000/demo/words")
val counts = text.flatMap(line=>line.split(" "))
.map(word=>(word,1))
.keyBy(0)
.sum(1)
counts.addSink(new UserDefinedSinkFunction)
env.execute("Window Stream WordCount")
RedisSink
参考:https://bahir.apache.org/docs/flink/current/flink-streaming-redis/
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
</dependency>
//1.创建流计算执⾏环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
//2.创建DataStream - 细化
val text = env.readTextFile("hdfs://train:9000/demo/words")
var flinkJeidsConf = new FlinkJedisPoolConfig.Builder()
.setHost("train")
.setPort(6379)
.build()
//3.执⾏DataStream的转换算⼦
val counts = text.flatMap(line=>line.split("\\s+"))
.map(word=>(word,1))
.keyBy(0)
.sum(1)
counts.addSink(new RedisSink(flinkJeidsConf,new UserDefinedSinkFunction()))
env.execute("Window Stream")
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}
class UserDefinedSinkFunction extends RedisMapper[(String,Int)]{
override def getCommandDescription: RedisCommandDescription = {
new RedisCommandDescription(RedisCommand.HSET,"wordcounts")
}
override def getKeyFromData(t: (String, Int)): String = t._1
override def getValueFromData(t: (String, Int)): String = t._2+""
}
上一篇: 有效防止数据库暴库--整理篇
下一篇: java基础语法练习
推荐阅读
-
关于open source bbs体系架构的介绍_PHP教程
-
flink教程-在IntelliJ IDEA 中玩转 checkstyle
-
Flink 从 0 到 1 学习 —— Flink 配置文件详解
-
flink-connector-jdbc.jar加入clickhouse驱动支持,并重新编译
-
Flink重新编译
-
PPA source for PHP-YAF
-
Mysql Source导入时出现乱码问题_MySQL
-
WPF 中Binding的2个属性Source与ElementName的区别
-
Look into the mechanism translating from source-code to machine-code in llvm
-
mysql source 命令导入大的sql文件的方法