欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Kafka Connect Jdbc 单组件连接达梦

程序员文章站 2024-03-11 16:35:55
...

Kafka Connect Jdbc 单组件连接达梦

现在要尝试将kafka-connect-jdbc组件单独抽出来用,配置这个东西还是出了一点状况,下面我将完整的搭建以及出现的问题都记录下来。

1、通过Confluent提供地址下载最新JDBC Connector(失败)

首先,通过Confluent官方提供的JDBC Connector地址下载Connector。由于提供的下载格式的zip,所以需要在本地解压后将文件夹上传至服务器(通过Confluent-hub组件下载,可以直接下载文件夹)。

进入文件夹,内容如下:

[[email protected] confluentinc-kafka-connect-jdbc-10.0.2]# ls
assets  doc  etc  lib  manifest.json

与Confluent配置相同,开发者需要将DM的jar包放在lib中,并且在配置文件中指出依赖包位置。

[[email protected] lib]# ls
common-utils-6.0.0.jar     mssql-jdbc-8.4.1.jre8.jar       
oraclepki-19.7.0.0.jar     postgresql-42.2.10.jar  
ucp-19.7.0.0.jar           DmJdbcDriver18.jar             
ojdbc8-19.7.0.0.jar        xmlparserv2-19.7.0.0.jar      
orai18n-19.7.0.0.jar       simplefan-19.7.0.0.jar  
xdb-19.7.0.0.jar           sqlite-jdbc-3.25.2.jar
jtds-1.3.1.jar             ojdbc8-production-19.7.0.0.pom  
osdt_cert-19.7.0.0.jar     slf4j-api-1.7.30.jar    
kafka-connect-jdbc-10.0.2.jar  ons-19.7.0.0.jar                
osdt_core-19.7.0.0.jar  

接下来需要配置的文件包括两个,一个用于配置Connector相关的参数(source-quickstart-sqlite.properties),一个用于配置standalone模式相关的参数(connect-standalone.properties)。

source-quickstart-sqlite.properties配置

在kafka-connect-jdbc/etc下有source-quickstart-sqlite.properties文件,这就是Connector相关的配置文件,所以需要复制重新配置:

[[email protected] etc]# cp source-quickstart-sqlite.properties source-dm.properties

然后打开文件配置参数:

#
# Copyright 2018 Confluent Inc.
#
# Licensed under the Confluent Community License (the "License"); you may not use
# this file except in compliance with the License.  You may obtain a copy of the
# License at
#
# http://www.confluent.io/confluent-community-license
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OF ANY KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations under the License.
#

# A simple example that copies all tables from a SQLite database. The first few settings are
# required for all connectors: a name, the connector class to run, and the maximum number of
# tasks to create:
name=source-jdbc-dm
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
# The remaining configs are specific to the JDBC source connector. In this example, we connect to a
# SQLite database stored in the file test.db, use and auto-incrementing column called 'id' to
# detect new rows as they are added, and output to topics prefixed with 'test-sqlite-jdbc-', e.g.
# a table called 'users' will be written to the topic 'test-sqlite-jdbc-users'.
connection.url=jdbc:dm://IP:Port/jc?user=JC&password=JC_Test123&characterEncoding=utf-8
mode=incrementing
incrementing.column.name=id
topic.prefix=test-dm-jdbc-

# Define when identifiers should be quoted in DDL and DML statements.
# The default is 'always' to maintain backward compatibility with prior versions.
# Set this to 'never' to avoid quoting fully-qualified or simple table and column names.
#quote.sql.identifiers=always

connect-standalone.properties配置

接下来配置standalone模式的文件:

bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000

启动

配置完了之后尝试启动,但是报错,无法连接到数据库:

[2020-12-05 18:02:24,551] INFO Unable to connect to database on attempt 2/3. Will retry in 10000 ms. (io.confluent.connect.jdbc.util.CachedConnectionProvider:89)
java.sql.SQLException: No suitable driver found for jdbc:dm://IP:Port/jc?user=JC&password=JC_Test123&characterEncoding=utf-8

错误说是没找到对应的driver。

