Flink - RabbitMQ 自定义Source/Sink
程序员文章站
2022-07-14 14:15:57
...
Flink - RabbitMQ 自定义Source/Sink
RabbitMQ 开启 SSL 安全认证后，flink-connector-rabbitmq 无法连接，报错如下：
Caused by: java.lang.RuntimeException: Cannot create RMQ connection with rabbitmq-java-test at 192.168.116.100
at org.apache.flink.streaming.connectors.rabbitmq.RMQSource.open(RMQSource.java:172)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:990)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.ConnectException: Connection timed out: connect
at java.net.DualStackPlainSocketImpl.waitForConnect(Native Method)
at java.net.DualStackPlainSocketImpl.socketConnect(DualStackPlainSocketImpl.java:81)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:476)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:218)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:200)
at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:162)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:394)
at java.net.Socket.connect(Socket.java:606)
at com.rabbitmq.client.impl.SocketFrameHandlerFactory.create(SocketFrameHandlerFactory.java:50)
at com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory.newConnection(RecoveryAwareAMQConnectionFactory.java:61)
at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.init(AutorecoveringConnection.java:99)
at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:918)
at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:877)
at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:835)
at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:987)
at org.apache.flink.streaming.connectors.rabbitmq.RMQSource.open(RMQSource.java:149)
... 10 more
Process finished with exit code 1
解决办法：基于 amqp-client 自定义 Source/Sink，在其中加载 SSL 证书后再建立连接。首先引入 amqp-client 依赖：
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the flink-rabbitmq example project: Flink Scala APIs plus the RabbitMQ Java client. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<!-- Project coordinates -->
<groupId>com.study</groupId>
<artifactId>flink-rabbitmq</artifactId>
<version>1.0.0</version>
<name>flink-rabbitmq</name>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<!-- Java source/target level -->
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<java.version>1.8</java.version>
<!-- Version shared by all Flink artifacts below -->
<flink.version>1.10.1</flink.version>
<!-- Used as the suffix of the Flink artifact ids below (flink-scala_2.11, flink-streaming-scala_2.11) -->
<scala.version>2.11</scala.version>
</properties>
<dependencies>
<!-- JUnit, test scope only -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<!-- Flink Scala API -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink streaming Scala API -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- RabbitMQ Java client (com.rabbitmq.client.*), used directly instead of flink-connector-rabbitmq -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.9.0</version>
</dependency>
<!-- fastjson; NOTE(review): no usage is visible in the accompanying Scala sources, confirm it is needed -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.72</version>
</dependency>
<!-- Lombok, provided scope; NOTE(review): no usage is visible in the accompanying Scala sources, confirm it is needed -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.10</version>
<scope>provided</scope>
</dependency>
</dependencies>
<!-- NOTE(review): the build section is empty, so no Scala compiler plugin is configured here; confirm how the Scala sources get compiled. -->
<build></build>
</project>
RabbitMQSource
package com.study.rabbitmq.source
import java.io.FileInputStream
import java.security.KeyStore
import com.rabbitmq.client.{AMQP, Channel, Connection, ConnectionFactory, DefaultConsumer, Envelope}
import javax.net.ssl.{KeyManagerFactory, SSLContext, TrustManagerFactory}
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
/**
 * Flink source that reads UTF-8 text messages from a RabbitMQ queue over an SSL/TLS connection.
 *
 * The connection is opened in [[open]] from the `flink.rabbitmq.*` settings. [[run]] registers a
 * single manually-acknowledging consumer and then waits until [[cancel]] is called or the channel
 * dies. Each message is emitted under the checkpoint lock and acknowledged after it was emitted.
 *
 * @param prop job parameters holding the `flink.rabbitmq.*` settings
 */
class RabbitMQSource(prop: ParameterTool) extends RichSourceFunction[String] {

  private var conn: Connection = _
  private var channel: Channel = _
  private var queueName: String = _

  // Written by cancel() on a different thread than the one executing run(), hence volatile.
  @volatile private var running: Boolean = true

