Flink 入门实战之七Source自定义读取Mongodb数据
程序员文章站
2022-03-14 18:13:13
...
- Flink 自定义 RichSourceFunction 读取 MongoDB 数据
package com.day
import com.mongodb.BasicDBObject
import com.mongodb.casbah.Imports.{MongoClient, MongoClientURI, MongoDBObject}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala._
/**
 * Flink streaming job that reads user documents from MongoDB through a custom
 * [[RichSourceFunction]] and prints them.
 */
object SourceMongodb {

  /** One user record, built from a document of the `UserInfo` collection. */
  case class User(uid: Int, name: String, age: Int, sex: String)

  /** Builds the pipeline (custom MongoDB source -> print sink) and runs it. */
  def main(args: Array[String]): Unit = {
    // 1. Obtain the streaming execution environment.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 2. Attach the custom source that connects to MongoDB.
    val dataStream = env.addSource(new MongodbSource)
    // 3. Print every emitted record.
    dataStream.print()
    // 4. Run the job.
    // NOTE(review): the job name says "mysql" although the source is MongoDB; it looks
    // like a copy-paste leftover. Kept unchanged because it is a runtime-visible string.
    env.execute("mysql Job")
  }

  /**
   * Custom source that queries the `UserInfo` collection of database `db_user` and emits
   * every document whose `name` contains '小' and whose `uid` is not 0 as a [[User]].
   *
   * Lifecycle: the client is created in `open`, used in `run`, and released in `close`.
   * `cancel` only asks `run` to stop, so the client is never closed underneath a running read.
   */
  class MongodbSource extends RichSourceFunction[User] {

    // NOTE(review): credentials are hard-coded in the connection string; presumably a
    // placeholder for the article. Load them from configuration in real deployments.
    val uri = "mongodb://root:aaa@qq.com:27017/"

    // Created in open(); transient because the client must never be shipped with the
    // serialized function.
    @transient var mongoClient: MongoClient = _

    // Cleared by cancel() so that the emit loop in run() terminates promptly.
    @volatile private var running = true

    /** Opens the MongoDB connection before any record is read. */
    override def open(parameters: Configuration): Unit = {
      running = true
      mongoClient = MongoClient(MongoClientURI(uri))
    }

    /**
     * Runs the query once and emits every matching document, stopping early if the
     * source is cancelled.
     *
     * @param ctx context used to emit records downstream
     * @throws IllegalStateException if a matching document lacks one of the
     *                               `uid`, `name`, `age` or `sex` fields
     */
    override def run(ctx: SourceFunction.SourceContext[User]): Unit = {
      val excludedUid = 0
      val users = mongoClient("db_user")("UserInfo")
      // Match documents whose name contains '小' and whose uid differs from excludedUid.
      val query = new BasicDBObject("name", new BasicDBObject("$regex", "(.*小.*)"))
        .append("uid", new BasicDBObject("$ne", excludedUid))
      val cursor = users.find(query)
      // Emit all matches instead of only the first one.
      while (running && cursor.hasNext) {
        val doc = cursor.next()
        ctx.collect(
          User(
            uid = asInt("uid", doc.get("uid")),
            name = required("name", doc.get("name")).toString,
            age = asInt("age", doc.get("age")),
            sex = required("sex", doc.get("sex")).toString
          )
        )
      }
    }

    /** Signals `run` to stop; the connection itself is released in `close`. */
    override def cancel(): Unit = {
      running = false
    }

    /** Releases the MongoDB connection on normal completion, failure, or cancellation. */
    override def close(): Unit = {
      Option(mongoClient).foreach(_.close())
      mongoClient = null
    }

    /** Returns `value`, failing with a descriptive error when the field is absent. */
    private def required(key: String, value: AnyRef): AnyRef =
      Option(value).getOrElse(
        throw new IllegalStateException(s"MongoDB document is missing field '$key'")
      )

    /**
     * Reads a numeric field as an Int. Any stored numeric type is accepted (the mongo
     * shell stores plain numbers as doubles, which render as "1.0" and break `toInt`);
     * non-numeric values fall back to parsing their string form.
     */
    private def asInt(key: String, value: AnyRef): Int =
      required(key, value) match {
        case number: java.lang.Number => number.intValue()
        case other => other.toString.toInt
      }
  }
}
- 核心依赖
<!-- mongo连接器 -->
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>casbah-core_2.11</artifactId>
<version>3.1.1</version>
<scope>${scope}</scope>
</dependency>
- 数据
- 注意
由于使用了第三方 casbah 连接器,当依赖的 scope 为 provided 时,发布到生产环境前需要将 casbah-core_2.11.jar 放到 flink/lib 目录下
上一篇: C#中常用的正则表达式