Flink Study Notes: Simple Examples of Common Sources and Sinks
Flink 1.10.0 currently ships connectors for the following systems (the last five are provided by the Apache Bahir project rather than by Flink itself):
- Apache Kafka (source/sink)
- Apache Cassandra (sink)
- Amazon Kinesis Streams (source/sink)
- Elasticsearch (sink)
- Hadoop FileSystem (sink)
- RabbitMQ (source/sink)
- Apache NiFi (source/sink)
- Twitter Streaming API (source)
- Google PubSub (source/sink)
- Apache ActiveMQ (source/sink)
- Apache Flume (sink)
- Redis (sink)
- Akka (sink)
- Netty (source)
Maven dependencies for Flink
<properties>
    <flink.version>1.10.0</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
    <scala.version>2.12.8</scala.version>
</properties>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
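The snippets below omit the surrounding driver. A minimal skeleton they could plug into looks like this (object and job names are illustrative; the example methods are assumed to be members of this object, and the wildcard import brings in the implicit TypeInformation instances the Scala API needs):

import org.apache.flink.streaming.api.scala._

object SourceSinkExamples {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    collectionSource(env)               // call any of the example methods below
    env.execute("source-sink-examples") // nothing runs until execute() is called
  }
}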
Collection Source
def collectionSource(env: StreamExecutionEnvironment): Unit = {
  val collection_stream = env.fromCollection(List("zhang li zhang", "wang zhang li"))
  val stream = collection_stream.flatMap(_.split(" ")).map((_, 1)).keyBy(0).sum(1)
  stream.print()
}
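keyBy(0) addresses the tuple field by position. The Scala API also accepts a key-extractor function, which the compiler can check; an equivalent sketch:

// Same grouping as keyBy(0), but type-checked at compile time.
val typed_stream = collection_stream.flatMap(_.split(" ")).map((_, 1)).keyBy(_._1).sum(1)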
Text File Source And Sink
def textFileSourceSink(env: StreamExecutionEnvironment): Unit = {
  val file_stream: DataStream[String] = env.readTextFile("G:\\workspace\\flink\\src\\main\\resources\\wc.txt")
  val stream = file_stream.flatMap(_.split(" ")).map((_, 1)).keyBy(0).sum(1)
  stream.print()
  // With parallelism > 1 these sinks create a directory and write one file per subtask.
  stream.writeAsText("G:\\workspace\\flink\\src\\main\\resources\\r1")
  stream.writeAsCsv("G:\\workspace\\flink\\src\\main\\resources\\r2")
}
Socket Source And Sink
def socketSourceSink(env: StreamExecutionEnvironment): Unit = {
  val socket_stream: DataStream[String] = env.socketTextStream("192.168.0.1", 7000)
  val stream = socket_stream.flatMap(_.split(" ")).map((_, 1)).keyBy(0).sum(1)
  stream.print()
  stream.writeToSocket(hostname = "192.168.0.1", port = 7001, new SerializationSchema[(String, Int)] {
    override def serialize(t: (String, Int)): Array[Byte] = {
      var oos: ObjectOutputStream = null
      var baos: ByteArrayOutputStream = null
      try {
        baos = new ByteArrayOutputStream()
        // Note: ObjectOutputStream prepends a stream header; the receiver must
        // read these bytes with an ObjectInputStream.
        oos = new ObjectOutputStream(baos)
        oos.writeBytes(t._1)
        oos.writeInt(t._2)
        oos.flush() // flush before snapshotting, or buffered data is lost
        return baos.toByteArray
      } catch {
        case e: Exception => e.printStackTrace()
      } finally {
        if (oos != null) oos.close()
        if (baos != null) baos.close()
      }
      null
    }
  })
}
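If the receiving side just expects text, a plain string serialization avoids the ObjectOutputStream framing entirely. A minimal sketch:

// Render each tuple as one line of text and ship it with the built-in string schema.
stream.map(t => s"${t._1},${t._2}\n")
  .writeToSocket("192.168.0.1", 7001, new SimpleStringSchema())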
Kafka Source And Sink
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
def kafkaSourceSink(env: StreamExecutionEnvironment): Unit = {
  val properties1 = new Properties()
  properties1.setProperty("bootstrap.servers", "192.168.0.1:9092")
  // The universal Kafka connector ignores "zookeeper.connect" and installs its own
  // byte-array deserializers, so the next three entries are optional.
  properties1.setProperty("zookeeper.connect", "192.168.0.1:2181/kafka")
  properties1.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  properties1.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  properties1.setProperty("group.id", "tmp-1")
  properties1.setProperty("auto.offset.reset", "latest")
  val kafka_stream = env.addSource(new FlinkKafkaConsumer[String]("tmp1", new SimpleStringSchema(), properties1))
  val stream = kafka_stream.flatMap(_.split(" ")).map((_, 1)).keyBy(0).sum(1).map(_.toString())
  stream.print("kafka consumer")
  val properties2 = new Properties()
  properties2.put("bootstrap.servers", "192.168.0.1:9092")
  // Variant 1: at-least-once producer with the built-in string schema.
  stream.addSink(new FlinkKafkaProducer[String]("tmp2", new SimpleStringSchema(), properties2))
  // Variant 2: exactly-once producer with a custom KafkaSerializationSchema.
  stream.addSink(new FlinkKafkaProducer[String]("tmp2", new KafkaSerializationSchema[String] {
    override def serialize(value: String, timestamp: lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {
      var oos: ObjectOutputStream = null
      var baos: ByteArrayOutputStream = null
      try {
        baos = new ByteArrayOutputStream()
        oos = new ObjectOutputStream(baos)
        oos.writeBytes(value)
        oos.flush() // flush before snapshotting the buffer
        return new ProducerRecord[Array[Byte], Array[Byte]]("tmp2", baos.toByteArray)
      } catch {
        case e: Exception => e.printStackTrace()
      } finally {
        if (oos != null) oos.close()
        if (baos != null) baos.close()
      }
      null
    }
  }, properties2, FlinkKafkaProducer.Semantic.EXACTLY_ONCE))
}
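EXACTLY_ONCE only takes effect when checkpointing is enabled, and the producer's transaction.timeout.ms must stay within the broker's transaction.max.timeout.ms (15 minutes by default), while Flink's producer defaults to 1 hour. A minimal sketch of the extra setup (set the property before constructing the producer; the interval and timeout values here are illustrative):

// Checkpoints drive the two-phase Kafka transactions.
env.enableCheckpointing(5000)
// Keep within the broker's 15-minute transaction cap.
properties2.setProperty("transaction.timeout.ms", "900000")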
Redis Sink
The Redis sink comes from the Apache Bahir project (note that Bahir 1.0 only publishes this connector with the _2.11 suffix):

<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>
def redisSink(env: StreamExecutionEnvironment): Unit = {
  val jedisPoolConfig: FlinkJedisPoolConfig = new FlinkJedisPoolConfig.Builder()
    .setHost("192.168.0.1").setPort(6379).build()
  val file_stream: DataStream[String] = env.readTextFile("G:\\workspace\\flink\\src\\main\\resources\\wc.txt")
  val stream = file_stream.flatMap(_.split(" ")).map((_, 1)).keyBy(0).sum(1)
  stream.print()
  stream.addSink(new RedisSink[(String, Int)](jedisPoolConfig, new RedisMapper[(String, Int)] {
    // HSET with additional key "tmp1": counts land in the Redis hash "tmp1", one field per word.
    override def getCommandDescription: RedisCommandDescription =
      new RedisCommandDescription(RedisCommand.HSET, "tmp1")
    override def getKeyFromData(t: (String, Int)): String = t._1
    override def getValueFromData(t: (String, Int)): String = t._2.toString
  }))
}
def redisClusterSink(env: StreamExecutionEnvironment): Unit = {
  val nodes = new util.HashSet[InetSocketAddress]()
  nodes.add(new InetSocketAddress("192.168.0.1", 6373))
  nodes.add(new InetSocketAddress("192.168.0.1", 6374))
  nodes.add(new InetSocketAddress("192.168.0.1", 6375))
  nodes.add(new InetSocketAddress("192.168.0.1", 6376))
  nodes.add(new InetSocketAddress("192.168.0.1", 6377))
  nodes.add(new InetSocketAddress("192.168.0.1", 6378))
  val jedisClusterConfig: FlinkJedisClusterConfig = new FlinkJedisClusterConfig.Builder().setNodes(nodes).build()
  val file_stream: DataStream[String] = env.readTextFile("G:\\workspace\\flink\\src\\main\\resources\\wc.txt")
  val stream = file_stream.flatMap(_.split(" ")).map((_, 1)).keyBy(0).sum(1)
  stream.print()
  stream.addSink(new RedisSink[(String, Int)](jedisClusterConfig, new RedisMapper[(String, Int)] {
    override def getCommandDescription: RedisCommandDescription =
      new RedisCommandDescription(RedisCommand.HSET, "tmp1")
    override def getKeyFromData(t: (String, Int)): String = t._1
    override def getValueFromData(t: (String, Int)): String = t._2.toString
  }))
}
Elasticsearch Sink
Pick the connector that matches your cluster's major version; the two below pull in conflicting Elasticsearch clients, so do not put both on the classpath at once:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch5_2.12</artifactId>
    <version>1.8.3</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch7_2.12</artifactId>
    <version>1.10.0</version>
</dependency>
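Both sink variants below split each line of users.txt on commas, so the file is assumed to hold records of the form id,name,phone,age, for example (illustrative data):

1,zhangsan,13100000001,20
2,lisi,13100000002,21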
def elasticsearch5Sink(env: StreamExecutionEnvironment): Unit = {
  val file_stream: DataStream[String] = env.readTextFile("G:\\workspace\\flink\\src\\main\\resources\\users.txt")
  file_stream.print()
  val config = new java.util.HashMap[String, String]
  config.put("cluster.name", "my-application")
  config.put("bulk.flush.max.actions", "1")
  // The ES5 connector uses the TransportClient, which talks to port 9300.
  val transportAddresses = new java.util.ArrayList[InetSocketAddress]
  transportAddresses.add(new InetSocketAddress(InetAddress.getByName("192.168.0.1"), 9300))
  val elasticsearchSink = new ElasticsearchSink[String](config, transportAddresses,
    new ElasticsearchSinkFunction[String] {
      override def process(element: String, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
        val datas: Array[String] = element.split(",")
        val data = new util.HashMap[String, String]()
        data.put("id", datas(0))
        data.put("name", datas(1))
        data.put("phone", datas(2))
        data.put("age", datas(3))
        val indexRequest = Requests.indexRequest().index("user").source(data)
        requestIndexer.add(indexRequest)
      }
    },
    new ActionRequestFailureHandler() {
      @throws(classOf[Throwable])
      override def onFailure(actionRequest: ActionRequest, throwable: Throwable, restStatusCode: Int,
                             requestIndexer: RequestIndexer): Unit = {
        if (ExceptionUtils.findThrowable(throwable, classOf[EsRejectedExecutionException]).isPresent()) {
          requestIndexer.add(actionRequest) // bulk queue full: re-enqueue and retry
        } else if (ExceptionUtils.findThrowable(throwable, classOf[ElasticsearchParseException]).isPresent()) {
          // malformed document: drop it
        } else {
          throw throwable // anything else fails the sink
        }
      }
    }
  )
  file_stream.addSink(elasticsearchSink)
}
def elasticsearch7Sink(env: StreamExecutionEnvironment): Unit = {
  val file_stream: DataStream[String] = env.readTextFile("G:\\workspace\\flink\\src\\main\\resources\\users.txt")
  file_stream.print()
  // The ES7 connector uses the REST client, which talks to port 9200.
  val httpHosts = new util.ArrayList[HttpHost]()
  httpHosts.add(new HttpHost("192.168.0.1", 9200, "http"))
  val elasticsearchSinkBuilder = new ElasticsearchSink.Builder[String](httpHosts,
    new ElasticsearchSinkFunction[String] {
      override def process(element: String, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
        val datas: Array[String] = element.split(",")
        val data = new util.HashMap[String, String]()
        data.put("id", datas(0))
        data.put("name", datas(1))
        data.put("phone", datas(2))
        data.put("age", datas(3))
        val indexRequest = Requests.indexRequest().index("user").source(data)
        requestIndexer.add(indexRequest)
      }
    })
  elasticsearchSinkBuilder.setBulkFlushMaxActions(1)
  elasticsearchSinkBuilder.setRestClientFactory(new RestClientFactory {
    override def configureRestClientBuilder(restClientBuilder: RestClientBuilder): Unit = {
      // Optional REST client customizations, e.g.:
      // restClientBuilder.setPathPrefix(...)
      // restClientBuilder.setDefaultHeaders(...)
      // restClientBuilder.setRequestConfigCallback(...)
      // restClientBuilder.setHttpClientConfigCallback(...)
      // restClientBuilder.setFailureListener(...)
    }
  })
  elasticsearchSinkBuilder.setFailureHandler(new ActionRequestFailureHandler() {
    @throws(classOf[Throwable])
    override def onFailure(actionRequest: ActionRequest, throwable: Throwable, restStatusCode: Int,
                           requestIndexer: RequestIndexer): Unit = {
      if (ExceptionUtils.findThrowable(throwable, classOf[EsRejectedExecutionException]).isPresent()) {
        requestIndexer.add(actionRequest) // bulk queue full: re-enqueue and retry
      } else if (ExceptionUtils.findThrowable(throwable, classOf[ElasticsearchParseException]).isPresent()) {
        // malformed document: drop it
      } else {
        throw throwable
      }
    }
  })
  file_stream.addSink(elasticsearchSinkBuilder.build())
}
Custom Source
class CustomSource() extends SourceFunction[String] {
  // volatile: cancel() is called from a different thread than run()
  @volatile var runFlag: Boolean = true
  val values: Array[String] = Array[String]("zhang", "wang", "li", "zhao", "qi", "chu", "yan", "han")
  override def run(sourceContext: SourceFunction.SourceContext[String]): Unit = {
    val random = new Random()
    while (runFlag) {
      sourceContext.collect(values(random.nextInt(values.length)))
      Thread.sleep(2000)
    }
  }
  override def cancel(): Unit = {
    runFlag = false
  }
}
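A plain SourceFunction always runs with parallelism 1. For a parallel variant, the same class can implement ParallelSourceFunction instead; a minimal sketch (assuming import org.apache.flink.streaming.api.functions.source.{ParallelSourceFunction, SourceFunction}):

// Each parallel subtask runs its own copy of run().
class CustomParallelSource extends ParallelSourceFunction[String] {
  @volatile var runFlag: Boolean = true
  override def run(ctx: SourceFunction.SourceContext[String]): Unit =
    while (runFlag) { ctx.collect("tick"); Thread.sleep(2000) }
  override def cancel(): Unit = runFlag = false
}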
def customSource(env: StreamExecutionEnvironment): Unit = {
  val custom_stream = env.addSource(new CustomSource())
  val stream = custom_stream.flatMap(_.split(" ")).map((_, 1)).keyBy(0).sum(1)
  stream.print()
}
Custom Sink: JdbcSink
class JdbcSink() extends RichSinkFunction[String] {
  var connection: Connection = _
  var insertStmt: PreparedStatement = _
  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    // One connection and prepared statement per parallel sink instance.
    connection = DriverManager.getConnection("jdbc:mysql://192.168.0.1:3306/test", "root", "")
    insertStmt = connection.prepareStatement("INSERT INTO T_USER(ID,NAME,PHONE) VALUES (?,?,?)")
  }
  override def invoke(value: String, context: SinkFunction.Context[_]): Unit = {
    val datas: Array[String] = value.split(",")
    insertStmt.setInt(1, datas(0).toInt)
    insertStmt.setString(2, datas(1))
    insertStmt.setString(3, datas(2))
    insertStmt.execute()
    print(insertStmt.getUpdateCount)
  }
  override def close(): Unit = {
    insertStmt.close()
    connection.close()
  }
}
def jdbcSink(env: StreamExecutionEnvironment): Unit = {
  val collection_stream = env.fromCollection(List("21,zhangsan1,13100000001", "22,zhangsan2,13100000002"))
  collection_stream.addSink(new JdbcSink())
}
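The JDBC sink additionally needs the MySQL driver on the classpath, for example (the version here is illustrative; pick one matching your server):

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.47</version>
</dependency>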