Kafka Connect Jdbc 单组件连接达梦
Kafka Connect Jdbc 单组件连接达梦
现在要尝试将kafka-connect-jdbc组件单独抽出来用,配置这个东西还是出了一点状况,下面我将完整的搭建以及出现的问题都记录下来。
1、通过Confluent提供地址下载最新JDBC Connector(失败)
首先,通过Confluent官方提供的JDBC Connector地址下载Connector。由于提供的下载格式的zip,所以需要在本地解压后将文件夹上传至服务器(通过Confluent-hub组件下载,可以直接下载文件夹)。
进入文件夹,内容如下:
[[email protected] confluentinc-kafka-connect-jdbc-10.0.2]# ls
assets doc etc lib manifest.json
与Confluent配置相同,开发者需要将DM的jar包放在lib中,并且在配置文件中指出依赖包位置。
[[email protected] lib]# ls
common-utils-6.0.0.jar mssql-jdbc-8.4.1.jre8.jar
oraclepki-19.7.0.0.jar postgresql-42.2.10.jar
ucp-19.7.0.0.jar DmJdbcDriver18.jar
ojdbc8-19.7.0.0.jar xmlparserv2-19.7.0.0.jar
orai18n-19.7.0.0.jar simplefan-19.7.0.0.jar
xdb-19.7.0.0.jar sqlite-jdbc-3.25.2.jar
jtds-1.3.1.jar ojdbc8-production-19.7.0.0.pom
osdt_cert-19.7.0.0.jar slf4j-api-1.7.30.jar
kafka-connect-jdbc-10.0.2.jar ons-19.7.0.0.jar
osdt_core-19.7.0.0.jar
接下来需要配置的文件包括两个,一个用于配置Connector相关的参数(source-quickstart-sqlite.properties),一个用于配置standalone模式相关的参数(connect-standalone.properties)。
source-quickstart-sqlite.properties配置
在kafka-connect-jdbc/etc下有source-quickstart-sqlite.properties文件,这就是Connector相关的配置文件,所以需要复制重新配置:
[[email protected] etc]# cp source-quickstart-sqlite.properties source-dm.properties
然后打开文件配置参数:
#
# Copyright 2018 Confluent Inc.
#
# Licensed under the Confluent Community License (the "License"); you may not use
# this file except in compliance with the License. You may obtain a copy of the
# License at
#
# http://www.confluent.io/confluent-community-license
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
#
# A simple example that copies all tables from a SQLite database. The first few settings are
# required for all connectors: a name, the connector class to run, and the maximum number of
# tasks to create:
name=source-jdbc-dm
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
# The remaining configs are specific to the JDBC source connector. In this example, we connect to a
# SQLite database stored in the file test.db, use and auto-incrementing column called 'id' to
# detect new rows as they are added, and output to topics prefixed with 'test-sqlite-jdbc-', e.g.
# a table called 'users' will be written to the topic 'test-sqlite-jdbc-users'.
connection.url=jdbc:dm://IP:Port/jc?user=JC&password=JC_Test123&characterEncoding=utf-8
mode=incrementing
incrementing.column.name=id
topic.prefix=test-dm-jdbc-
# Define when identifiers should be quoted in DDL and DML statements.
# The default is 'always' to maintain backward compatibility with prior versions.
# Set this to 'never' to avoid quoting fully-qualified or simple table and column names.
#quote.sql.identifiers=always
connect-standalone.properties配置
接下来配置standalone模式的文件:
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
启动
配置完了之后尝试启动,但是报错,无法连接到数据库:
[2020-12-05 18:02:24,551] INFO Unable to connect to database on attempt 2/3. Will retry in 10000 ms. (io.confluent.connect.jdbc.util.CachedConnectionProvider:89)
java.sql.SQLException: No suitable driver found for jdbc:dm://IP:Port/jc?user=JC&password=JC_Test123&characterEncoding=utf-8
错误说是没找到对应的driver。
[2020-12-05 18:02:07,789] INFO Loading plugin from: /root/apps/confluentinc-kafka-connect-jdbc-10.0.2/lib/DmJdbcDriver18.jar (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:246)
查看启动时加载jar包的记录得到上面这条,这里DMJdbcDriver18.jar明明就加载进来了。
上网找类似的问题,其中博客 No Suitable Driver Found For Jdbc_我的解决方法 中提到:
今天出现编码出现了No suitable driver found for jdbc,又是找遍了网上的资料,基本上都说是三个问题:
一是:连接URL格式出现了问题(Connection conn=DriverManager.getConnection(“jdbc:mysql://localhost:3306/XX”,“root”,“XXXX”)
二是:驱动字符串出错(com.mysql.jdbc.Driver)
三是Classpath中没有加入合适的mysql_jdbc驱动
经过我的仔细检查,这三种错误我都没有犯,为什么呢?
尝试着将mysql-connector-java-3.1.14-bin.jar的jar包加入C:\Program Files\Java\jre1.6.0_02\lib\ext文件夹下,问题解决了!!
原来是不仅仅要求将驱动加入classpath中,而且需要将该jar包加入到java运行环境的外部jar包中
同样的,URL是从Confluent的配置文件中粘贴来的,应该没问题。驱动字符串出错,不理解是什么意思。没有加入jdbc驱动,实际上我添加了,并且检查出来了,应该也不是这个问题。博主的解决方案是将jar包添加到java运行环境。
2、解决no suitable driver问题连接成功并测试
经过多次尝试和配置还是没法解决,我下面有两个思路:
- 先试着连接mysql,从而了解是jar包问题还是配置文件问题(若成功连接mysql,那应该配置文件没问题)。
- 先使用Confluent连接达梦,这样也可以确定是配置文件问题还是达梦的问题。
我先confluent连接达梦把,在spark-02机中有配置好的,直接启动。经过测试,confluent的配置是没问题的,可以连接到达梦。
[2021-01-16 04:30:36,013] INFO Begin using SQL query: SELECT * FROM "JC"."linearTraining22" WHERE "JC"."linearTraining22"."id" > ? ORDER BY "JC"."linearTraining22"."id" ASC (io.confluent.connect.jdbc.source.TableQuerier:164)
所以可以排除配置文件的问题,也不是Confluent当时下载组件的问题,jar包也是一样的,也可以排除jar包的问题。但是不确定是不是kafka-connect-jdbc组件更新导致的问题,所以接下来我觉得可以把confluent中组件拷贝出来测试。
我将当时通过confluent-hub下载的kafka-connect-jdbc组件给拷贝出来了,然后丢到spark-04号机,完全没有修改任何配置文件,确实可以连接达梦。
[2020-12-05 19:11:46,369] INFO Using JDBC dialect Generic (io.confluent.connect.jdbc.source.JdbcSourceTask:102)
[2020-12-05 19:11:46,647] INFO Attempting to open connection #1 to Generic (io.confluent.connect.jdbc.util.CachedConnectionProvider:82)
[2020-12-05 19:11:47,034] INFO Found offset {{protocol=1, table=JC.linearTraining22}={incrementing=731}, {table=linearTraining22}=null} for partition {protocol=1, table=JC.linearTraining22} (io.confluent.connect.jdbc.source.JdbcSourceTask:193)
[2020-12-05 19:11:47,036] INFO Started JDBC source task (io.confluent.connect.jdbc.source.JdbcSourceTask:261)
[2020-12-05 19:11:47,036] INFO WorkerSourceTask{id=test-source-dm-jdbc-0} Source task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
[2020-12-05 19:11:47,090] INFO Begin using SQL query: SELECT * FROM "JC"."linearTraining22" WHERE "JC"."linearTraining22"."id" > ? ORDER BY "JC"."linearTraining22"."id" ASC (io.confluent.connect.jdbc.source.TableQuerier:164)
所以说,通过Confluent官网下载的Kakfa-connect-jdbc组件可能会有一些问题,因为是最新版本更新很快,导致适配不好之类的。在实际使用的话可以考虑通过Confluet-hub组件下载kafka-connect-jdbc文件夹。
测试
虽然连接成功了,但是不确定能不能正常运行,所以接下来我通过一个生产者程序向达梦的表写入数据,通过kafka-console-consumer来消费记录。
首先,运行connetor:
[[email protected] bin]# sh connect-standalone.sh /root/apps/kafka_2.13-2.7.0/config/my_connect_standalone_dm.properties /root/apps/confluentinc-kafka-connect-jdbc/etc/source-dmjdbc-dm.properties
自动生成SQL即运行成功:
[2020-12-05 20:56:19,905] INFO Begin using SQL query: SELECT * FROM "JC"."linearTraining22" WHERE "JC"."linearTraining22"."id" > ? ORDER BY "JC"."linearTraining22"."id" ASC (io.confluent.connect.jdbc.source.TableQuerier:164)
然后,运行消费者:
[[email protected] bin]# sh kafka-console-consumer.sh --topic test-dm-jc-linearTraining22 --bootstrap-server localhost:9092
接下来,运行生产者,没观察到什么记录,可能是这个topic偏移量乱了或者什么的,我试着新搞一个表再试。
新开了一个表,在控制台打印出来成功提交偏移量,但是kafka消费者却没能消费成功,也就是说kafka Connector是没问题的,只是消费者配错了?
[2020-12-05 21:29:19,966] INFO WorkerSourceTask{id=test-source-dm-jdbc-0} Finished commitOffsets successfully in 4 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:574)
经过几次尝试,最后还是新开了topic,最后消费成功了,效果如下:
接下来需要将完整的配置以及组件安装梳理一遍。
3、完整的安装步骤
经过尝试,使用Confluent官网提供的压缩包没能连接达梦数据库,不确定是不是版本更新的问题。总之最后通过Confluent-hub可以下载可用的组件。所以,整个组件的配置步骤包括:通过Confluent-hub下载connect-jdbc、配置Connector的文件、配置standalone的文件、启动测试。
3.1 通过Confluent-hub下载connect-jdbc
由于之前我在linux中安装了Confluent,而且也下载了connect-jdbc,所以我直接把整个文件夹复制就可以了。
confluentinc-kafka-connect-jdbc # Confluent-hub下载的组件名称
assets doc etc lib manifest.json # 内部的目录,Connector的配置文件在etc中
3.2 配置Connector文件
进入confluentinc-kafka-connect-jdbc/etc,配置source-dmjcbd-dm.properties文件:
name=test-source-dm-jdbc
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:dm://IP:Port/jc?user=JC&password=JC_Test123&characterEncoding=utf-8
table.whitelist=linearTraining31
mode=incrementing
incrementing.column.name=id
topic.prefix=test-
3.3 配置Standalone文件
Standalone文件在kafka/config路径下有样板,所以复制一份做修改即可,具体的配置如下:
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schema.registry.url=http://localhost:8081
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=true
internal.value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
plugin.path=/root/apps/confluentinc-kafka-connect-jdbc
3.4 启动测试
启动指令如下:
$ bin/connect-standalone.sh config/connect-standalone.properties connector1.properties [connector2.properties ...]
具体的指令如下:
sh connect-standalone.sh /root/apps/kafka_2.13-2.7.0/config/my_connect_standalone_dm.properties /root/apps/confluentinc-kafka-connect-jdbc/etc/source-dmjdbc-dm.properties
在成功启动后控制台会输出SQL语句:
[2020-12-05 21:50:07,362] INFO Begin using SQL query: SELECT * FROM "JC"."linearTraining31" WHERE "JC"."linearTraining31"."id" > ? ORDER BY "JC"."linearTraining31"."id" ASC (io.confluent.connect.jdbc.source.TableQuerier:164)
最后开启消费者:
[[email protected] bin]# sh kafka-console-consumer.sh --topic test-linearTraining31 --bootstrap-server localhost:9092
可以消费到数据:
下一篇: Qt槽函数创建