欢迎您访问程序员文章站！本站旨在为大家提供、分享程序员计算机编程知识！
您现在的位置是: 首页

Flink 入门实战之七Source自定义读取Mongodb数据

程序员文章站 2022-03-14 18:13:13
...
  • Flink 自定义 RichSourceFunction 读取 MongoDB 数据
package com.day
import com.mongodb.BasicDBObject
import com.mongodb.casbah.Imports.{MongoClient, MongoClientURI, MongoDBObject}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala._
object SourceMongodb {

  /** One user record read from MongoDB (fields `uid`, `name`, `age`, `sex`). */
  final case class User(uid: Int, name: String, age: Int, sex: String)

  /**
   * Entry point: builds a streaming job that reads matching users from MongoDB through
   * [[MongodbSource]] and prints each record to stdout.
   */
  def main(args: Array[String]): Unit = {
    // 1. Obtain the streaming execution environment.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 2. Register the custom source that connects to MongoDB.
    val dataStream = env.addSource(new MongodbSource)
    // 3. Print every record.
    dataStream.print()
    // 4. Submit the job (the name previously read "mysql Job", a copy-paste leftover).
    env.execute("mongodb Job")
  }

  /**
   * Bounded Flink source: runs one MongoDB query and emits every matching document as a [[User]].
   *
   * The client is opened in `open` and released in `close`. Flink calls `close` after `run`
   * returns, so the connection is freed on normal completion too, not only on cancellation.
   *
   * @param uri        MongoDB connection string.
   *                   NOTE(review): the default looks mangled by the blog's e-mail obfuscation
   *                   (`user:password@host`); replace it with the real credentials/host, ideally from config.
   * @param database   database to query
   * @param collection collection to query
   */
  class MongodbSource(
      val uri: String = "mongodb://root:aaa@qq.com:27017/",
      database: String = "db_user",
      collection: String = "UserInfo"
  ) extends RichSourceFunction[User] {

    // The client is not serializable and is created per parallel instance in open(),
    // so it must not be shipped with the serialized function.
    @transient var mongoClient: MongoClient = _

    // cancel() writes this flag from a different thread than the one running run().
    @volatile private var isRunning = true

    override def open(parameters: Configuration): Unit = {
      mongoClient = MongoClient(MongoClientURI(uri))
    }

    /** Queries the collection and emits each matching document, stopping early if cancelled. */
    override def run(ctx: SourceFunction.SourceContext[User]): Unit = {
      val excludedUid = 0
      val coll = mongoClient(database)(collection)
      // Match documents whose name contains "小" and whose uid is not 0.
      val query = new BasicDBObject("name", new BasicDBObject("$regex", "(.*" + "小" + ".*)"))
        .append("uid", new BasicDBObject("$ne", excludedUid))
      val cursor = coll.find(query)
      try {
        // Emit ALL matches (the previous version emitted only the first one).
        while (isRunning && cursor.hasNext) {
          val user = toUser(cursor.next())
          // Emitting under the checkpoint lock keeps emission atomic with respect to checkpoints.
          ctx.getCheckpointLock.synchronized {
            ctx.collect(user)
          }
        }
      } finally {
        cursor.close
      }
    }

    /** Signals run() to stop. Resources are released in close(), which Flink always invokes. */
    override def cancel(): Unit = {
      isRunning = false
    }

    /** Releases the MongoDB client; safe to call more than once. */
    override def close(): Unit = {
      try {
        if (mongoClient != null) {
          mongoClient.close
          mongoClient = null
        }
      } finally {
        super.close()
      }
    }

    /**
     * Converts one MongoDB document into a [[User]].
     *
     * @throws IllegalStateException if a required field is missing
     * @throws NumberFormatException if a numeric field holds a non-numeric string
     */
    private def toUser(doc: com.mongodb.DBObject): User = {
      // Reads a required field and fails with a descriptive message instead of a bare NPE.
      def field(key: String): AnyRef =
        Option(doc.get(key)).getOrElse(
          throw new IllegalStateException(s"MongoDB document has no '$key' field: $doc"))

      // Mongo numbers often come back as java.lang.Double ("1.0".toInt would throw),
      // so take the int value of any Number and parse only non-numeric values.
      def intField(key: String): Int = field(key) match {
        case n: Number => n.intValue()
        case other     => other.toString.toInt
      }

      User(
        uid = intField("uid"),
        name = field("name").toString,
        age = intField("age"),
        sex = field("sex").toString
      )
    }
  }
}

  • 核心依赖
 <!-- MongoDB 连接器 Casbah（_2.11 后缀表示 Scala 2.11 构建版本，需与 Flink 使用的 Scala 版本一致） -->
<dependency>
   <groupId>org.mongodb</groupId>
   <artifactId>casbah-core_2.11</artifactId>
   <version>3.1.1</version>
   <scope>${scope}</scope>
</dependency>
  • 数据
    Flink 入门实战之七Source自定义读取Mongodb数据
    Flink 入门实战之七Source自定义读取Mongodb数据
  • 注意
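    由于使用了第三方 Casbah 连接器，发布到生产环境时需要将 casbah-core_2.11.jar 及其传递依赖（如 casbah-commons、casbah-query、mongo-java-driver）放到 flink/lib 目录下；或者将依赖的 scope 设为 compile，把它们一起打进作业的 fat jar。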