欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Flink - RabbitMQ 自定义Source/Sink

程序员文章站 2022-07-14 14:15:57
...

RabbitMQ开启SSL安全认证后,flink-connector-rabbitmq 无法连接:

Caused by: java.lang.RuntimeException: Cannot create RMQ connection with rabbitmq-java-test at 192.168.116.100
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSource.open(RMQSource.java:172)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:990)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.ConnectException: Connection timed out: connect
	at java.net.DualStackPlainSocketImpl.waitForConnect(Native Method)
	at java.net.DualStackPlainSocketImpl.socketConnect(DualStackPlainSocketImpl.java:81)
	at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:476)
	at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:218)
	at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:200)
	at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:162)
	at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:394)
	at java.net.Socket.connect(Socket.java:606)
	at com.rabbitmq.client.impl.SocketFrameHandlerFactory.create(SocketFrameHandlerFactory.java:50)
	at com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory.newConnection(RecoveryAwareAMQConnectionFactory.java:61)
	at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.init(AutorecoveringConnection.java:99)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:918)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:877)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:835)
	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:987)
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSource.open(RMQSource.java:149)
	... 10 more

Process finished with exit code 1

引入 amqp-client 依赖

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.study</groupId>
  <artifactId>flink-rabbitmq</artifactId>
  <version>1.0.0</version>
  <name>flink-rabbitmq</name>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <java.version>1.8</java.version>
    <flink.version>1.10.1</flink.version>
    <scala.version>2.11</scala.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.12</version>
      <scope>test</scope>
    </dependency>

    <!-- flink-scala -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-scala_${scala.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!-- flink-scala-streaming -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-scala_${scala.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!-- rabbitMQ -->
    <dependency>
      <groupId>com.rabbitmq</groupId>
      <artifactId>amqp-client</artifactId>
      <version>5.9.0</version>
    </dependency>

    <!--fastJson-->
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>1.2.72</version>
    </dependency>
    <!-- lombok -->
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <version>1.18.10</version>
      <scope>provided</scope>
    </dependency>

  </dependencies>

  <build></build>
</project>

RabbitMQSource

package com.study.rabbitmq.source

import java.io.FileInputStream
import java.security.KeyStore

import com.rabbitmq.client.{AMQP, Channel, Connection, ConnectionFactory, DefaultConsumer, Envelope}
import javax.net.ssl.{KeyManagerFactory, SSLContext, TrustManagerFactory}
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}

class RabbitMQSource(prop: ParameterTool) extends RichSourceFunction[String]{

  private var conn: Connection = _
  private var channel: Channel = _
  private var queueName: String = _
  private var running: Boolean = true

  override def open(parameters: Configuration): Unit = {
    val factory = new ConnectionFactory
    // 设置IP
    factory.setHost(prop.get("flink.rabbitmq.host"))
    // 端口 RabbitMQ SSL 默认端口 5671
    factory.setPort(prop.get("flink.rabbitmq.port").toInt)
    // 账号密码
    factory.setUsername(prop.get("flink.rabbitmq.username"))
    factory.setPassword(prop.get("flink.rabbitmq.password"))
    // 虚拟主机
    factory.setVirtualHost(prop.get("flink.rabbitmq.virtualHost"))
    // SSL证书
    val sSLContext = loadSSLCertificate()
    factory.useSslProtocol(sSLContext)
    // 连接 通道 队列
    conn = factory.newConnection
    channel = conn.createChannel
    queueName = prop.get("flink.rabbitmq.queue")
  }

  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    declarationQueue(queueName)
    // 预取数
    channel.basicQos(1)

    while (running) {
      ctx.getCheckpointLock.synchronized {
        channel.basicConsume(queueName, false, new DefaultConsumer(channel) {
          override def handleDelivery(consumerTag: String, envelope: Envelope, properties: AMQP.BasicProperties, body: Array[Byte]): Unit = {
            ctx.collect(new String(body,"UTF-8"))
            // 手动ACK
            channel.basicAck(envelope.getDeliveryTag, false)
          }
        })
      }
    }

  }

  override def cancel(): Unit = {
    running = false
    if (channel != null) {
      channel.close()
    }
    if(conn != null) {
      conn.close()
    }
  }


  /**
   * 加载ssl证书
   * @return SSLContext
   */
  def loadSSLCertificate(): SSLContext = {
    val keyPassphrase = prop.get("flink.rabbitmq.ssl.password").toCharArray
    val ks = KeyStore.getInstance("PKCS12")
    ks.load(new FileInputStream(prop.get("flink.rabbitmq.ssl.key")), keyPassphrase)
    val kmf = KeyManagerFactory.getInstance(prop.get("flink.rabbitmq.ssl.algorithm"))
    kmf.init(ks, keyPassphrase)

    val trustPassphrase = prop.get("flink.rabbitmq.ssl.password").toCharArray
    val tks = KeyStore.getInstance("JKS")
    tks.load(new FileInputStream(prop.get("flink.rabbitmq.ssl.store")), trustPassphrase)
    val tmf = TrustManagerFactory.getInstance(prop.get("flink.rabbitmq.ssl.algorithm"))
    tmf.init(tks)

    val sslContext = SSLContext.getInstance(prop.get("flink.rabbitmq.ssl.version"))
    sslContext.init(kmf.getKeyManagers, tmf.getTrustManagers, null)
    sslContext
  }

  /**
   * 声明队列
   * @param queueName
   */
  def declarationQueue(queueName:String): Unit = {
    channel.queueDeclare(queueName, true, false, false, null)
  }

}

RabbitMQSink

package com.study.rabbitmq.sink

import java.io.FileInputStream
import java.security.KeyStore

