Common Flink Sources and Sinks
1. KafkaSource and KafkaSink
Flink is often used to process data streams in real time, and Kafka is often placed in front of such pipelines to buffer (peak-shave) the incoming stream, so Flink Streaming and Kafka are frequently used together.
Flink already ships with well-integrated Kafka source and sink connectors, so they are very convenient to use.
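To use these connectors, the Kafka connector artifact has to be on the classpath. A minimal sbt sketch; the version numbers below are assumptions and should be matched to your Flink and Scala versions:

// build.sbt -- versions are assumptions; use the ones matching your cluster
libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-streaming-scala" % "1.14.6",
  "org.apache.flink" %% "flink-connector-kafka" % "1.14.6"
)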
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaProducer}
import org.apache.kafka.clients.consumer.ConsumerConfig

def main(args: Array[String]): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val result = env.readTextFile("filePath")
    .flatMap(_.split(" "))
  // addSource
  // A Kafka consumer reads data from the "test" topic for Flink to process
  val props = new Properties()
  props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092,node2:9092")
  props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "cxv")
  val inputStream = env.addSource(new FlinkKafkaConsumer[String]("test", new SimpleStringSchema(), props))
  // addSink
  // Write the Flink stream back into Kafka; here Flink acts as a Kafka producer
  result.addSink(new FlinkKafkaProducer[String]("node1:9092", "test", new SimpleStringSchema()))
  env.execute()
}
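The inputStream returned by addSource is an ordinary DataStream, so the records consumed from Kafka can be transformed like any other stream. A minimal sketch, assuming it continues the main method above (the word count is only an illustration):

// Hypothetical continuation: count the words coming out of the "test" topic
inputStream
  .flatMap(_.split(" "))
  .map(word => (word, 1))
  .keyBy(_._1)
  .sum(1)
  .print()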
2. MySqlSource (custom) and MySqlSink (custom)
MySqlSource
import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala._

def main(args: Array[String]): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.addSource(new MyJdbcSourceFunc)
    .print()
  env.execute()
}
// Extend RichParallelSourceFunction and implement its methods
class MyJdbcSourceFunc extends RichParallelSourceFunction[Worker] {
  var conn: Connection = _
  var pst: PreparedStatement = _
  var flag = true

  // Open the database connection
  override def open(parameters: Configuration): Unit = {
    conn = DriverManager.getConnection(
      "jdbc:mysql://localhost:3306/testevery?characterEncoding=utf-8&useSSL=false&serverTimezone=UTC",
      "root", "1994")
    pst = conn.prepareStatement("select * from worker")
  }

  // Poll the database and emit its rows into the stream
  override def run(ctx: SourceFunction.SourceContext[Worker]): Unit = {
    while (flag) {
      Thread.sleep(500)
      val rs = pst.executeQuery()
      while (rs.next()) {
        val name = rs.getString(1)
        val salary = rs.getLong(2)
        ctx.collect(Worker(name, salary))
      }
      rs.close()
    }
  }

  // Stop the polling loop when the job is cancelled
  override def cancel(): Unit = {
    flag = false
  }

  // Close the connection
  override def close(): Unit = {
    if (pst != null) pst.close()
    if (conn != null) conn.close()
  }
}
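Note that RichParallelSourceFunction runs one instance per parallel subtask, so with a parallelism greater than one every subtask would poll the same worker table and emit duplicate records. A minimal sketch of one way to handle this, pinning the source to a single instance:

// Hypothetical: run a single copy of the JDBC source so each row is emitted only once
val workers = env.addSource(new MyJdbcSourceFunc).setParallelism(1)
workers.print()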
MySqlSink
import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala._

// Define a Worker case class (the same one used by the source above)
case class Worker(name: String, salary: Long)

def main(args: Array[String]): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val dataStream = env.readTextFile("filePath")
    .map(line => {
      val ps = line.split(",")
      Worker(ps(0), ps(1).toLong)
    })
  dataStream.addSink(new MyJDBCSinkFunc)
  env.execute()
}
class MyJDBCSinkFunc extends RichSinkFunction[Worker] {
  var conn: Connection = _
  // Upsert logic: try to update the row first, insert it if it does not exist yet
  var upDateStatement: PreparedStatement = _
  var insertStatement: PreparedStatement = _

  // Open the database connection
  override def open(parameters: Configuration): Unit = {
    conn = DriverManager.getConnection(
      "jdbc:mysql://localhost:3306/testevery?characterEncoding=utf-8&useSSL=false&serverTimezone=UTC",
      "user",
      "password")
    upDateStatement = conn.prepareStatement("update worker set salary=? where name=?")
    insertStatement = conn.prepareStatement("insert into worker values(?,?)")
  }

  override def invoke(value: Worker, context: SinkFunction.Context): Unit = {
    // Try the update first: parameter 1 is the salary, parameter 2 is the name
    upDateStatement.setLong(1, value.salary)
    upDateStatement.setString(2, value.name)
    upDateStatement.execute()
    // If no row was updated, insert a new one; the column order is (name, salary)
    if (upDateStatement.getUpdateCount == 0) {
      insertStatement.setString(1, value.name)
      insertStatement.setLong(2, value.salary)
      insertStatement.execute()
    }
  }

  // Close the statements and the database connection
  override def close(): Unit = {
    if (insertStatement != null) insertStatement.close()
    if (upDateStatement != null) upDateStatement.close()
    if (conn != null) conn.close()
  }
}
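Because the custom source and sink are both ordinary DataStream operators, they can also be chained together directly. A minimal sketch of a hypothetical job that reads the worker table with the custom source, transforms the records, and writes them back with the custom sink, e.g. to test both functions together:

// Hypothetical end-to-end test: read workers, give everyone a raise, upsert the result
def main(args: Array[String]): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.addSource(new MyJdbcSourceFunc).setParallelism(1)
    .map(w => Worker(w.name, w.salary + 100))
    .addSink(new MyJDBCSinkFunc)
  env.execute("jdbc-source-to-jdbc-sink")
}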