欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

MySQL8.0.18通过Canal1.1.5数据增量同步至Elasticsearch7.x

程序员文章站 2022-06-19 15:22:41
IntroductionCanal是基于MySQL二进制日志的高性能数据同步系统 属于阿里巴巴的一个开源项目,基于java实现,整体已经在很多大型的互联网项目生产环境中使用,包括阿里、美团等都有广泛的应用,是一个非常成熟的数据库同步方案,基础的使用只需要进行简单的配置即可,以提供可靠的低延迟增量数据管道。官方文档:https://github.com/alibaba/canal/wiki项目地址:https://github.com/alibaba/canal下载地址:https://githu...

Introduction

Canal是基于MySQL二进制日志的高性能数据同步系统 属于阿里巴巴的一个开源项目,基于java实现,整体已经在很多大型的互联网项目生产环境中使用,包括阿里、美团等都有广泛的应用,是一个非常成熟的数据库同步方案,基础的使用只需要进行简单的配置即可,以提供可靠的低延迟增量数据管道。

官方文档:https://github.com/alibaba/canal/wiki
项目地址:https://github.com/alibaba/canal
下载地址:https://github.com/alibaba/canal/releases

Canal的特点

  1. 支持所有平台。
  2. 支持由Prometheus支持的细粒度系统监视。
  3. 支持通过不同方式(例如通过GTID)解析和订阅MySQL Binlog。
  4. 支持高性能,实时数据同步。(查看更多的性能)
  5. Canal Server和Canal Client均支持由Apache ZooKeeper支持的HA /可伸缩性
  6. Docker支持。

Canal实现原理

Canal将MySQL数据同步至Elasticsearch是基于MySQL的主从复制原理实现。
1.MySQL主从复制原理

MySQL8.0.18通过Canal1.1.5数据增量同步至Elasticsearch7.x

  1. 主服务器将增删改操作记录到 binlog 中,这些记录称为binlog事件,可以通过来查看show binary events
  2. 从服务器通过 I/O 线程与主服务器的binlog日志建立连接后进行读取,读取的binlog日志写入本地 Relay log中级日志。
  3. 从服务器在通过本身的 SQL 线程读取 Relay log 后在本机执行主服务器的操作记录。

2.Canal实现原理

MySQL8.0.18通过Canal1.1.5数据增量同步至Elasticsearch7.x

  1. Canal通过模拟成MySQL Slave的交互协议,表面上伪装成MySQL Slave,向MySQL Master发送dump协议。
  2. MySQL Master收到dump请求,开始推送 binlog 给 Slave即Canal。
  3. Canal解析 binlog 对象(原始为byte流)

3.C/S架构
Canal为C/S架构,分为Server端和Client端

  1. Server的包名为canal-deployer,server端部署好以后,可以直接监听mysql binlog,因为server端是把自己模拟成了mysql slave,所以,只能接受数据,没有进行任何逻辑的处理,具体的逻辑处理,需要client端进行处理。
  2. Client的包名为canal-adapter,client可以自动开发或者使用官方提供的canal-adapter,Adapter是可以将canal server端获取的数据转换成几个常用的中间件数据源,现在支持kafka、rocketmq、hbase、elasticsearch,针对这几个中间件的支持,直接配置即可,无需开发。

4.Canal Admin
从Canal1.1.4版本以后,阿里巴巴推出了Canal的WEB管理UI,版本必须要>=Canal1.1.4以上才可以部署,canal-admin设计上是为canal提供整体配置管理、节点运维等面向运维的功能,提供相对友好的WebUI操作界面,方便更多用户快速和安全的操作。

配置环境

MySQL8.0.18通过Canal1.1.5数据增量同步至Elasticsearch7.x

部署MySQL8.0.18

1.MySQL8.0.18部署
详见:MySQL8.0.18部署

2.开启binlog日志
注意
必须在/etc/my.cnf文件中开启以下选型

 
  1. [mysqld]
  2. log-bin=/usr/local/mysql/binlogs/mysql-bin #开启 binlog
  3. binlog-format=ROW #选择 ROW 模式,必须为ROW行模式
  4. server_id=1 #配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