  /**
   * Builds the SSL-enabled connection and channel and reads the queue name.
   *
   * @param parameters Flink configuration (not used)
   */
  override def open(parameters: Configuration): Unit = {
    val factory = new ConnectionFactory
    factory.setHost(prop.get("flink.rabbitmq.host"))
    // 5671 is the default RabbitMQ SSL port.
    factory.setPort(prop.get("flink.rabbitmq.port").toInt)
    factory.setUsername(prop.get("flink.rabbitmq.username"))
    factory.setPassword(prop.get("flink.rabbitmq.password"))
    factory.setVirtualHost(prop.get("flink.rabbitmq.virtualHost"))
    factory.useSslProtocol(loadSSLCertificate())

    conn = factory.newConnection()
    channel = conn.createChannel()
    queueName = prop.get("flink.rabbitmq.queue")
  }

  /**
   * Registers one consumer on the queue and blocks until the source is cancelled.
   *
   * @param ctx source context used to emit the message bodies
   * @throws IllegalStateException if the channel closes while the source is still running
   */
  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    declarationQueue(queueName)
    // Prefetch count: at most one unacknowledged message at a time.
    channel.basicQos(1)

    val consumer = new DefaultConsumer(channel) {
      override def handleDelivery(consumerTag: String,
                                  envelope: Envelope,
                                  properties: AMQP.BasicProperties,
                                  body: Array[Byte]): Unit = {
        // Deliveries arrive on a RabbitMQ client thread, so the checkpoint lock must be
        // held around the emit itself.
        ctx.getCheckpointLock.synchronized {
          ctx.collect(new String(body, "UTF-8"))
        }
        // Manual ACK, only after the record was handed to Flink.
        getChannel.basicAck(envelope.getDeliveryTag, false)
      }
    }

    // Register the consumer exactly once. Calling basicConsume inside the loop would add a new
    // consumer on every iteration.
    channel.basicConsume(queueName, false, consumer)

    // Keep run() alive while the consumer thread does the work.
    while (running && channel.isOpen) {
      Thread.sleep(200L)
    }
    if (running) {
      // Nobody asked us to stop, so surface the broken channel as a failure.
      throw new IllegalStateException(s"RabbitMQ channel for queue '$queueName' closed unexpectedly")
    }
  }

  /** Stops the source and releases the channel and the connection. */
  override def cancel(): Unit = {
    running = false
    releaseResources()
  }

  /** Releases the channel and the connection when the source ends without [[cancel]], e.g. on failure. */
  override def close(): Unit = releaseResources()

  /** Closes channel and connection if they are still open; safe to call more than once. */
  private def releaseResources(): Unit = this.synchronized {
    try {
      if (channel != null && channel.isOpen) channel.close()
    } finally {
      if (conn != null && conn.isOpen) conn.close()
    }
  }

  /**
   * Loads the client key store (PKCS12) and the trust store (JKS) and builds the SSL context.
   *
   * @return SSLContext initialised with the key managers and trust managers of the two stores
   */
  def loadSSLCertificate(): SSLContext = {
    // The same configured password is used for both stores.
    val keyPassphrase = prop.get("flink.rabbitmq.ssl.password").toCharArray
    val ks = readKeyStore("PKCS12", prop.get("flink.rabbitmq.ssl.key"), keyPassphrase)
    val kmf = KeyManagerFactory.getInstance(prop.get("flink.rabbitmq.ssl.algorithm"))
    kmf.init(ks, keyPassphrase)

    val trustPassphrase = prop.get("flink.rabbitmq.ssl.password").toCharArray
    val tks = readKeyStore("JKS", prop.get("flink.rabbitmq.ssl.store"), trustPassphrase)
    val tmf = TrustManagerFactory.getInstance(prop.get("flink.rabbitmq.ssl.algorithm"))
    tmf.init(tks)

    val sslContext = SSLContext.getInstance(prop.get("flink.rabbitmq.ssl.version"))
    sslContext.init(kmf.getKeyManagers, tmf.getTrustManagers, null)
    sslContext
  }

  /** Reads a key store of the given type from `path`, always closing the file stream. */
  private def readKeyStore(storeType: String, path: String, passphrase: Array[Char]): KeyStore = {
    val store = KeyStore.getInstance(storeType)
    val in = new FileInputStream(path)
    try store.load(in, passphrase) finally in.close()
    store
  }

  /**
   * Declares the queue as durable, non-exclusive and non-auto-delete, without extra arguments.
   *
   * @param queueName name of the queue to declare
   */
  def declarationQueue(queueName: String): Unit = {
    channel.queueDeclare(queueName, true, false, false, null)
  }
}
RabbitMQSink
package com.study.rabbitmq.sink
import java.io.FileInputStream
import java.security.KeyStore
import com.rabbitmq.client.{Channel, Connection, ConnectionFactory, MessageProperties}
import javax.net.ssl.{KeyManagerFactory, SSLContext, TrustManagerFactory}
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
/**
 * Flink sink that publishes each record as a persistent UTF-8 text message to a RabbitMQ queue
 * over an SSL/TLS connection.
 *
 * The connection is opened and the queue is declared once in [[open]]; [[invoke]] only publishes.
 *
 * @param prop job parameters holding the `flink.rabbitmq.*` settings
 */
