
Problems Connecting Confluent 5.0 to JDBC


Hi everyone, I'm the man who can bend a car's A-pillar backwards with one punch.

This week blew up in my face: the task was to set up the Confluent Platform and connect it to a data source over JDBC. I chose MySQL for testing but ran into a problem during configuration. Below is a detailed report of where things stand.

Searching the Confluent documentation, I found the Getting Started with Kafka Connect guide. It explains that Kafka Connect can connect to many kinds of data sources and can run in either standalone or distributed mode.

In standalone mode, only two configuration files are needed to start the service, a worker.properties and a connector.properties:

bin/connect-standalone worker.properties connector1.properties [connector2.properties connector3.properties ...]
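
For comparison, distributed mode takes only the worker file; connector definitions are then submitted to the worker's REST API rather than passed on the command line. A minimal sketch, where connector1.json is an assumed JSON rendering of the connector config:

bin/connect-distributed worker.properties
# submit the connector over REST (port 8083 by default)
curl -X POST -H "Content-Type: application/json" --data @connector1.json http://localhost:8083/connectors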

The worker.properties file holds worker-level settings such as the Kafka cluster addresses and the serialization format; the connector.properties file configures the connector itself, i.e. the parameters needed to reach a particular data source.

Here are my two configuration files:

connect-avro-standalone_my.properties
# Sample configuration for a standalone Kafka Connect worker that uses Avro serialization and
# integrates with the Schema Registry. This sample configuration assumes a local installation of
# Confluent Platform with all services running on their default ports.

# Bootstrap Kafka servers. If multiple servers are specified, they should be comma-separated.
bootstrap.servers=spark-02:9092,spark-03:9092,spark-04:9092

# The converters specify the format of data in Kafka and how to translate it into Connect data.
# Every Connect user will need to configure these based on the format they want their data in
# when loaded from or stored into Kafka
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

# The internal converter used for offsets and config data is configurable and must be specified,
# but most users will always want to use the built-in default. Offset and config data is never
# visible outside of Connect in this format.
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# Local storage file for offset data
offset.storage.file.filename=/tmp/connect.offsets

# Confluent Control Center Integration -- uncomment these lines to enable Kafka client interceptors
# that will report audit data that can be displayed and analyzed in Confluent Control Center
# producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
# consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor

# These are provided to inform the user about the presence of the REST host and port configs
# Hostname & Port for the REST API to listen on. If this is set, it will bind to the interface used to listen to requests.
#rest.host.name=
#rest.port=8083

# The Hostname & Port that will be given out to other workers to connect to i.e. URLs that are routable from other servers.
#rest.advertised.host.name=
#rest.advertised.port=

# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include
# any combination of:
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
#plugin.path=

source-kafka-jdbc-mysql.properties

In this file I configured the MySQL-related settings.

# name uniquely identifies this connector
name=source-kafka-jdbc-mysql
# connector class; it must be a subclass of org.apache.kafka.connect.connector.Connector
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
# maximum number of tasks for this connector
tasks.max=1
# tables to query
table.whitelist=t_sparksql1
# MySQL connection settings
connection.url=jdbc:mysql://1.19.5.2:3306/sparkdb
connection.user=otoo
connection.password=osxw
# query mode: timestamp-based
mode=timestamp
# topics are generated from table names; prefix is prepended to each topic name
topic.prefix=test-kafka-jdbc-mysql-
# path to the jars used for this run
plugin.path=/root/apps/confluent-5.0.0/share/java/kafka-connect-jdbc-my
Jars under plugin.path:
common-utils-6.0.0.jar
jtds-1.3.1.jar
kafka-connect-jdbc-10.0.1.jar
mssql-jdbc-8.4.1.jre8.jar
ojdbc8-19.7.0.0.jar
ojdbc8-production-19.7.0.0.pom
ons-19.7.0.0.jar
oraclepki-19.7.0.0.jar
orai18n-19.7.0.0.jar
osdt_cert-19.7.0.0.jar
osdt_core-19.7.0.0.jar
postgresql-42.2.10.jar
simplefan-19.7.0.0.jar
slf4j-api-1.7.30.jar
sqlite-jdbc-3.25.2.jar
ucp-19.7.0.0.jar
xdb-19.7.0.0.jar
xmlparserv2-19.7.0.0.jar

These jars come from Confluent's Install the connector manually page. I tried versions 10.0, 5.5, and 5.0; the listing above is from the 10.0 package.
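
A few things about these files are worth flagging before looking at the error. The listing above contains drivers for Oracle, PostgreSQL, SQLite, and SQL Server, but none for MySQL. Also, plugin.path is a worker-level setting: in the worker file above it is still commented out, while the connector file sets it, where Kafka Connect silently ignores it. Finally, per the Confluent docs, the JDBC source connector's timestamp mode also requires a timestamp.column.name. A hedged sketch of the corrected lines, with update_time as an assumed column name in t_sparksql1:

# in connect-avro-standalone_my.properties (the worker file):
plugin.path=/root/apps/confluent-5.0.0/share/java

# in source-kafka-jdbc-mysql.properties (the connector file):
timestamp.column.name=update_time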

With both files in place, start the service:

./connect-standalone /root/apps/confluent-5.0.0/etc/schema-registry/connect-avro-standalone_my.properties /root/apps/confluent-5.0.0/etc/kafka-connect-jdbc/source-kafka-jdbc-mysql.properties

The console then prints a large amount of log output; the failing part is this section:

[2020-12-01 08:55:50,052] INFO Unable to connect to database on attempt 1/3. Will retry in 10000 ms. (io.confluent.connect.jdbc.util.CachedConnectionProvider:89)
java.sql.SQLException: No suitable driver found for jdbc:mysql://10.129.225.254:3306/sparkdb
	at java.sql.DriverManager.getConnection(DriverManager.java:689)
	at java.sql.DriverManager.getConnection(DriverManager.java:208)
	at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.getConnection(GenericDatabaseDialect.java:227)
	at io.confluent.connect.jdbc.util.CachedConnectionProvider.newConnection(CachedConnectionProvider.java:83)
	at io.confluent.connect.jdbc.util.CachedConnectionProvider.getConnection(CachedConnectionProvider.java:52)
	at io.confluent.connect.jdbc.JdbcSourceConnector.start(JdbcSourceConnector.java:92)
	at org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:111)
	at org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:136)
	at org.apache.kafka.connect.runtime.WorkerConnector.transitionTo(WorkerConnector.java:195)
	at org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:241)
	at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.startConnector(StandaloneHerder.java:298)
	at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:204)
	at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:107)
[2020-12-01 08:56:00,054] INFO Attempting to open connection #2 to MySql (io.confluent.connect.jdbc.util.CachedConnectionProvider:82)

The error says that no suitable JDBC driver could be found: DriverManager matches the jdbc:mysql:// URL prefix against the drivers registered on the classpath, and none of them claims it.
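
The same failure can also be seen through the worker's REST API, which is quicker than scrolling the logs (a sketch, assuming the default REST port 8083):

# list registered connectors
curl http://localhost:8083/connectors
# per-connector state; a failed connector carries the same stack trace
curl http://localhost:8083/connectors/source-kafka-jdbc-mysql/status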

Searching around online, the consensus is that this error means the mysql-connector-java jar is missing, so I went to the MySQL website and looked up which Connector/J versions work with MySQL 5.7:

Connector/J version | JDBC version | MySQL Server version | JRE Required | JDK Required for Compilation | Status
8.0 | 4.2 | 5.6, 5.7, 8.0 | JRE 8 or higher | JDK 8.0 or higher | General availability. Recommended version.
5.1 | 3.0, 4.0, 4.1, 4.2 | 5.6, 5.7, 8.0 | JRE 5 or higher | JDK 5.0 AND JDK 8.0 or higher | General availability

Table source: Chapter 2 Connector/J Versions, and the MySQL and Java Versions They Require

After two rounds of swapping jars, the same missing-driver error persisted. While choosing the Kafka Connect JDBC jar bundle I also tried the 10.0, 5.5, and 5.0 versions, each paired with the two different Connector/J versions, and none of the combinations brought the service up.
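
For the record, the commonly cited fix for No suitable driver found with Kafka Connect is to put the driver jar in the same directory as kafka-connect-jdbc-*.jar, so the connector's plugin classloader can see it, and then restart the worker. A hedged sketch; the Connector/J version number here is an assumption:

# verify the jar really contains the MySQL driver class
jar tf mysql-connector-java-5.1.49.jar | grep Driver.class
# expected: com/mysql/jdbc/Driver.class (5.1) or com/mysql/cj/jdbc/Driver.class (8.0)

# drop it next to the connector's own jar so the plugin classloader picks it up
cp mysql-connector-java-5.1.49.jar /root/apps/confluent-5.0.0/share/java/kafka-connect-jdbc-my/

# or, as a fallback, put it on the worker JVM's classpath before launching connect-standalone
export CLASSPATH=/root/apps/confluent-5.0.0/share/java/kafka-connect-jdbc-my/mysql-connector-java-5.1.49.jar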

Summary

The Confluent Platform is a large integration platform through which many services can be configured; for now I only need ZooKeeper and Kafka. Connecting to data sources through Confluent works in two modes, standalone and distributed. The relevant configuration is all documented on the Confluent site, but this particular error gave me no obvious place to start. My next step is to rebuild the Confluent platform and redo the whole kafka-connect-jdbc setup from scratch.