3.创建授权

 
  1. mysql> create user canal identified by 'Canal@2019!'; #创建canal账户
  2. Query OK, 0 rows affected (0.08 sec)
  3.  
  4. mysql> grant select,replication slave,replication client on *.* to 'canal'@'%'; #授权canal账户查询和复制权限
  5. Query OK, 0 rows affected (0.02 sec)
  6.  
  7. mysql> flush privileges; #刷新授权
  8. Query OK, 0 rows affected (0.00 sec)

4.查看binlog是否正确启动

 
  1. mysql> show variables like 'binlog_format%'; #查看binlog模式
  2. +---------------+-------+
  3. | Variable_name | Value |
  4. +---------------+-------+
  5. | binlog_format | ROW |
  6. +---------------+-------+
  7. 1 row in set (0.06 sec)

5.创建需要同步的数据库

 
  1. mysql> create database canal_tsdb character set utf8;
  2. Query OK, 1 row affected, 1 warning (0.04 sec)
  3.  
  4. mysql> use canal_tsdb;
  5. Database changed
  6.  
  7. mysql> CREATE TABLE canal_table ( #创建canal_table表,字段为 id age name address
  8. -> id int(11) NOT NULL,
  9. -> age int(11) NOT NULL,
  10. -> name varchar(200) NOT NULL,
  11. -> address varchar(1000) DEFAULT NULL,
  12. -> PRIMARY KEY (id)
  13. -> ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
  14. Query OK, 0 rows affected, 2 warnings (0.30 sec)
  15.  
  16. mysql> INSERT INTO canal_tsdb.canal_table(id, age, name, address) VALUES (1, 24, '徐伟亮', '上海市浦东新区');
  17. Query OK, 1 row affected (0.02 sec)
  18.  
  19. mysql> select * from canal_table;
  20. +----+-----+-----------+-----------------------+
  21. | id | age | name | address |
  22. +----+-----+-----------+-----------------------+
  23. | 1 | 24 | 徐伟亮 | 上海市浦东新区 |
  24. +----+-----+-----------+-----------------------+
  25. 1 row in set (0.01 sec)

部署Elasticsearch

以下两种方式根据需求选择其一即可
集群部署详见:Elasticsearch7.4集群部署并进行安全认证
单点部署详见:Elasticsearch7.4单点部署并进行安全认证

部署Canal-deployer服务端

1.下载并解压

 
  1. wget https://github.com/alibaba/canal/releases/download/canal-1.1.5-alpha-1/canal.deployer-1.1.5-SNAPSHOT.tar.gz
  2. mkdir /usr/local/canal-deployer
  3. tar xf canal.deployer-1.1.5-SNAPSHOT.tar.gz -C /usr/local/canal-deployer/

2.修改配置文件

 
  1. vim /usr/local/canal-deployer/conf/example/instance.properties
  2. canal.instance.mysql.slaveId=3 #修改ID,不能和MySQL数据库一致
  3. canal.instance.gtidon=false
  4. canal.instance.master.address=192.168.31.216:8809 #指定mysql数据库地址及端口
  5. canal.instance.master.journal.name=
  6. canal.instance.master.position=
  7. canal.instance.master.timestamp=
  8. canal.instance.master.gtid=
  9. canal.instance.rds.accesskey=
  10. canal.instance.rds.secretkey=
  11. canal.instance.rds.instanceId=
  12. canal.instance.tsdb.enable=true
  13. canal.instance.dbUsername=canal #指定复制账号
  14. canal.instance.dbPassword=Canal@2019! #指定复制密码
  15. canal.instance.connectionCharset = UTF-8 #字符集格式,需要与mysql保持一致
  16. canal.instance.enableDruid=false
  17. canal.instance.filter.regex=.*\\..* #表名监控的正则
  18. canal.instance.filter.black.regex=
  19. canal.mq.topic=example
  20. canal.mq.partition=0

3.启动canal-deployer
因为canal-depaloyer由java开发,所以需要jdk环境,jdk版本需要大于1.5

 
  1. yum install java-1.8.0-openjdk.x86_64 java-1.8.0-openjdk-devel.x86_64 -y
  2. /usr/local/canal-deployer/bin/startup.sh

4.查看日志及端口
1)查看 server 日志

