欢迎您访问程序员文章站,本站旨在为大家提供并分享程序员计算机编程知识!
您现在的位置是: 首页

flink的常用Source和Sink

程序员文章站 2022-07-14 14:16:27
...

一、KafkaSource和KafkaSink

  由于flink经常用于处理实时数据流,而Kafka可以对流数据进行削峰处理,所以flink Streaming经常和kafka一起使用。
  flink已经对kafka的source和sink做了较高程度的整合,所以使用起来很方便。

/**
 * Demo job showing a Kafka source and a Kafka sink in one Flink program.
 *
 * Words split out of the text file at "filePath" are written to the Kafka
 * topic "test"; a Kafka consumer source on the same topic is also registered.
 *
 * @param args command-line arguments (not used)
 */
def main(args: Array[String]): Unit = {
  val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment

  // Read the file line by line and split every line into space-separated words.
  val fileLines = streamEnv.readTextFile("filePath")
  val result = fileLines.flatMap(line => line.split(" "))

  // addSource: a Kafka consumer feeds records from topic "test" into Flink.
  val consumerProps = new Properties()
  consumerProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092,node2:9092")
  consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "cxv")
  val kafkaConsumer = new FlinkKafkaConsumer[String]("test", new SimpleStringSchema(), consumerProps)
  // NOTE: this stream is only created here; nothing in this snippet consumes it.
  val inputStream = streamEnv.addSource(kafkaConsumer)

  // addSink: records leaving the Flink stream are written to Kafka, so on the
  // Kafka side this job acts as a producer.
  val kafkaProducer = new FlinkKafkaProducer[String]("node1:9092", "test", new SimpleStringSchema())
  result.addSink(kafkaProducer)

  streamEnv.execute()
}

二、MySqlSource(自定义)和MySqlSink(自定义)

MysqlSource

/**
 * Demo job that prints every record emitted by the custom [[MyJdbcSourceFunc]].
 *
 * @param args command-line arguments (not used)
 */
def main(args: Array[String]): Unit = {
  val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment

  // Attach the custom JDBC source, then print each record it emits.
  val workerStream = streamEnv.addSource(new MyJdbcSourceFunc)
  workerStream.print()

  streamEnv.execute()
  }
}

/**
 * Custom source built on RichParallelSourceFunction that polls a MySQL table.
 *
 * While the source is running it re-executes `select * from worker` roughly
 * every 500 ms and emits one [[Worker]] per returned row, reading column 1 as
 * the name and column 2 as the salary.
 *
 * NOTE(review): this is a parallel source, so each parallel instance presumably
 * runs the same query and emits the same rows; confirm the intended parallelism.
 */
class MyJdbcSourceFunc extends RichParallelSourceFunction[Worker] {
  var conn: Connection = _
  var pst: PreparedStatement = _
  // Written by cancel() and read by run(); these are normally called from
  // different threads, so the flag must be volatile for the loop to see the update.
  @volatile var flag = true

  /** Opens the database connection and prepares the polling query. */
  override def open(parameters: Configuration): Unit = {
    // The original URL contained "&&"; a single "&" separates URL parameters.
    conn = DriverManager.getConnection(
      "jdbc:mysql://localhost:3306/testevery?characterEncoding=utf-8&useSSL=false&serverTimezone=UTC",
      "root",
      "1994")
    pst = conn.prepareStatement("select * from worker")
  }

  /**
   * Polls the table until [[cancel]] is called, emitting every row as a Worker.
   *
   * @param ctx context used to emit records downstream
   */
  override def run(ctx: SourceFunction.SourceContext[Worker]): Unit = {
    while (flag) {
      Thread.sleep(500)
      val rs = pst.executeQuery()
      try {
        // Also check the flag per row so cancellation does not wait for a full scan.
        while (flag && rs.next()) {
          val name = rs.getString(1)
          val salary = rs.getLong(2)
          ctx.collect(Worker(name, salary))
        }
      } finally {
        // Release the result set of every poll instead of leaking one per iteration.
        rs.close()
      }
    }
  }

  /** Requests the polling loop in [[run]] to stop. */
  override def cancel(): Unit = {
    flag = false
  }

  /** Closes the statement and the connection; the connection is closed even if the statement fails to close. */
  override def close(): Unit = {
    try {
      if (pst != null) pst.close()
    } finally {
      if (conn != null) conn.close()
    }
  }
}

MysqlSink

/**
 * Case class describing one worker record.
 *
 * @param name   the worker's name
 * @param salary the worker's salary
 */
case class Worker(
  name: String,
  salary: Long
)

/**
 * Demo job that parses comma-separated lines from "filePath" into [[Worker]]
 * records and writes them out through the custom [[MyJDBCSinkFunc]].
 *
 * @param args command-line arguments (not used)
 */
def main(args: Array[String]): Unit = {
  val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment

  val rawLines = streamEnv.readTextFile("filePath")
  // Each line is split on ","; field 0 is the name and field 1 the salary.
  val dataStream = rawLines.map { line =>
    val fields = line.split(",")
    val workerName = fields(0)
    val workerSalary = fields(1).toLong
    Worker(workerName, workerSalary)
  }

  dataStream.addSink(new MyJDBCSinkFunc)

  streamEnv.execute()
  }
}

/**
 * Custom JDBC sink that upserts [[Worker]] records into the MySQL `worker` table.
 *
 * For every record it first tries to update the salary of the row with the
 * same name; if no row was affected it inserts a new row instead.
 */
class MyJDBCSinkFunc extends RichSinkFunction[Worker] {
  var conn: Connection = _
  // Update is tried first; insert is the fallback when no row matched.
  var upDateStatement: PreparedStatement = _
  var insertStatement: PreparedStatement = _

  /** Opens the database connection and prepares the update and insert statements. */
  override def open(parameters: Configuration): Unit = {
    conn = DriverManager.getConnection(
      "jdbc:mysql://localhost:3306/testevery?characterEncoding=utf-8&useSSL=false&serverTimezone=UTC",
      "user",
      "password")

    upDateStatement = conn.prepareStatement("update worker set salary=? where name=?")
    insertStatement = conn.prepareStatement("insert into worker values(?,?)")
  }

  /**
   * Writes one record: update the existing row, or insert it when none exists.
   *
   * @param value   the worker record to persist
   * @param context sink context supplied by Flink (not used)
   */
  override def invoke(value: Worker, context: SinkFunction.Context): Unit = {
    upDateStatement.setLong(1, value.salary)
    upDateStatement.setString(2, value.name)
    // executeUpdate returns the affected-row count; 0 means no row had this name.
    if (upDateStatement.executeUpdate() == 0) {
      // The insert has no column list, so values are positional. MyJdbcSourceFunc
      // reads this table as (name, salary), so name is bound first. The original
      // bound them the other way round, putting each value in the wrong column.
      insertStatement.setString(1, value.name)
      insertStatement.setLong(2, value.salary)
      insertStatement.executeUpdate()
    }
  }

  /**
   * Closes both statements and the connection.
   *
   * The original closed `insertStatement` three times and never released
   * `upDateStatement` or `conn`. Nested try/finally makes sure each resource
   * is closed even when closing an earlier one throws.
   */
  override def close(): Unit = {
    try {
      if (insertStatement != null) insertStatement.close()
    } finally {
      try {
        if (upDateStatement != null) upDateStatement.close()
      } finally {
        if (conn != null) conn.close()
      }
    }
  }
}