class RabbitMQSink(prop: ParameterTool) extends RichSinkFunction[String] {

  private var conn: Connection = _
  private var channel: Channel = _
  private var queueName: String = _

  /**
   * Builds the SSL-enabled connection and channel, reads the queue name and declares the queue.
   *
   * @param parameters Flink configuration (not used)
   */
  override def open(parameters: Configuration): Unit = {
    val factory = new ConnectionFactory
    factory.setHost(prop.get("flink.rabbitmq.host"))
    // 5671 is the default RabbitMQ SSL port.
    factory.setPort(prop.get("flink.rabbitmq.port").toInt)
    factory.setUsername(prop.get("flink.rabbitmq.username"))
    factory.setPassword(prop.get("flink.rabbitmq.password"))
    factory.setVirtualHost(prop.get("flink.rabbitmq.virtualHost"))
    factory.useSslProtocol(loadSSLCertificate())

    conn = factory.newConnection()
    channel = conn.createChannel()
    queueName = prop.get("flink.rabbitmq.queue")

    // Declare the queue once here: queueDeclare is a synchronous broker round trip and must
    // not be paid for every record in invoke().
    declarationQueue(queueName)
  }

  /**
   * Publishes one record to the queue through the default exchange.
   *
   * @param value   record to publish, encoded as UTF-8
   * @param context sink context (not used)
   */
  override def invoke(value: String, context: SinkFunction.Context[_]): Unit = {
    channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, value.getBytes("UTF-8"))
  }

  /** Closes the channel and then the connection; the connection is closed even if the channel close fails. */
  override def close(): Unit = {
    try {
      if (channel != null && channel.isOpen) channel.close()
    } finally {
      if (conn != null && conn.isOpen) conn.close()
    }
  }

  /**
   * Loads the client key store (PKCS12) and the trust store (JKS) and builds the SSL context.
   *
   * @return SSLContext initialised with the key managers and trust managers of the two stores
   */
  def loadSSLCertificate(): SSLContext = {
    // The same configured password is used for both stores.
    val keyPassphrase = prop.get("flink.rabbitmq.ssl.password").toCharArray
    val ks = readKeyStore("PKCS12", prop.get("flink.rabbitmq.ssl.key"), keyPassphrase)
    val kmf = KeyManagerFactory.getInstance(prop.get("flink.rabbitmq.ssl.algorithm"))
    kmf.init(ks, keyPassphrase)

    val trustPassphrase = prop.get("flink.rabbitmq.ssl.password").toCharArray
    val tks = readKeyStore("JKS", prop.get("flink.rabbitmq.ssl.store"), trustPassphrase)
    val tmf = TrustManagerFactory.getInstance(prop.get("flink.rabbitmq.ssl.algorithm"))
    tmf.init(tks)

    val sslContext = SSLContext.getInstance(prop.get("flink.rabbitmq.ssl.version"))
    sslContext.init(kmf.getKeyManagers, tmf.getTrustManagers, null)
    sslContext
  }

  /** Reads a key store of the given type from `path`, always closing the file stream. */
  private def readKeyStore(storeType: String, path: String, passphrase: Array[Char]): KeyStore = {
    val store = KeyStore.getInstance(storeType)
    val in = new FileInputStream(path)
    try store.load(in, passphrase) finally in.close()
    store
  }

  /**
   * Declares the queue as durable, non-exclusive and non-auto-delete, without extra arguments.
   *
   * @param queueName name of the queue to declare
   */
  def declarationQueue(queueName: String): Unit = {
    channel.queueDeclare(queueName, true, false, false, null)
  }
}
配置文件
# Broker host and port
flink.rabbitmq.host=192.168.116.100
flink.rabbitmq.port=5671
# Username and password
flink.rabbitmq.username=admin
flink.rabbitmq.password=admin
# Virtual host
flink.rabbitmq.virtualHost=/
# Queue that the sink publishes to and the source consumes from
flink.rabbitmq.queue=rabbitmq-java-test
# Certificate password (the code uses it for both the key store and the trust store)
flink.rabbitmq.ssl.password=123456
# Certificate paths: ssl.key is loaded as a PKCS12 key store, ssl.store as a JKS trust store
flink.rabbitmq.ssl.key=C:\\rabbitmq\\ssl\\client_key.p12
flink.rabbitmq.ssl.store=C:\\rabbitmq\\ssl\\rabbitStore
# Protocol name passed to SSLContext.getInstance
# NOTE(review): the pom targets Java 1.8; confirm the runtime JRE supports TLSv1.3
flink.rabbitmq.ssl.version=TLSv1.3
# Algorithm name passed to KeyManagerFactory.getInstance and TrustManagerFactory.getInstance
flink.rabbitmq.ssl.algorithm=SunX509
测试Sink
package com.study.rabbitmq
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import com.study.rabbitmq.sink.RabbitMQSink
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala._
/**
 * Test job for the sink: emits the text "hello rabbit" every three seconds and publishes it
 * to RabbitMQ through [[RabbitMQSink]].
 *
 * The properties file is taken from the `--config` argument and defaults to
 * `/opt/rabbitmq/prop.properties`.
 */