MySQL8.0.18通过Canal1.1.5数据增量同步至Elasticsearch7.x

canal-deployer默认监听三个端口,111101111111112
11110:为admin管理端口
11111:为canal deployer 服务器占用的端口
11112:为指标下拉端口

2)查看 instance 的日志

MySQL8.0.18通过Canal1.1.5数据增量同步至Elasticsearch7.x

部署Canal-adapter客户端

1.下载并解压

 
  1. wget https://github.com/alibaba/canal/releases/download/canal-1.1.5-alpha-1/canal.adapter-1.1.5-SNAPSHOT.tar.gz
  2. mkdir /usr/local/canal-adapter
  3. tar xf canal.adapter-1.1.5-SNAPSHOT.tar.gz -C /usr/local/canal-adapter/

2.添加mysql8.0.18连接器
默认canal-adapter的lib中只有mysql5.x版本的连接器

 
  1. wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.18/mysql-connector-java-8.0.18.jar
  2. mv mysql-connector-java-8.0.18.jar /usr/local/canal-adapter/lib/
  3. chmod 777 /usr/local/canal-adapter/lib/mysql-connector-java-8.0.18.jar #权限修改与其它lib库一致
  4. chmod +st /usr/local/canal-adapter/lib/mysql-connector-java-8.0.18.jar

3.修改application.yml

 
  1. vim /usr/local/canal-adapter/conf/application.yml
  2. server:
  3. port: 8081 #指定canal-adapter监听端口
  4. spring:
  5. jackson:
  6. date-format: yyyy-MM-dd HH:mm:ss #时间格式
  7. time-zone: GMT+8 #时区
  8. default-property-inclusion: non_null
  9. canal.conf: #canal配置
  10. canalServerHost: 192.168.31.240:11111 #指定canal-deployer服务端监听的地址及端口
  11. batchSize: 500 #每次获取数据的批大小, 单位为K
  12. syncBatchSize: 1000 #每次同步的批数量
  13. retries: 0 #试次数, -1为无限重试,0为失败不再重试
  14. timeout: #同步超时时间, 单位毫秒,如将数据同步es的时,假如由于种种原因导致同时时间超出此值则认为超时
  15. srcDataSources: #源数据库配置
  16. defaultDS: #自定义名称
  17. url: jdbc:mysql://192.168.31.216:8809/canal_tsdb?useUnicode=true #jdbc url,指定数据库的地址及端口,canal_tsdb为mysql上面的苦,useUnicode编码
  18. username: canal #指定数据库账号
  19. password: Canal@2019! #指定数据库密码
  20. canalAdapters: #适配器列表
  21. groups: #分组列表
  22. - groupId: g1 #分组id, 如果适配器为MQ模式将用到该值
  23. outerAdapters: #分组内适配器列表
  24. - name: logger #日志打印适配器
  25. - name: es7 #集群版本,支持 es6 与 es7
  26. hosts: 192.168.31.215:9201,192.168.31.215:9202,192.168.31.215:9203 #elasticsearch集群地址,逗号分隔
  27. properties:
  28. mode: rest #可指定transport模式或者rest模式,rest模式下hosts参数指定端口必须为es对外端口,transport模式下hosts参数指定端口必须为集群通信端口
  29. security.auth: elastic:26tBktGolYCyZD2pPISW #指定elasticsearch集群账号密码
  30. cluster.name: elastic_cluster #指定elasticsearch集群名称