import com.rabbitmq.client.{Channel, Connection, ConnectionFactory, MessageProperties}
import javax.net.ssl.{KeyManagerFactory, SSLContext, TrustManagerFactory}
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}

class RabbitMQSink(prop: ParameterTool) extends RichSinkFunction[String] {

  private var conn: Connection = _
  private var channel: Channel = _
  private var queueName: String = _

  override def open(parameters: Configuration): Unit = {
    val factory = new ConnectionFactory
    // 设置IP
    factory.setHost(prop.get("flink.rabbitmq.host"))
    // 端口 RabbitMQ SSL 默认端口 5671
    factory.setPort(prop.get("flink.rabbitmq.port").toInt)
    // 账号密码
    factory.setUsername(prop.get("flink.rabbitmq.username"))
    factory.setPassword(prop.get("flink.rabbitmq.password"))
    // 虚拟主机
    factory.setVirtualHost(prop.get("flink.rabbitmq.virtualHost"))
    // SSL证书
    val sSLContext = loadSSLCertificate()
    factory.useSslProtocol(sSLContext)
    // 连接 通道 队列
    conn = factory.newConnection
    channel = conn.createChannel
    queueName = prop.get("flink.rabbitmq.queue")
  }

  override def invoke(value: String, context: SinkFunction.Context[_]): Unit = {
    declarationQueue(queueName)
    channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, value.getBytes("UTF-8"))
  }

  override def close(): Unit = {
    if (channel != null) {
      channel.close()
    }
    if(conn != null) {
      conn.close()
    }
  }

  /**
   * 加载ssl证书
   * @return SSLContext
   */
  def loadSSLCertificate(): SSLContext = {
    val keyPassphrase = prop.get("flink.rabbitmq.ssl.password").toCharArray
    val ks = KeyStore.getInstance("PKCS12")
    ks.load(new FileInputStream(prop.get("flink.rabbitmq.ssl.key")), keyPassphrase)
    val kmf = KeyManagerFactory.getInstance(prop.get("flink.rabbitmq.ssl.algorithm"))
    kmf.init(ks, keyPassphrase)

    val trustPassphrase = prop.get("flink.rabbitmq.ssl.password").toCharArray
    val tks = KeyStore.getInstance("JKS")
    tks.load(new FileInputStream(prop.get("flink.rabbitmq.ssl.store")), trustPassphrase)
    val tmf = TrustManagerFactory.getInstance(prop.get("flink.rabbitmq.ssl.algorithm"))
    tmf.init(tks)

    val sslContext = SSLContext.getInstance(prop.get("flink.rabbitmq.ssl.version"))
    sslContext.init(kmf.getKeyManagers, tmf.getTrustManagers, null)
    sslContext
  }

  /**
   * 声明队列
   * @param queueName
   */
  def declarationQueue(queueName:String): Unit = {
    channel.queueDeclare(queueName, true, false, false, null)
  }


}

配置文件

# IP 端口
flink.rabbitmq.host=192.168.116.100
flink.rabbitmq.port=5671
# 账号密码
flink.rabbitmq.username=admin
flink.rabbitmq.password=admin
# 虚拟主机
flink.rabbitmq.virtualHost=/
# 队列
flink.rabbitmq.queue=rabbitmq-java-test

# 证书密码
flink.rabbitmq.ssl.password=123456
# 证书路径
flink.rabbitmq.ssl.key=C:\\rabbitmq\\ssl\\client_key.p12
flink.rabbitmq.ssl.store=C:\\rabbitmq\\ssl\\rabbitStore
# 协议
flink.rabbitmq.ssl.version=TLSv1.3
# 算法
flink.rabbitmq.ssl.algorithm=SunX509

测试Sink

package com.study.rabbitmq


import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

import com.study.rabbitmq.sink.RabbitMQSink
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala._

object PublishRabbitMQ {

  def main(args: Array[String]): Unit = {

    val parameter = ParameterTool.fromArgs(args)
    val path = parameter.get("config", "/opt/rabbitmq/prop.properties")
    val prop = ParameterTool.fromPropertiesFile(path)

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(2)


    env
      .addSource(new RichSourceFunction[String] {
        val pattern = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
        var running = true
        override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
          while (running) {
            val msg = "hello rabbit"
            println(LocalDateTime.now().format(pattern) + " => " + msg)

            ctx.collect(msg)
            Thread.sleep(3000L)
          }
        }
        override def cancel(): Unit = {
          running = false
        }
      })
      .addSink(new RabbitMQSink(prop))


    env.execute("test sink to rabbitmq")
  }
}

控制台输出:

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
2021-03-05 22:01:28 => hello rabbit
2021-03-05 22:01:31 => hello rabbit
2021-03-05 22:01:34 => hello rabbit
2021-03-05 22:01:37 => hello rabbit
2021-03-05 22:01:40 => hello rabbit
2021-03-05 22:01:43 => hello rabbit
2021-03-05 22:01:46 => hello rabbit
2021-03-05 22:01:49 => hello rabbit
2021-03-05 22:01:52 => hello rabbit

测试Source

package com.study.rabbitmq


import com.study.rabbitmq.source.RabbitMQSource
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._

object SubscribeRabbitMQ {
  def main(args: Array[String]): Unit = {
    val parameter = ParameterTool.fromArgs(args)
    val path = parameter.get("config", "/opt/rabbitmq/prop.properties")
    val prop = ParameterTool.fromPropertiesFile(path)

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(2)

    env
      .addSource(new RabbitMQSource(prop))
      .print()


    env.execute("test source from rabbitmq")
  }
}

控制台输出:

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
2> hello rabbit
1> hello rabbit
2> hello rabbit
1> hello rabbit
2> hello rabbit
1> hello rabbit
2> hello rabbit
1> hello rabbit
2> hello rabbit