MySQL特定表全量、增量数据同步到消息队列-解决方案
程序员文章站
2022-03-29 08:19:42
目录1、原始需求2、解决方案3、canal介绍、安装canal的工作原理架构安装4、验证1、原始需求既要同步原始全量数据,也要实时同步mysql特定库的特定表增量数据,同时对应的修改、删除也要对应。数...
1、原始需求
既要同步原始全量数据,也要实时同步mysql特定库的特定表增量数据,同时对应的修改、删除也要对应。
数据同步不能有侵入性:不能更改业务程序,并且不能对业务侧有太大性能压力。
应用场景:数据etl同步、降低业务服务器压力。
2、解决方案
3、canal介绍、安装
canal是阿里巴巴旗下的一款开源项目,纯java开发。基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了mysql(也支持mariadb)。
工作原理:mysql主备复制实现
从上层来看,复制分成三步:
- master将改变记录到二进制日志(binary log)中(这些记录叫做二进制日志事件,binary log events,可以通过show binlog events进行查看);
- slave将master的binary log events拷贝到它的中继日志(relay log);
- slave重做中继日志中的事件,将改变反映它自己的数据。
canal的工作原理
原理相对比较简单:
- canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议
- mysql master收到dump请求,开始推送binary log给slave(也就是canal)
- canal解析binary log对象(原始为byte流)
架构
说明:
- server代表一个canal运行实例,对应于一个jvm
- instance对应于一个数据队列 (1个server对应1..n个instance)
instance模块:
- eventparser (数据源接入,模拟slave协议和master进行交互,协议解析)
- eventsink (parser和store链接器,进行数据过滤,加工,分发的工作)
- eventstore (数据存储)
- metamanager (增量订阅&消费信息管理器)
安装
1、mysql、kafka环境准备
2、canal下载:wget https://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.deployer-1.1.3.tar.gz
3、解压:tar -zxvf canal.deployer-1.1.3.tar.gz
4、对目录conf里文件参数配置
对canal.properties配置:
进入conf/example里,对instance.properties配置:
5、启动:bin/startup.sh
6、日志查看:
4、验证
1、开发对应的kafka消费者
package org.kafka; import java.util.arrays; import java.util.properties; import org.apache.kafka.clients.consumer.consumerrecord; import org.apache.kafka.clients.consumer.consumerrecords; import org.apache.kafka.clients.consumer.kafkaconsumer; import org.apache.kafka.common.serialization.stringdeserializer; /** * * title: kafkaconsumertest * description: * kafka消费者 demo * version:1.0.0 * @author pancm * @date 2018年1月26日 */ public class kafkaconsumertest implements runnable { private final kafkaconsumer<string, string> consumer; private consumerrecords<string, string> msglist; private final string topic; private static final string groupid = "groupa"; public kafkaconsumertest(string topicname) { properties props = new properties(); props.put("bootstrap.servers", "192.168.7.193:9092"); props.put("group.id", groupid); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("auto.offset.reset", "latest"); props.put("key.deserializer", stringdeserializer.class.getname()); props.put("value.deserializer", stringdeserializer.class.getname()); this.consumer = new kafkaconsumer<string, string>(props); this.topic = topicname; this.consumer.subscribe(arrays.aslist(topic)); } @override public void run() { int messageno = 1; system.out.println("---------开始消费---------"); try { for (; ; ) { msglist = consumer.poll(1000); if (null != msglist && msglist.count() > 0) { for (consumerrecord<string, string> record : msglist) { //消费100条就打印 ,但打印的数据不一定是这个规律的 system.out.println(messageno + "=======receive: key = " + record.key() + ", value = " + record.value() + " offset===" + record.offset()); // string v = decodeunicode(record.value()); // system.out.println(v); //当消费了1000条就退出 if (messageno % 1000 == 0) { break; } messageno++; } } else { thread.sleep(11); } } } catch (interruptedexception e) { e.printstacktrace(); } finally { consumer.close(); } } public static void main(string args[]) { kafkaconsumertest test1 = new kafkaconsumertest("sample-data"); thread thread1 = new thread(test1); thread1.start(); } /* * 中文转unicode编码 */ public static string gbencoding(final string gbstring) { char[] utfbytes = gbstring.tochararray(); string unicodebytes = ""; for (int i = 0; i < utfbytes.length; i++) { string hexb = integer.tohexstring(utfbytes[i]); if (hexb.length() <= 2) { hexb = "00" + hexb; } unicodebytes = unicodebytes + "\\u" + hexb; } return unicodebytes; } /* * unicode编码转中文 */ public static string decodeunicode(final string datastr) { int start = 0; int end = 0; final stringbuffer buffer = new stringbuffer(); while (start > -1) { end = datastr.indexof("\\u", start + 2); string charstr = ""; if (end == -1) { charstr = datastr.substring(start + 2, datastr.length()); } else { charstr = datastr.substring(start + 2, end); } char letter = (char) integer.parseint(charstr, 16); // 16进制parse整形字符串。 buffer.append(new character(letter).tostring()); start = end; } return buffer.tostring(); } }
2、对表bak1进行增加数据
create table `bak1` ( `vin` varchar(20) not null, `p1` double default null, `p2` double default null, `p3` double default null, `p4` double default null, `p5` double default null, `p6` double default null, `p7` double default null, `p8` double default null, `p9` double default null, `p0` double default null ) engine=innodb default charset=utf8mb4 show create table bak1; insert into bak1 select '李雷abcv', `p1` , `p2` , `p3` , `p4` , `p5` , `p6` , `p7` , `p8` , `p9` , `p0` from moci limit 10
3、查看输出结果:
到此这篇关于mysql特定表全量、增量数据同步到消息队列-解决方案的文章就介绍到这了,更多相关mysql特定表数据同步内容请搜索以前的文章或继续浏览下面的相关文章希望大家以后多多支持!