4.修改适配器映射文件

 
  1. vim /usr/local/canal-adapter/conf/es7/mytest_user.yml
  2. dataSourceKey: defaultDS #指定在application.yml文件中srcDataSources源数据源自定义的名称
  3. destination: example #cannal的instance或者MQ的topic,我们是把数据同步至es,所以不用修改,也用不到此处
  4. groupId: g1 #对应MQ模式下的groupId, 只会同步对应groupId的数据
  5. esMapping: #es中的Mapping设置
  6. _index: canal_tsdb #指定索引名称
  7. _id: _id #指定文档id,_id 此值则由es自动分配文档ID
  8. sql: "select a.id as _id,a.age,a.name,a.address from canal_table a" #sql映射
  9. etlCondition: "where a.c_time>={}" #etl的条件参数
  10. commitBatch: 3000 #提交批大小

在配置完成canal-adapter后还不要着急启动,需要事先在es中定义我们指定的索引以及mapping

Elasticsearch创建索引

打开Kibana界面,找到左侧的“开发工具”,然后进行创建索引及指定Mapping

MySQL8.0.18通过Canal1.1.5数据增量同步至Elasticsearch7.x

语法如下:

 
  1. POST canal_tsdb/_doc
  2. {
  3. "mappings":{
  4. "_doc":{
  5. "properties":{
  6. "age":{
  7. "type":"long"
  8. },
  9. "name":{
  10. "type":"text"
  11. },
  12. "address":{
  13. "type":"text"
  14. }
  15. }
  16. }
  17. }
  18. }

查看创建的索引及Mapping

MySQL8.0.18通过Canal1.1.5数据增量同步至Elasticsearch7.x

启动Canal-adapter并写入数据

1.在Canal-adapter机器上启动进程并查看日志

 
  1. /usr/local/canal-adapter/bin/startup.sh
  2. tail -f /usr/local/canal-adapter/logs/adapter/adapter.log

MySQL8.0.18通过Canal1.1.5数据增量同步至Elasticsearch7.x

2.在MySQL再次插入一条数据并查看日志

 
  1. mysql> INSERT INTO canal_tsdb.canal_table(id, age, name, address) VALUES (2, 24, 'abcops.cn', '上海市浦东新区');
  2. Query OK, 1 row affected (0.01 sec)

3.查看Canal-deployer服务端日志

 
  1. tail -f /usr/local/canal-deployer/logs/example/meta.log
  2. 2019-12-17 15:48:47.457 - clientId:1001 cursor:[mysql-bin.000005,2125,1576568901000,2,] address[192.168.31.216/192.168.31.216:8809]

4.查看Canal-apadter客户端日志

 
  1. tail -f /usr/local/canal-adapter/logs/adapter/adapter.log
  2. 2019-12-17 15:48:47.060 [pool-2-thread-1] INFO c.a.o.canal.client.adapter.logger.LoggerAdapterExample - DML: {"data":[{"id":2,"age":24,"name":"abcops.cn","address":"上海市浦东新区"}],"database":"canal_tsdb","destination":"example","es":1576568901000,"groupId":null,"isDdl":false,"old":null,"pkNames":["id"],"sql":"","table":"canal_table","ts":1576568927060,"type":"INSERT"}
  3. 2019-12-17 15:48:47.062 [pool-2-thread-1] DEBUG c.a.o.canal.client.adapter.es.core.service.ESSyncService - DML: {"data":[{"id":2,"age":24,"name":"abcops.cn","address":"上海市浦东新区"}],"database":"canal_tsdb","destination":"example","es":1576568901000,"groupId":null,"isDdl":false,"old":null,"pkNames":["id"],"sql":"","table":"canal_table","ts":1576568927061,"type":"INSERT"}
  4. Affected indexes: canal_tsdb

5.到Kibana中查看文档

MySQL8.0.18通过Canal1.1.5数据增量同步至Elasticsearch7.x