object PublishRabbitMQ {
  def main(args: Array[String]): Unit = {
    val parameter = ParameterTool.fromArgs(args)
    val path = parameter.get("config", "/opt/rabbitmq/prop.properties")
    val prop = ParameterTool.fromPropertiesFile(path)

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(2)

    env
      .addSource(new RichSourceFunction[String] {
        // DateTimeFormatter is not java.io.Serializable, but Flink serializes the function
        // object; keep it transient and build it lazily on the task instead.
        @transient private lazy val pattern = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

        // Written by cancel() on a different thread than the one executing run().
        @volatile private var running = true

        override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
          while (running) {
            val msg = "hello rabbit"
            println(s"${LocalDateTime.now().format(pattern)} => $msg")
            ctx.collect(msg)
            // Pace the demo output: one message every three seconds.
            Thread.sleep(3000L)
          }
        }

        override def cancel(): Unit = {
          running = false
        }
      })
      .addSink(new RabbitMQSink(prop))

    env.execute("test sink to rabbitmq")
  }
}
控制台输出:
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
2021-03-05 22:01:28 => hello rabbit
2021-03-05 22:01:31 => hello rabbit
2021-03-05 22:01:34 => hello rabbit
2021-03-05 22:01:37 => hello rabbit
2021-03-05 22:01:40 => hello rabbit
2021-03-05 22:01:43 => hello rabbit
2021-03-05 22:01:46 => hello rabbit
2021-03-05 22:01:49 => hello rabbit
2021-03-05 22:01:52 => hello rabbit
测试Source
package com.study.rabbitmq
import com.study.rabbitmq.source.RabbitMQSource
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
/**
 * Test job for the source: reads messages from RabbitMQ through [[RabbitMQSource]] and prints them.
 *
 * The properties file is taken from the `--config` argument and defaults to
 * `/opt/rabbitmq/prop.properties`.
 */
object SubscribeRabbitMQ {
  def main(args: Array[String]): Unit = {
    // Resolve the properties file location, then load the RabbitMQ settings from it.
    val configPath = ParameterTool.fromArgs(args).get("config", "/opt/rabbitmq/prop.properties")
    val settings = ParameterTool.fromPropertiesFile(configPath)

    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    streamEnv.setParallelism(2)

    // Wire the RabbitMQ source straight to stdout.
    val messages = streamEnv.addSource(new RabbitMQSource(settings))
    messages.print()

    streamEnv.execute("test source from rabbitmq")
  }
}
控制台输出:
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
2> hello rabbit
1> hello rabbit
2> hello rabbit
1> hello rabbit
2> hello rabbit
1> hello rabbit
2> hello rabbit
1> hello rabbit
2> hello rabbit