欢迎您访问程序员文章站!本站旨在为大家提供并分享程序员计算机编程知识!
您现在的位置是: 首页

Flink自定义source连接ES

程序员文章站 2022-06-16 12:16:03
...

Flink自定义source

继承SourceFunction

Flink自定义source连接ES

重写run和cancel方法

/**
 * Flink `SourceFunction[String]` that keeps calling `EsClient.bulkInsertData`
 * until `cancel()` is invoked.
 *
 * NOTE(review): `run` never emits a record through `ctx`, and every iteration
 * passes an empty first argument and an empty list to `bulkInsertData` — this
 * looks like placeholder logic; confirm the intended query/emit behavior.
 */
class EsSource extends SourceFunction[String] {
  // Loop guard for run(); cleared by cancel().
  // Marked @volatile because Flink's SourceFunction contract allows cancel()
  // to be invoked from a different thread than the one executing run(), so
  // the write must be visible to the polling loop.
  @volatile var isRunning = true

  /**
   * Obtains a Jest client, then loops while `isRunning` is true.
   * The client is shut down when the loop exits, normally or by exception.
   *
   * @param ctx Flink source context (currently unused by the loop body)
   */
  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    val client: JestClient = EsClient.getEsClient()
    try {
      // Start collecting data.
      // NOTE(review): `log` is not defined in this class and the call has no
      // message argument — confirm which logger and message were intended.
      log.info()
      while (isRunning) {
        EsClient.bulkInsertData("", new util.ArrayList[Map.type])
      }
    } finally {
      // Release the client acquired above; previously it was never closed.
      if (client != null) client.shutdownClient()
    }
  }

  /** Requests termination of the loop in `run`. */
  override def cancel(): Unit = isRunning = false
}

Jest操作ES获取client

/** Factory wrapper that builds Jest clients from a connection URL and optional credentials. */
private class EsClient {
  // NOTE(review): the logger is named after ComputeTimeDifferences, not this class — confirm intent.
  val log: Logger = Logger.getLogger(ComputeTimeDifferences.getClass)

  /**
   * Creates a multi-threaded Jest client for `connectionUrl`.
   *
   * @param connectionUrl URL handed to `HttpClientConfig.Builder`
   * @param username      when non-null, default credentials are configured with it
   * @param password      password paired with `username`; ignored when `username` is null
   * @return the client produced by a fresh `JestClientFactory`
   */
  def getClient(connectionUrl: String, username: String, password: String): JestClient = {
    val clientFactory = new JestClientFactory()
    // Choose the configuration as a single expression instead of assigning to a null-initialised var.
    val config: HttpClientConfig =
      if (username == null) {
        new HttpClientConfig.Builder(connectionUrl)
          .multiThreaded(true)
          .build()
      } else {
        log.info("Using HTTP Basic Authentication")
        new HttpClientConfig.Builder(connectionUrl)
          .defaultCredentials(username, password)
          .multiThreaded(true)
          .build()
      }
    clientFactory.setHttpClientConfig(config)
    clientFactory.getObject
  }
}

断言判断连接是否成功

  /**
   * Verifies that Elasticsearch is reachable by executing a request built with
   * `State.Builder` up to `numRetries` times on a client from `getEsClient()`.
   *
   * An attempt fails when `execute` throws an `Exception` or when the result
   * reports `isSucceeded == false`. After every failed attempt except the last,
   * the method waits `waitTime` `waitUnit` before retrying. The client is shut
   * down before the method returns or throws.
   *
   * @param numRetries maximum number of attempts
   * @param waitTime   delay between failed attempts
   * @param waitUnit   unit of `waitTime`
   * @throws AssertionError if no attempt succeeds; the most recent exception
   *                        caught (if any) is attached as the cause
   */
  def assertThatConnectionToElasticsearchIsPossible(numRetries: Int, waitTime: Int, waitUnit: TimeUnit): Unit = {
    val jestClient = getEsClient()
    // Remembered so the final AssertionError can explain why the attempts failed.
    var lastFailure: Option[Throwable] = None
    val connected =
      try {
        // `exists` stops at the first successful attempt, replacing the non-local `return`.
        (0 until numRetries).exists { attempt =>
          val succeeded =
            try jestClient.execute(new State.Builder().build).isSucceeded
            catch {
              case e: Exception =>
                lastFailure = Some(e)
                false
            }
          // Back off after any failed attempt (exception or unsuccessful result),
          // but not after the final one, where waiting would only delay the error.
          if (!succeeded && attempt < numRetries - 1) {
            Uninterruptibles.sleepUninterruptibly(waitTime, waitUnit)
          }
          succeeded
        }
      } finally if (jestClient != null) jestClient.shutdownClient()
    if (!connected) {
      throw new AssertionError("Couldn't connect to Elasticsearch", lastFailure.orNull)
    }
  }

获取jest客户端

   /**
    * Builds a Jest client by delegating to `esClient.getClient`.
    *
    * The endpoint and credentials were previously hard-coded in the body; they
    * are now parameters whose defaults reproduce the old values, so existing
    * `getEsClient()` calls behave as before while other environments can
    * supply their own settings.
    *
    * NOTE(review): the default credentials are still embedded in source —
    * consider loading them from external configuration.
    *
    * @param connectionUrl Elasticsearch HTTP endpoint
    * @param username      user for HTTP basic authentication
    * @param password      password for `username`
    * @return the client returned by `esClient.getClient`
    */
   def getEsClient(
       connectionUrl: String = "http://192.168.10.204:9200",
       username: String = "elastic",
       password: String = "123456"): JestClient =
     esClient.getClient(connectionUrl, username, password)

私有化EsClient,只能使用伴生对象获取客户端连接,实现单例设计模式

/** Private helper that creates Jest clients; intended to be reached only through its companion object. */
private class EsClient {
  // NOTE(review): the logger is named after ComputeTimeDifferences, not this class — confirm intent.
  val log: Logger = Logger.getLogger(ComputeTimeDifferences.getClass)

  /**
   * Creates a multi-threaded Jest client for `connectionUrl`.
   *
   * @param connectionUrl URL handed to `HttpClientConfig.Builder`
   * @param username      when non-null, default credentials are configured with it
   * @param password      password paired with `username`; ignored when `username` is null
   * @return the client produced by a fresh `JestClientFactory`
   */
  def getClient(connectionUrl: String, username: String, password: String): JestClient = {
    val jestFactory = new JestClientFactory()
    // Option(...) maps a null username to None, mirroring the original null check.
    val settings: HttpClientConfig = Option(username) match {
      case Some(user) =>
        log.info("Using HTTP Basic Authentication")
        new HttpClientConfig.Builder(connectionUrl)
          .defaultCredentials(user, password)
          .multiThreaded(true)
          .build()
      case None =>
        new HttpClientConfig.Builder(connectionUrl)
          .multiThreaded(true)
          .build()
    }
    jestFactory.setHttpClientConfig(settings)
    jestFactory.getObject
  }
}
相关标签: flink 大数据