6.在MySQL中更新数据

 
  1. mysql> update canal_table set name='goblogs.cn',address='上海市虹口区' where id=2;
  2. Query OK, 1 row affected (0.06 sec)
  3. Rows matched: 1 Changed: 1 Warnings: 0
  4.  
  5. mysql> select * from canal_table;
  6. +----+-----+------------+-----------------------+
  7. | id | age | name | address |
  8. +----+-----+------------+-----------------------+
  9. | 1 | 24 | 徐伟亮 | 上海市浦东新区 |
  10. | 2 | 24 | goblogs.cn | 上海市虹口区 |
  11. +----+-----+------------+-----------------------+
  12. 2 rows in set (0.00 sec)

然后去Kibana中进行查看文档是否被更新

7.在MySQL中删除数据

 
  1. mysql> delete from canal_table where id=2;
  2. Query OK, 1 row affected (0.02 sec)
  3.  
  4. mysql> select * from canal_table;
  5. +----+-----+-----------+-----------------------+
  6. | id | age | name | address |
  7. +----+-----+-----------+-----------------------+
  8. | 1 | 24 | 徐伟亮 | 上海市浦东新区 |
  9. +----+-----+-----------+-----------------------+
  10. 1 row in set (0.00 sec)

在Kibana中查看文档是否被删除

注:当数据同步至Elasticsearch后,Elasticsearch就有了对同步来数据的CURD权限

部署Canal Admin

canal-admin的限定依赖:
1.MySQL,用于存储配置和节点等相关数据
2.canal版本,要求>=1.1.4 (需要依赖canal-server提供面向admin的动态运维管理接口)

1.Canal Admin下载并解压

 
  1. wget https://github.com/alibaba/canal/releases/download/canal-1.1.5-alpha-1/canal.admin-1.1.5-SNAPSHOT.tar.gz
  2. mkdir /usr/local/canal-admin
  3. tar xf canal.admin-1.1.5-SNAPSHOT.tar.gz -C /usr/local/canal-admin

2.修改配置文件

 
  1. vim /usr/local/canal-admin/conf/application.yml
  2. server:
  3. port: 8089 #Canal Admin监听端口
  4. spring:
  5. jackson:
  6. date-format: yyyy-MM-dd HH:mm:ss #时间格式
  7. time-zone: GMT+8 #时区
  8.  
  9. spring.datasource: #数据库信息
  10. address: 192.168.31.216:8809 #指定Canal Admin所使用的数据库地址及端口
  11. database: canal_manager #指定数据库名称
  12. username: cadmin #指定数据库账户
  13. password: Cadmin@2019! #指定数据库密码
  14. driver-class-name: com.mysql.jdbc.Driver #指定数据库驱动
  15. url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false
  16. hikari:
  17. maximum-pool-size: 30
  18. minimum-idle: 1
  19.  
  20. canal: #Canal UI界面默认账号密码
  21. adminUser: admin
  22. adminPasswd: admin

3.创建数据库及授权用户
在数据库节点上创建canal_manager数据库,并授权于cadmin用户此库的所有权限

 
  1. mysql> create database canal_manager character set utf8;
  2. Query OK, 1 row affected, 1 warning (0.01 sec)
  3.  
  4. mysql> create database canal_manager character set utf8;
  5. Query OK, 1 row affected, 1 warning (0.01 sec)
  6.  
  7. mysql> create user cadmin identified by 'Cadmin@2019!';
  8. Query OK, 0 rows affected (0.21 sec)
  9.  
  10. mysql> grant all on canal_manager.* to 'cadmin'@'%';
  11. Query OK, 0 rows affected (0.03 sec)
  12.  
  13. mysql> flush privileges;
  14. Query OK, 0 rows affected (0.01 sec)

4.导入SQL至MySQL数据库

 
  1. mysql -u cadmin -h 192.168.31.216 -P 8809 -p
  2. mysql> use canal_manager;
  3. Database changed
  4. mysql> source /usr/local/canal-admin/conf/canal_manager.sql;
  5. mysql> show tables;
  6. +-------------------------+
  7. | Tables_in_canal_manager |
  8. +-------------------------+
  9. | canal_adapter_config |
  10. | canal_cluster |
  11. | canal_config |
  12. | canal_instance_config |
  13. | canal_node_server |
  14. | canal_user |
  15. +-------------------------+
  16. 6 rows in set (0.03 sec)

