欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Debezium MySQL源连接器

程序员文章站 2024-03-11 17:28:07
...

Debezium MySQL源连接器

Debezium的MySQL连接是源连接器,可以得到现有数据的快照,记录所有的MySQL服务器/群集在数据库中的行级变化。第一次连接到MySQL服务器时,它将读取所有数据库的一致快照。该快照完成后,连接器将连续读取提交给MySQL的更改,并生成相应的插入,更新和删除事件。每个表的所有事件都记录在单独的Kafka主题中,应用程序和服务可以轻松使用它们。

  • Confluent支持MySQL连接器0.9.3版和更高版本。
  • Confluent支持将此连接器与MySQL 5.6或更高版本一起使用。

安装MySQL连接器

您可以使用Confluent Hub客户端(推荐)安装此连接器,也可以手动下载ZIP文件。

confluent-hub install debezium/debezium-connector-mysql:latest

您可以通过替换latest版本号来安装特定版本。例如:

confluent-hub install debezium/debezium-connector-mysql:0.9.4

许可证

Debezium MySQL连接器是一个开源连接器,不需要Confluent企业许可证。

在MySQL服务器上启用二进制日志

必须将MySQL服务器配置为使用行级二进制日志,有关详细信息,请参见MySQL文档。MySQL的二进制日志或二进制日志以数据库执行的相同顺序记录所有操作,包括对表模式的更改或对表中存储的数据的更改。MySQL使用其binlog进行复制和恢复。

Debezium的MySQL连接器读取MySQL的二进制日志,以了解数据更改的内容和顺序。然后,它为binlog中的每个行级插入,更新和删除操作生成一个change事件,并在单独的Kafka主题中记录每个表的所有change事件。

这通常是在MySQL服务器配置文件中完成的,看起来类似于以下片段:

server-id         = 223344
log_bin           = mysql-bin
binlog_format     = row
binlog_row_image  = full
expire_logs_days  = 10

属性解释:

  • 对于MySQL集群中的每个服务器和复制客户端,server-id的值必须唯一。设置连接器时,还为连接器分配了唯一的服务器ID。
  • log_bin的值是二进制日志文件序列的基本名称。
  • binlog_format的值必须设置为row或ROW。
  • binlog_row_image的值必须设置为full或FULL。
  • expire_log_days的值是自动二进制日志文件删除的天数。默认值为0,表示“不自动删除”,因此请确保设置适合您的环境的值。

mysql8下弃用expire_log_days,替换为:binlog_expire_logs_seconds,默认值为30天,建议配置如下:

# ----------------------------------------------
# 自定义参数
# 设置日志清理周期为7天
binlog_expire_logs_seconds=604800
# 设置默认时区
default-time-zone='+8:00'
# ----------------------------------------------

快速开始

Debezium的MySQL连接器是一个源连接器,可以在单独的Kafka主题中记录每个表的事件,应用程序和服务可以轻松地使用它们。

安装连接器

如果要使用Docker映像来设置Kafka,ZooKeeper和Kafka Connect,请参考Debezium教程。对于以下教程,您需要在本地安装Confluent Platform。

导航到您的Confluent Platform安装目录,然后运行以下命令来安装连接器:

confluent-hub install debezium/debezium-connector-mysql:0.9.4

添加新的连接器插件需要重新启动Connect。使用Confluent CLI重新启动Connect。

提示

Confluent CLI开发命令的命令语法在5.3.0中已更改。这些命令已移至。例如,的语法为now 。有关更多信息,请参见confluent localconfluent local confluent start confluent local start

confluent local stop connect && confluent local start connect
Using CONFLUENT_CURRENT: /Users/username/Sandbox/confluent-snapshots/var/confluent.NuZHxXfq
Starting zookeeper
zookeeper is [UP]
Starting kafka
kafka is [UP]
Starting schema-registry
schema-registry is [UP]
Starting kafka-rest
kafka-rest is [UP]
Starting connect
connect is [UP]

检查MySQL插件是否已正确安装并由插件加载器加载:

curl -sS localhost:8083/connector-plugins | jq .[].class | grep mysql
"io.debezium.connector.mysql.MySqlConnector"

使用Docker设置MySQL(可选)

如果没有本机安装的MySQL,则可以使用以下命令来启动一个新容器,该容器运行预先配置了清单数据库的MySQL数据库服务器:

#Run docker container
docker run -it --rm --name mysql \
-p 3306:3306 \
-e MYSQL_ROOT_PASSWORD=debezium \
-e MYSQL_USER=mysqluser \
-e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:0.9

启动一个MySQL命令行客户端。

docker run -it --rm --name mysqlterm --link mysql --rm mysql:5.7 sh -c 'exec mysql -h"$MYSQL_PORT_3306_TCP_ADDR" -P"$MYSQL_PORT_3306_TCP_PORT" -uroot -p"$MYSQL_ENV_MYSQL_ROOT_PASSWORD"'

启动Debezium SQL Server连接器

创建文件register-mysql.json以存储以下连接器配置:

{
 "name": "inventory-connector",
 "config": {
     "connector.class": "io.debezium.connector.mysql.MySqlConnector",
     "tasks.max": "1",
     "database.hostname": "mysql",
     "database.port": "3306",
     "database.user": "debezium",
     "database.password": "dbz",
     "database.server.id": "184054",
     "database.server.name": "dbserver1",
     "database.whitelist": "inventory",
     "database.history.kafka.bootstrap.servers": "localhost:9092",
     "database.history.kafka.topic": "schema-changes.inventory"
     }
 }

启动连接器。

curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-mysql.json

开始您的Kafka消费者

在新的终端会话中启动使用者。

confluent local consume dbserver1.inventory.customers -- --from-beginning

当您在MySQL bash中输入SQL查询以添加或修改数据库中的记录时,将填充消息,并在您的用户终端上显示这些消息,其中显示已添加或已修改的记录。

# Explore the sample inventory database already populated in your MySQL client running in Docker
use inventory;
SELECT * FROM customers;
# Type these queries to see change events in the consumer terminal
UPDATE customers SET first_name='Anne Marie' WHERE id=1004;
DELETE FROM customers WHERE id=1004;

清理资源

删除连接器并停止Confluent服务。

curl -X DELETE localhost:8083/connectors/inventory-connector
confluent local stop

停止SQL Server容器。

docker stop mysqlterm mysql

注意

此处提供的部分信息来自Debezium社区最初提供的文档。Debezium生产的作品已获得知识共享3.0的许可。

dbz-connecor-mysql与jdbc-connector-mysql对比

  • 1、dbz-connector 支持数据插入、更新、删除的CDC,但jdbc-connector 仅支持插入、更新;
  • 2、jdbc-connect提供了mysql的源端、池端连接器,但是dbz仅提供为源端连接器,池端连接器,需要自行实现。
  • 3、dbz-connector-mysql 与 jdbc-connect的消息格式不兼容;

原文

https://docs.confluent.io/current/connect/debezium-connect-mysql/index.html