使用canal将mysql数据同步到Redis的操作方法
写这篇博文时,自己一定是含着误删的眼泪写完的,文中的后续部分会谈到这个“从删库到**”的悲惨故事,这个故事深刻地教训了我,我也想以此来警示大家,注意数据安全和数据备份。
1. 可行方案
回归正题:我们的标题为《使用canal将mysql数据同步到redis的操作方法》,那就先来说说我们的目的:mysql数据同步到redis,想达到读写分离,redis只做缓存,mysql做持久化。刚开始想这样干的时候就去网上收集资料,发现了n多做法:
先从redis读取数据,如果没有查询到;便从mysql查询数据,将查询到的内容放到redis中。对于写操作,先对mysql进行写,写成功对redis进行写。当然这是一种相对直观而且简单的方法,但是看起来有许多操作需要我们自己去做。
使用mysql的udf去做,大体的思想是通过数据库中的trigger调用自定义的函数库来触发对redis的相应操作,比较麻烦的一点是:自定义的函数库需要我们基于mysql的api进行开发(c++),想想自己的java程序要去调用这么一堆玩意,本人很不情愿。据了解,该方法也是阿里早起的解决方案,具体的步骤可参照:《【菜鸟玩linux开发】通过mysql自动同步刷新redis》
通过gearman去同步,但是通过了解发现,它一般使用在php的开发中。
接下来的两种方案都属于对mysql中的binlog进行解析的方法了。
使用open-replicator解析binlog,https://github.com/whitesock/open-replicator.
使用canal进行同步,当然是能够解放双手的工具。通过大量的资料收集和调查,我使用了canal进行了mysql数据同步到redis。先简单谈谈canal:
canal主要是基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务,核心基本就是模拟mysql中slave节点请求。具体的原理在这里不进行介绍,可以移步《阿里巴巴开源项目: canal 基于mybinlog的增量订阅&消费》 进行学习。
2. mysql的配置
开启mysql的binlog模块切换到mysql的安装路径,找到my.cnf(linux)/my.ini (windows),加入如下内容:
[mysqld] log-bin=mysql-bin #添加这一行就ok binlog-format=row #选择row模式 server_id=1 #配置mysql replaction需要定义,不能和canal的slaveid重复
配置完成后,需要重启数据库。当重启数据库遇到问题时,耐心解决,但需要警告的是,千万别动data文件夹下的文件。当然如果你觉得你比较有“资本”,同时遇到了“mysql 1067 无法启动”的错误,你可以试着备份一下data文件夹下的内容,删除logfile文件,重启数据库即可,但本人极不推荐这样进行操作。就是由于本人之前的无知,根据一个无良博客,误删了ibdata1文件,使得本人造成了很大的损失,mysql下的所有数据库瞬间毁灭。
配置mysql数据库创建canal用户,用来管理canal的访问权限。我们可以通过对canal用户访问权限的控制,进而控制canal能够获取的内容。
create user canal identified by 'canal'; grant select, replication slave, replication client on 数据库名.表名 to 'canal'@'%'; -- grant all privileges on 数据库名.表名 to 'canal'@'%' ; flush privileges;
3. canal配置与部署
下载部署包下载,解压,我使用的是最新版本1.0.22
配置canal主要配置的文件有两处,canal/conf/example/instance.properties 和 canal/conf/canal.properties . 而canal.properties 文件我们一般保持默认配置,所以我们仅对instance.properties 进行修改。如果需要对canal进行复杂的配置,可以参考《canal adminguide》。
## mysql serverid canal.instance.mysql.slaveid = 1234 # position info canal.instance.master.address = ***.***.***.***:3306 #改成自己的数据库地址 canal.instance.master.journal.name = canal.instance.master.position = canal.instance.master.timestamp = #canal.instance.standby.address = #canal.instance.standby.journal.name = #canal.instance.standby.position = #canal.instance.standby.timestamp = # username/password canal.instance.dbusername = canal #改成自己的数据库信息 canal.instance.dbpassword = canal #改成自己的数据库信息 canal.instance.defaultdatabasename = #改成自己的数据库信息 canal.instance.connectioncharset = utf-8 #改成自己的数据库信息 # table regex canal.instance.filter.regex = .*\\..* # table black regex canal.instance.filter.black.regex =启动canal
./canal/startup.sh查看启动状态
我们可以通过查看logs/canal/canal.log 和logs/example/example.log日志来判断canal是否启动成功。
canal/logs/canal/canal.log
2016-12-29 14:03:00.956 [main] info com.alibaba.otter.canal.deployer.canallauncher - ## start the canal server. 2016-12-29 14:03:01.071 [main] info com.alibaba.otter.canal.deployer.canalcontroller - ## start the canal server[192.168.1.99:11111] 2016-12-29 14:03:01.628 [main] info com.alibaba.otter.canal.deployer.canallauncher - ## the canal server is running now ......
canal/logs/example/example.log
2016-12-29 14:03:01.357 [main] info c.a.o.c.i.spring.support.propertyplaceholderconfigurer - loading properties file from class path resource [canal.properties] 2016-12-29 14:03:01.362 [main] info c.a.o.c.i.spring.support.propertyplaceholderconfigurer - loading properties file from class path resource [example/instance.properties] 2016-12-29 14:03:01.535 [main] info c.a.otter.canal.instance.spring.canalinstancewithspring - start cannalinstance for 1-example 2016-12-29 14:03:01.555 [main] info c.a.otter.canal.instance.core.abstractcanalinstance - start successful....
4. java连接canal执行同步操作
在maven项目中中加载canal和redis依赖包.
redis.clients jedis 2.4.2 com.alibaba.otter canal.client 1.0.22
建立canal客户端,从canal中获取数据,并将数据更新至redis.
import java.net.inetsocketaddress; import java.util.list; import com.alibaba.fastjson.jsonobject; import com.alibaba.otter.canal.client.canalconnector; import com.alibaba.otter.canal.common.utils.addressutils; import com.alibaba.otter.canal.protocol.message; import com.alibaba.otter.canal.protocol.canalentry.column; import com.alibaba.otter.canal.protocol.canalentry.entry; import com.alibaba.otter.canal.protocol.canalentry.entrytype; import com.alibaba.otter.canal.protocol.canalentry.eventtype; import com.alibaba.otter.canal.protocol.canalentry.rowchange; import com.alibaba.otter.canal.protocol.canalentry.rowdata; import com.alibaba.otter.canal.client.*; public class canalclient{ public static void main(string args[]) { canalconnector connector = canalconnectors.newsingleconnector(new inetsocketaddress(addressutils.gethostip(), 11111), "example", "", ""); int batchsize = 1000; try { connector.connect(); connector.subscribe(".*\\..*"); connector.rollback(); while (true) { message message = connector.getwithoutack(batchsize); // 获取指定数量的数据 long batchid = message.getid(); int size = message.getentries().size(); if (batchid == -1 || size == 0) { try { thread.sleep(1000); } catch (interruptedexception e) { e.printstacktrace(); } } else { printentry(message.getentries()); } connector.ack(batchid); // 提交确认 // connector.rollback(batchid); // 处理失败, 回滚数据 } } finally { connector.disconnect(); } } private static void printentry( list entrys) { for (entry entry : entrys) { if (entry.getentrytype() == entrytype.transactionbegin || entry.getentrytype() == entrytype.transactionend) { continue; } rowchange rowchage = null; try { rowchage = rowchange.parsefrom(entry.getstorevalue()); } catch (exception e) { throw new runtimeexception("error ## parser of eromanga-event has an error , data:" + entry.tostring(), e); } eventtype eventtype = rowchage.geteventtype(); system.out.println(string.format("================> binlog[%s:%s] , name[%s,%s] , eventtype : %s", entry.getheader().getlogfilename(), entry.getheader().getlogfileoffset(), entry.getheader().getschemaname(), entry.getheader().gettablename(), eventtype)); for (rowdata rowdata : rowchage.getrowdataslist()) { if (eventtype == eventtype.delete) { redisdelete(rowdata.getbeforecolumnslist()); } else if (eventtype == eventtype.insert) { redisinsert(rowdata.getaftercolumnslist()); } else { system.out.println("-------> before"); printcolumn(rowdata.getbeforecolumnslist()); system.out.println("-------> after"); redisupdate(rowdata.getaftercolumnslist()); } } } } private static void printcolumn( list columns) { for (column column : columns) { system.out.println(column.getname() + " : " + column.getvalue() + " update=" + column.getupdated()); } } private static void redisinsert( list columns){ jsonobject json=new jsonobject(); for (column column : columns) { json.put(column.getname(), column.getvalue()); } if(columns.size()>0){ redisutil.stringset("user:"+ columns.get(0).getvalue(),json.tojsonstring()); } } private static void redisupdate( list columns){ jsonobject json=new jsonobject(); for (column column : columns) { json.put(column.getname(), column.getvalue()); } if(columns.size()>0){ redisutil.stringset("user:"+ columns.get(0).getvalue(),json.tojsonstring()); } } private static void redisdelete( list columns){ jsonobject json=new jsonobject(); for (column column : columns) { json.put(column.getname(), column.getvalue()); } if(columns.size()>0){ redisutil.delkey("user:"+ columns.get(0).getvalue()); } } }
redisutil 工具类
import redis.clients.jedis.jedis; import redis.clients.jedis.jedispool; import redis.clients.jedis.jedispoolconfig; public class redisutil { // redis服务器ip private static string addr = "0.0.0.0"; // redis的端口号 private static int port = 6379; // 访问密码 //private static string auth = "admin"; // 可用连接实例的最大数目,默认值为8; // 如果赋值为-1,则表示不限制;如果pool已经分配了maxactive个jedis实例,则此时pool的状态为exhausted(耗尽)。 private static int max_active = 1024; // 控制一个pool最多有多少个状态为idle(空闲的)的jedis实例,默认值也是8。 private static int max_idle = 200; // 等待可用连接的最大时间,单位毫秒,默认值为-1,表示永不超时。如果超过等待时间,则直接抛出jedisconnectionexception; private static int max_wait = 10000; // 过期时间 protected static int expiretime = 60 * 60 *24; // 连接池 protected static jedispool pool; /** * 静态代码,只在初次调用一次 */ static { jedispoolconfig config = new jedispoolconfig(); //最大连接数 config.setmaxtotal(max_active); //最多空闲实例 config.setmaxidle(max_idle); //超时时间 config.setmaxwaitmillis(max_wait); // config.settestonborrow(false); pool = new jedispool(config, addr, port, 1000); } /** * 获取jedis实例 */ protected static synchronized jedis getjedis() { jedis jedis = null; try { jedis = pool.getresource(); } catch (exception e) { e.printstacktrace(); if (jedis != null) { pool.returnbrokenresource(jedis); } } return jedis; } /** * 释放jedis资源 * @param jedis * @param isbroken */ protected static void closeresource(jedis jedis, boolean isbroken) { try { if (isbroken) { pool.returnbrokenresource(jedis); } else { pool.returnresource(jedis); } } catch (exception e) { } } /** * 是否存在key * @param key */ public static boolean existkey(string key) { jedis jedis = null; boolean isbroken = false; try { jedis = getjedis(); jedis.select(0); return jedis.exists(key); } catch (exception e) { isbroken = true; } finally { closeresource(jedis, isbroken); } return false; } /** * 删除key * @param key */ public static void delkey(string key) { jedis jedis = null; boolean isbroken = false; try { jedis = getjedis(); jedis.select(0); jedis.del(key); } catch (exception e) { isbroken = true; } finally { closeresource(jedis, isbroken); } } /** * 取得key的值 * @param key */ public static string stringget(string key) { jedis jedis = null; boolean isbroken = false; string lastval = null; try { jedis = getjedis(); jedis.select(0); lastval = jedis.get(key); jedis.expire(key, expiretime); } catch (exception e) { isbroken = true; } finally { closeresource(jedis, isbroken); } return lastval; } /** * 添加string数据 * @param key * @param value */ public static string stringset(string key, string value) { jedis jedis = null; boolean isbroken = false; string lastval = null; try { jedis = getjedis(); jedis.select(0); lastval = jedis.set(key, value); jedis.expire(key, expiretime); } catch (exception e) { e.printstacktrace(); isbroken = true; } finally { closeresource(jedis, isbroken); } return lastval; } /** * 添加hash数据 * @param key * @param field * @param value */ public static void hashset(string key, string field, string value) { boolean isbroken = false; jedis jedis = null; try { jedis = getjedis(); if (jedis != null) { jedis.select(0); jedis.hset(key, field, value); jedis.expire(key, expiretime); } } catch (exception e) { isbroken = true; } finally { closeresource(jedis, isbroken); } } }
至此,我们利用canal进行了mysql数据同步到redis的任务,可以根据不同的需求将代码进行修改置于需要的位置。
推荐阅读
-
wiindows下数据库文件使用脚本同步到linux下的mysql数据库中
-
使用canal将mysql数据同步到Redis的操作方法
-
使用canal实现增量同步MySQL的数据到ES
-
高并发 - php redis做mysql的缓存,怎么异步redis同步到mysql数据库?
-
使用GoldenGate实现MySQL到Oracle的数据实时同步
-
如何将redis数据同步到mysql?
-
node.js将MongoDB数据同步到MySQL的步骤
-
wiindows下数据库文件使用脚本同步到linux下的mysql数据库中
-
使用canal增量同步mysql数据库信息到ElasticSearch
-
使用canal将mysql数据同步到Redis的操作方法