5.启动Canal Admin

 
  1. /usr/local/canal-admin/bin/startup.sh

查看日志及监听端口

MySQL8.0.18通过Canal1.1.5数据增量同步至Elasticsearch7.x

6.访问Canal Admin UI界面
访问链接:http://192.168.31.240:8089
默认账号密码为:admin/123456

MySQL8.0.18通过Canal1.1.5数据增量同步至Elasticsearch7.x

MySQL8.0.18通过Canal1.1.5数据增量同步至Elasticsearch7.x

Canal实现多张表同步至Elasticsearch

需求:现在需要在MySQL中新建 canal_tab01canal_tab02canal_03,这三张表,并将这三张表的数据同步至Elasticsearch

1.在Canal-adapter中创建多个配器映射文件
Canal-adapter客户端操作

 
  1. touch /usr/local/canal-adapter/conf/es7/{canal_tab01.yml,canal_tab02.yml,canal_tab03.yml}
  2. chmod 777 /usr/local/canal-adapter/conf/es7/canal_tab0*

2.在MySQL数据库中创建对应的表

 
  1. CREATE TABLE canal_tab01 (
  2. id int(11) NOT NULL,
  3. age int(11) NOT NULL,
  4. name varchar(200) NOT NULL,
  5. address varchar(1000) DEFAULT NULL,
  6. PRIMARY KEY (id)
  7. ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

MySQL8.0.18通过Canal1.1.5数据增量同步至Elasticsearch7.x

3.在canal_tab01-03中插入相应的字段

 
  1. mysql> INSERT INTO canal_tsdb.canal_tab01(id, age, name, address) VALUES (1, 20, 'abcops.cn', '上海市静安区');
  2. Query OK, 1 row affected (0.01 sec)
  3.  
  4. mysql> INSERT INTO canal_tsdb.canal_tab02(id, age, name, address) VALUES (1, 21, 'k8sops.cn', '上海市黄浦区');
  5. Query OK, 1 row affected (0.01 sec)
  6.  
  7. mysql> INSERT INTO canal_tsdb.canal_tab03(id, age, name, address) VALUES (1, 22, 'elk.abcops.cn', '上海市长宁区');
  8. Query OK, 1 row affected (0.01 sec)

4.写入适配器文件
以下只列出 canal_tab01 库的配置,其它库只需要修改 _index索引值,以及 canal_tab01查询表名称即可

 
  1. cat /usr/local/canal-adapter/conf/es7/canal_tab01.yml
  2. dataSourceKey: defaultDS
  3. destination: example
  4. groupId: g1
  5. esMapping:
  6. _index: canal_tab01 #其它两个配置文件只需要把canal_tab01索引换成canal_tab02和canal_tab03即可
  7. _id: _id
  8. sql: "select a.id as _id,a.age,a.name,a.address from canal_tab01 a" #其它两个配置文件需要把查询的表名称换为对应的即可
  9. etlCondition: "where a.c_time>={}"
  10. commitBatch: 3000

5.在Kibana中创建索引及Mapping

 
  1. POST canal_tab01/_doc
  2. {
  3. "mappings":{
  4. "_doc":{
  5. "properties":{
  6. "age":{
  7. "type":"long"
  8. },
  9. "name":{
  10. "type":"text"
  11. },
  12. "address":{
  13. "type":"text"
  14. }
  15. }
  16. }
  17. }
  18. }
  19.  
  20. POST canal_tab02/_doc
  21. {
  22. "mappings":{
  23. "_doc":{
  24. "properties":{
  25. "age":{
  26. "type":"long"
  27. },
  28. "name":{
  29. "type":"text"
  30. },
  31. "address":{
  32. "type":"text"
  33. }
  34. }
  35. }
  36. }
  37. }
  38.  
  39. POST canal_tab03/_doc
  40. {
  41. "mappings":{
  42. "_doc":{
  43. "properties":{
  44. "age":{
  45. "type":"long"
  46. },
  47. "name":{
  48. "type":"text"
  49. },
  50. "address":{
  51. "type":"text"
  52. }
  53. }
  54. }
  55. }
  56. }

6.重启Canal-adapter

 
  1. /usr/local/canal-adapter/bin/stop.sh
  2. /usr/local/canal-adapter/bin/startup.sh

7.自行查询Canal-adapter日志是否存在错误信息

8.由于Canal最大的缺点就是无法进行对数据的全量同步,所以我们刚在表里插入的字段也不会同步到ES中

MySQL8.0.18通过Canal1.1.5数据增量同步至Elasticsearch7.x

9. 插入新的数据,测试是否同步至Elasticsearch
以下只修改了一下ID后进行插入

 
  1. mysql> INSERT INTO canal_tsdb.canal_tab01(id, age, name, address) VALUES (2, 20, 'abcops.cn', '上海市静安区');
  2. Query OK, 1 row affected (0.23 sec)
  3.  
  4. mysql> INSERT INTO canal_tsdb.canal_tab02(id, age, name, address) VALUES (2, 21, 'k8sops.cn', '上海市黄浦区');
  5. Query OK, 1 row affected (0.01 sec)
  6.  
  7. mysql> INSERT INTO canal_tsdb.canal_tab03(id, age, name, address) VALUES (2, 22, 'elk.abcops.cn', '上海市长宁区');
  8. Query OK, 1 row affected (0.00 sec)

10.查看Elasticsearch数据是否同步成功

MySQL8.0.18通过Canal1.1.5数据增量同步至Elasticsearch7.x

MySQL通过Canal实现多库数据同步至Elasticsearch

1.修改Canal-adapter配置

 
  1. vim /usr/local/canal-adapter/conf/application.yml
  2. srcDataSources:
  3. defaultDS:
  4. url: jdbc:mysql://192.168.31.216:8809/canal_tsdb?useUnicode=true
  5. username: canal
  6. password: Canal@2019!
  7. crmdb: #自定义名称,我这里拿做库名来做区分
  8. url: jdbc:mysql://192.168.31.216:8809/crmdb?useUnicode=true #指定JDBC URI
  9. username: canal #指定复制账号
  10. password: Canal@2019! #复制密码

增加下图部分

MySQL8.0.18通过Canal1.1.5数据增量同步至Elasticsearch7.x

2.创建适配器文件

 
  1. vim /usr/local/canal-adapter/conf/es7/crmdb_tab01.yml #crmdb库中的第一张表,按照表名称创建文件
  2. dataSourceKey: crmdb #数据源key需要与上面修改的配置自定义名称一致
  3. destination: example
  4. groupId: g1
  5. esMapping:
  6. _index: crmdb_tab01 #一张表一个索引,索引名称按照表名来命名
  7. _id: _id
  8. sql: "select a.id as _id,a.age,a.name,a.address from crmdb_tab01 a" #表名映射sql
  9. etlCondition: "where a.c_time>={}"
  10. commitBatch: 3000

3.创建库和表
创建crmdb库这里略过,创建crmdb_tab01表也略过

4.在Kibana中创建索引及Mapping
创建索引要与适配器文件中的索引名称一致,字段类型要与MySQL中符合,如果忘记请看上面文章,自行创建

5.重启Canal-adapter

 
  1. /usr/local/canal-adapter/bin/stop.sh
  2. /usr/local/canal-adapter/bin/startup.sh

6.在crmdb库中的crmdb_tab01表中自行插入数据
过程略。。。

7.查看canal-adapter日志
过程略

8.在Kibana中查看索引及同步来的文档

MySQL8.0.18通过Canal1.1.5数据增量同步至Elasticsearch7.x

总结

缺点:

  1. 不支持全量同步
  2. 必须先在ES创建好对应索引的Mapping

本文地址:https://blog.csdn.net/kongliand/article/details/110970562

相关标签: 运维