[2020-12-05 18:02:07,789] INFO Loading plugin from: /root/apps/confluentinc-kafka-connect-jdbc-10.0.2/lib/DmJdbcDriver18.jar (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:246)

查看启动时加载jar包的记录得到上面这条,这里DMJdbcDriver18.jar明明就加载进来了。

上网找类似的问题,其中博客 No Suitable Driver Found For Jdbc_我的解决方法 中提到:

今天出现编码出现了No suitable driver found for jdbc,又是找遍了网上的资料,基本上都说是三个问题:
一是:连接URL格式出现了问题(Connection conn=DriverManager.getConnection(“jdbc:mysql://localhost:3306/XX”,“root”,“XXXX”)
二是:驱动字符串出错(com.mysql.jdbc.Driver)
三是Classpath中没有加入合适的mysql_jdbc驱动
经过我的仔细检查,这三种错误我都没有犯,为什么呢?
尝试着将mysql-connector-java-3.1.14-bin.jar的jar包加入C:\Program Files\Java\jre1.6.0_02\lib\ext文件夹下,问题解决了!!
原来是不仅仅要求将驱动加入classpath中,而且需要将该jar包加入到java运行环境的外部jar包中

同样的,URL是从Confluent的配置文件中粘贴来的,应该没问题。驱动字符串出错,不理解是什么意思。没有加入jdbc驱动,实际上我添加了,并且检查出来了,应该也不是这个问题。博主的解决方案是将jar包添加到java运行环境。

2、解决no suitable driver问题连接成功并测试

经过多次尝试和配置还是没法解决,我下面有两个思路:

  • 先试着连接mysql,从而了解是jar包问题还是配置文件问题(若成功连接mysql,那应该配置文件没问题)。
  • 先使用Confluent连接达梦,这样也可以确定是配置文件问题还是达梦的问题。

我先confluent连接达梦把,在spark-02机中有配置好的,直接启动。经过测试,confluent的配置是没问题的,可以连接到达梦。

[2021-01-16 04:30:36,013] INFO Begin using SQL query: SELECT * FROM "JC"."linearTraining22" WHERE "JC"."linearTraining22"."id" > ? ORDER BY "JC"."linearTraining22"."id" ASC (io.confluent.connect.jdbc.source.TableQuerier:164)

所以可以排除配置文件的问题,也不是Confluent当时下载组件的问题,jar包也是一样的,也可以排除jar包的问题。但是不确定是不是kafka-connect-jdbc组件更新导致的问题,所以接下来我觉得可以把confluent中组件拷贝出来测试。

我将当时通过confluent-hub下载的kafka-connect-jdbc组件给拷贝出来了,然后丢到spark-04号机,完全没有修改任何配置文件,确实可以连接达梦。

[2020-12-05 19:11:46,369] INFO Using JDBC dialect Generic (io.confluent.connect.jdbc.source.JdbcSourceTask:102)
[2020-12-05 19:11:46,647] INFO Attempting to open connection #1 to Generic (io.confluent.connect.jdbc.util.CachedConnectionProvider:82)
[2020-12-05 19:11:47,034] INFO Found offset {{protocol=1, table=JC.linearTraining22}={incrementing=731}, {table=linearTraining22}=null} for partition {protocol=1, table=JC.linearTraining22} (io.confluent.connect.jdbc.source.JdbcSourceTask:193)
[2020-12-05 19:11:47,036] INFO Started JDBC source task (io.confluent.connect.jdbc.source.JdbcSourceTask:261)
[2020-12-05 19:11:47,036] INFO WorkerSourceTask{id=test-source-dm-jdbc-0} Source task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
[2020-12-05 19:11:47,090] INFO Begin using SQL query: SELECT * FROM "JC"."linearTraining22" WHERE "JC"."linearTraining22"."id" > ? ORDER BY "JC"."linearTraining22"."id" ASC (io.confluent.connect.jdbc.source.TableQuerier:164)

所以说,通过Confluent官网下载的Kakfa-connect-jdbc组件可能会有一些问题,因为是最新版本更新很快,导致适配不好之类的。在实际使用的话可以考虑通过Confluet-hub组件下载kafka-connect-jdbc文件夹。

测试

虽然连接成功了,但是不确定能不能正常运行,所以接下来我通过一个生产者程序向达梦的表写入数据,通过kafka-console-consumer来消费记录。

首先,运行connetor:

[[email protected] bin]# sh connect-standalone.sh /root/apps/kafka_2.13-2.7.0/config/my_connect_standalone_dm.properties /root/apps/confluentinc-kafka-connect-jdbc/etc/source-dmjdbc-dm.properties 

自动生成SQL即运行成功:

[2020-12-05 20:56:19,905] INFO Begin using SQL query: SELECT * FROM "JC"."linearTraining22" WHERE "JC"."linearTraining22"."id" > ? ORDER BY "JC"."linearTraining22"."id" ASC (io.confluent.connect.jdbc.source.TableQuerier:164)

然后,运行消费者:

[[email protected] bin]# sh kafka-console-consumer.sh --topic test-dm-jc-linearTraining22 --bootstrap-server localhost:9092

接下来,运行生产者,没观察到什么记录,可能是这个topic偏移量乱了或者什么的,我试着新搞一个表再试。

新开了一个表,在控制台打印出来成功提交偏移量,但是kafka消费者却没能消费成功,也就是说kafka Connector是没问题的,只是消费者配错了?

[2020-12-05 21:29:19,966] INFO WorkerSourceTask{id=test-source-dm-jdbc-0} Finished commitOffsets successfully in 4 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:574)

经过几次尝试,最后还是新开了topic,最后消费成功了,效果如下:

Kafka Connect Jdbc 单组件连接达梦

接下来需要将完整的配置以及组件安装梳理一遍。

3、完整的安装步骤

经过尝试,使用Confluent官网提供的压缩包没能连接达梦数据库,不确定是不是版本更新的问题。总之最后通过Confluent-hub可以下载可用的组件。所以,整个组件的配置步骤包括:通过Confluent-hub下载connect-jdbc、配置Connector的文件、配置standalone的文件、启动测试。

3.1 通过Confluent-hub下载connect-jdbc

由于之前我在linux中安装了Confluent,而且也下载了connect-jdbc,所以我直接把整个文件夹复制就可以了。

confluentinc-kafka-connect-jdbc # Confluent-hub下载的组件名称

assets  doc  etc  lib  manifest.json # 内部的目录,Connector的配置文件在etc中

3.2 配置Connector文件

进入confluentinc-kafka-connect-jdbc/etc,配置source-dmjcbd-dm.properties文件:

name=test-source-dm-jdbc
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:dm://IP:Port/jc?user=JC&password=JC_Test123&characterEncoding=utf-8
table.whitelist=linearTraining31
mode=incrementing
incrementing.column.name=id
topic.prefix=test-

3.3 配置Standalone文件

Standalone文件在kafka/config路径下有样板,所以复制一份做修改即可,具体的配置如下:

bootstrap.servers=localhost:9092

key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schema.registry.url=http://localhost:8081

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=true
internal.value.converter.schemas.enable=true

offset.storage.file.filename=/tmp/connect.offsets
plugin.path=/root/apps/confluentinc-kafka-connect-jdbc

3.4 启动测试

启动指令如下:

$ bin/connect-standalone.sh config/connect-standalone.properties connector1.properties [connector2.properties ...]

具体的指令如下:

sh connect-standalone.sh /root/apps/kafka_2.13-2.7.0/config/my_connect_standalone_dm.properties /root/apps/confluentinc-kafka-connect-jdbc/etc/source-dmjdbc-dm.properties

在成功启动后控制台会输出SQL语句:

[2020-12-05 21:50:07,362] INFO Begin using SQL query: SELECT * FROM "JC"."linearTraining31" WHERE "JC"."linearTraining31"."id" > ? ORDER BY "JC"."linearTraining31"."id" ASC (io.confluent.connect.jdbc.source.TableQuerier:164)

最后开启消费者:

[[email protected] bin]# sh kafka-console-consumer.sh --topic test-linearTraining31 --bootstrap-server localhost:9092

可以消费到数据:

Kafka Connect Jdbc 单组件连接达梦

相关标签: kafka kafka connect