Flink自定义source连接ES
程序员文章站
2022-06-16 12:16:03
...
Flink自动定义source
继承sourceFunction
重写run和cancel方法
class EsSource extends SourceFunction[String]{
//判断是否取消运行
var isRunning = true
override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
val client: JestClient = EsClient.getEsClient()
//開始採集數據
log.info()
while (isRunning) {
EsClient.bulkInsertData("", new util.ArrayList[Map.type])
}
}
override def cancel(): Unit = isRunning = false
}
Jest操作ES获取client
private class EsClient {
val log: Logger = Logger.getLogger(ComputeTimeDifferences.getClass)
def getClient(connectionUrl: String, username: String, password: String): JestClient = {
val factory: JestClientFactory = new JestClientFactory()
var httpClientConfig: HttpClientConfig = null
if (username != null) {
log.info("Using HTTP Basic Authentication");
httpClientConfig = new HttpClientConfig.Builder(connectionUrl)
.defaultCredentials(username, password)
.multiThreaded(true)
.build();
} else {
httpClientConfig = new HttpClientConfig.Builder(connectionUrl)
.multiThreaded(true)
.build();
}
factory.setHttpClientConfig(httpClientConfig)
factory.getObject
}
}
断言判断连接是否成功
def assertThatConnectionToElasticsearchIsPossible(numRetries: Int, waitTime: Int, waitUnit: TimeUnit): Unit = {
val jestClient = getEsClient()
try {
for (i <- 0 until numRetries) {
try {
val request = new State.Builder().build
val result = jestClient.execute(request)
if (result.isSucceeded) return
} catch {
case e: Exception =>
Uninterruptibles.sleepUninterruptibly(waitTime, waitUnit)
}
}
} finally if (jestClient != null) jestClient.shutdownClient()
throw new AssertionError("Couldn't connect to Elasticsearch")
}
获取jest客户端
def getEsClient(): JestClient = {
val client: JestClient = esClient.getClient("http://192.168.10.204:9200", "elastic", "123456")
client
}
私有化EsClient,只能使用伴生对象获取客户端连接,实现单例设计模式
private class EsClient {
val log: Logger = Logger.getLogger(ComputeTimeDifferences.getClass)
def getClient(connectionUrl: String, username: String, password: String): JestClient = {
val factory: JestClientFactory = new JestClientFactory()
var httpClientConfig: HttpClientConfig = null
if (username != null) {
log.info("Using HTTP Basic Authentication");
httpClientConfig = new HttpClientConfig.Builder(connectionUrl)
.defaultCredentials(username, password)
.multiThreaded(true)
.build();
} else {
httpClientConfig = new HttpClientConfig.Builder(connectionUrl)
.multiThreaded(true)
.build();
}
factory.setHttpClientConfig(httpClientConfig)
factory.getObject
}
}
上一篇: THINKPHP内容分页代码分享_PHP
下一篇: mysql执行计划介绍_MySQL