欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

使用canal将mysql数据同步到Redis的操作方法

程序员文章站 2022-06-24 23:15:00
写这篇博文时,自己一定是含着误删的眼泪写完的,文中的后续部分会谈到这个“从删库到**”的悲惨故事,这个故事深刻地教训了我,我也想以此来警示大家,注意数据安全和数据备份。 1....

写这篇博文时,自己一定是含着误删的眼泪写完的,文中的后续部分会谈到这个“从删库到**”的悲惨故事,这个故事深刻地教训了我,我也想以此来警示大家,注意数据安全和数据备份。

1. 可行方案

回归正题:我们的标题为《使用canal将mysql数据同步到redis的操作方法》,那就先来说说我们的目的:mysql数据同步到redis,想达到读写分离,redis只做缓存,mysql做持久化。刚开始想这样干的时候就去网上收集资料,发现了n多做法:

先从redis读取数据,如果没有查询到;便从mysql查询数据,将查询到的内容放到redis中。对于写操作,先对mysql进行写,写成功对redis进行写。当然这是一种相对直观而且简单的方法,但是看起来有许多操作需要我们自己去做。

使用mysql的udf去做,大体的思想是通过数据库中的trigger调用自定义的函数库来触发对redis的相应操作,比较麻烦的一点是:自定义的函数库需要我们基于mysql的api进行开发(c++),想想自己的java程序要去调用这么一堆玩意,本人很不情愿。据了解,该方法也是阿里早起的解决方案,具体的步骤可参照:《【菜鸟玩linux开发】通过mysql自动同步刷新redis》

通过gearman去同步,但是通过了解发现,它一般使用在php的开发中。

接下来的两种方案都属于对mysql中的binlog进行解析的方法了。

使用open-replicator解析binlog,https://github.com/whitesock/open-replicator.

使用canal进行同步,当然是能够解放双手的工具。

通过大量的资料收集和调查,我使用了canal进行了mysql数据同步到redis。先简单谈谈canal:

canal主要是基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务,核心基本就是模拟mysql中slave节点请求。具体的原理在这里不进行介绍,可以移步《阿里巴巴开源项目: canal 基于mybinlog的增量订阅&消费》 进行学习。

2. mysql的配置

开启mysql的binlog模块

切换到mysql的安装路径,找到my.cnf(linux)/my.ini (windows),加入如下内容:

[mysqld]
log-bin=mysql-bin #添加这一行就ok
binlog-format=row #选择row模式
server_id=1 #配置mysql replaction需要定义,不能和canal的slaveid重复

配置完成后,需要重启数据库。当重启数据库遇到问题时,耐心解决,但需要警告的是,千万别动data文件夹下的文件。当然如果你觉得你比较有“资本”,同时遇到了“mysql 1067 无法启动”的错误,你可以试着备份一下data文件夹下的内容,删除logfile文件,重启数据库即可,但本人极不推荐这样进行操作。就是由于本人之前的无知,根据一个无良博客,误删了ibdata1文件,使得本人造成了很大的损失,mysql下的所有数据库瞬间毁灭。

配置mysql数据库

创建canal用户,用来管理canal的访问权限。我们可以通过对canal用户访问权限的控制,进而控制canal能够获取的内容。

create user canal identified by 'canal';    
grant select, replication slave, replication client on 数据库名.表名 to 'canal'@'%';  
-- grant all privileges on 数据库名.表名 to 'canal'@'%' ;  
flush privileges;  

3. canal配置与部署

下载部署包

下载,解压,我使用的是最新版本1.0.22

配置canal

主要配置的文件有两处,canal/conf/example/instance.properties 和 canal/conf/canal.properties . 而canal.properties 文件我们一般保持默认配置,所以我们仅对instance.properties 进行修改。如果需要对canal进行复杂的配置,可以参考《canal adminguide》。

## mysql serverid
canal.instance.mysql.slaveid = 1234

# position info
canal.instance.master.address = ***.***.***.***:3306 #改成自己的数据库地址
canal.instance.master.journal.name = 
canal.instance.master.position = 
canal.instance.master.timestamp = 

#canal.instance.standby.address = 
#canal.instance.standby.journal.name =
#canal.instance.standby.position = 
#canal.instance.standby.timestamp = 

# username/password
canal.instance.dbusername = canal #改成自己的数据库信息 
canal.instance.dbpassword = canal #改成自己的数据库信息 
canal.instance.defaultdatabasename =  #改成自己的数据库信息
canal.instance.connectioncharset = utf-8 #改成自己的数据库信息 

# table regex
canal.instance.filter.regex = .*\\..*
# table black regex
canal.instance.filter.black.regex = 
启动canal
./canal/startup.sh
查看启动状态

我们可以通过查看logs/canal/canal.log 和logs/example/example.log日志来判断canal是否启动成功。

canal/logs/canal/canal.log

2016-12-29 14:03:00.956 [main] info  com.alibaba.otter.canal.deployer.canallauncher - ## start the canal server.
2016-12-29 14:03:01.071 [main] info  com.alibaba.otter.canal.deployer.canalcontroller - ## start the canal server[192.168.1.99:11111]
2016-12-29 14:03:01.628 [main] info  com.alibaba.otter.canal.deployer.canallauncher - ## the canal server is running now ......

canal/logs/example/example.log

2016-12-29 14:03:01.357 [main] info  c.a.o.c.i.spring.support.propertyplaceholderconfigurer - loading properties file from class path resource [canal.properties]
2016-12-29 14:03:01.362 [main] info  c.a.o.c.i.spring.support.propertyplaceholderconfigurer - loading properties file from class path resource [example/instance.properties]
2016-12-29 14:03:01.535 [main] info  c.a.otter.canal.instance.spring.canalinstancewithspring - start cannalinstance for 1-example 
2016-12-29 14:03:01.555 [main] info  c.a.otter.canal.instance.core.abstractcanalinstance - start successful....

4. java连接canal执行同步操作

在maven项目中中加载canal和redis依赖包.

    
    redis.clients    
    jedis    
    2.4.2    
 
    
    com.alibaba.otter    
    canal.client    
    1.0.22    
   

建立canal客户端,从canal中获取数据,并将数据更新至redis.

import java.net.inetsocketaddress;  
import java.util.list;  

import com.alibaba.fastjson.jsonobject;
import com.alibaba.otter.canal.client.canalconnector;  
import com.alibaba.otter.canal.common.utils.addressutils;  
import com.alibaba.otter.canal.protocol.message;  
import com.alibaba.otter.canal.protocol.canalentry.column;  
import com.alibaba.otter.canal.protocol.canalentry.entry;  
import com.alibaba.otter.canal.protocol.canalentry.entrytype;  
import com.alibaba.otter.canal.protocol.canalentry.eventtype;  
import com.alibaba.otter.canal.protocol.canalentry.rowchange;  
import com.alibaba.otter.canal.protocol.canalentry.rowdata;  
import com.alibaba.otter.canal.client.*;  

public class canalclient{  
   public static void main(string args[]) {  
       canalconnector connector = canalconnectors.newsingleconnector(new inetsocketaddress(addressutils.gethostip(),  
               11111), "example", "", "");  
       int batchsize = 1000;  
       try {  
           connector.connect();  
           connector.subscribe(".*\\..*");  
           connector.rollback();    
           while (true) {  
               message message = connector.getwithoutack(batchsize); // 获取指定数量的数据  
               long batchid = message.getid();  
               int size = message.getentries().size();  
               if (batchid == -1 || size == 0) {  
                   try {  
                       thread.sleep(1000);  
                   } catch (interruptedexception e) {  
                       e.printstacktrace();  
                   }  
               } else {  
                   printentry(message.getentries());  
               }  
               connector.ack(batchid); // 提交确认  
               // connector.rollback(batchid); // 处理失败, 回滚数据  
           }  
       } finally {  
           connector.disconnect();  
       }  
   }  

   private static void printentry( list entrys) {  
       for (entry entry : entrys) {  
           if (entry.getentrytype() == entrytype.transactionbegin || entry.getentrytype() == entrytype.transactionend) {  
               continue;  
           }  
           rowchange rowchage = null;  
           try {  
               rowchage = rowchange.parsefrom(entry.getstorevalue());  
           } catch (exception e) {  
               throw new runtimeexception("error ## parser of eromanga-event has an error , data:" + entry.tostring(),  
                       e);  
           }  
           eventtype eventtype = rowchage.geteventtype();  
           system.out.println(string.format("================> binlog[%s:%s] , name[%s,%s] , eventtype : %s",  
                   entry.getheader().getlogfilename(), entry.getheader().getlogfileoffset(),  
                   entry.getheader().getschemaname(), entry.getheader().gettablename(),  
                   eventtype));  

           for (rowdata rowdata : rowchage.getrowdataslist()) {  
               if (eventtype == eventtype.delete) {  
                   redisdelete(rowdata.getbeforecolumnslist());  
               } else if (eventtype == eventtype.insert) {  
                   redisinsert(rowdata.getaftercolumnslist());  
               } else {  
                   system.out.println("-------> before");  
                   printcolumn(rowdata.getbeforecolumnslist());  
                   system.out.println("-------> after");  
                   redisupdate(rowdata.getaftercolumnslist());  
               }  
           }  
       }  
   }  

   private static void printcolumn( list columns) {  
       for (column column : columns) {  
           system.out.println(column.getname() + " : " + column.getvalue() + "    update=" + column.getupdated());  
       }  
   }  

  private static void redisinsert( list columns){
      jsonobject json=new jsonobject();
      for (column column : columns) {  
          json.put(column.getname(), column.getvalue());  
       }  
      if(columns.size()>0){
          redisutil.stringset("user:"+ columns.get(0).getvalue(),json.tojsonstring());
      }
   }

  private static  void redisupdate( list columns){
      jsonobject json=new jsonobject();
      for (column column : columns) {  
          json.put(column.getname(), column.getvalue());  
       }  
      if(columns.size()>0){
          redisutil.stringset("user:"+ columns.get(0).getvalue(),json.tojsonstring());
      }
  }

   private static  void redisdelete( list columns){
       jsonobject json=new jsonobject();
          for (column column : columns) {  
              json.put(column.getname(), column.getvalue());  
           }  
          if(columns.size()>0){
              redisutil.delkey("user:"+ columns.get(0).getvalue());
          }
   }   
}  

redisutil 工具类

import redis.clients.jedis.jedis;
import redis.clients.jedis.jedispool;
import redis.clients.jedis.jedispoolconfig;

public class redisutil {

    // redis服务器ip
    private static string addr = "0.0.0.0";

    // redis的端口号
    private static int port = 6379;

    // 访问密码
    //private static string auth = "admin";

    // 可用连接实例的最大数目,默认值为8;
    // 如果赋值为-1,则表示不限制;如果pool已经分配了maxactive个jedis实例,则此时pool的状态为exhausted(耗尽)。
    private static int max_active = 1024;

    // 控制一个pool最多有多少个状态为idle(空闲的)的jedis实例,默认值也是8。
    private static int max_idle = 200;

    // 等待可用连接的最大时间,单位毫秒,默认值为-1,表示永不超时。如果超过等待时间,则直接抛出jedisconnectionexception;
    private static int max_wait = 10000;

    // 过期时间
    protected static int  expiretime = 60 * 60 *24;

    // 连接池
    protected static jedispool pool;

    /**
     * 静态代码,只在初次调用一次
     */
    static {
        jedispoolconfig config = new jedispoolconfig();
        //最大连接数
        config.setmaxtotal(max_active);
        //最多空闲实例
        config.setmaxidle(max_idle);
        //超时时间
        config.setmaxwaitmillis(max_wait);
        //
        config.settestonborrow(false);
        pool = new jedispool(config, addr, port, 1000);
    }

    /**
     * 获取jedis实例
     */
    protected static synchronized jedis getjedis() {
        jedis jedis = null;
        try {
            jedis = pool.getresource();
        } catch (exception e) {
            e.printstacktrace();
            if (jedis != null) {
                pool.returnbrokenresource(jedis);
            }
        }
        return jedis;
    }

    /**
     * 释放jedis资源
     * @param jedis
     * @param isbroken
     */
    protected static void closeresource(jedis jedis, boolean isbroken) {
        try {
            if (isbroken) {
                pool.returnbrokenresource(jedis);
            } else {
                pool.returnresource(jedis);
            }
        } catch (exception e) {

        }
    }

    /**
     * 是否存在key
     * @param key
     */
    public static boolean existkey(string key) {
        jedis jedis = null;
        boolean isbroken = false;
        try {
            jedis = getjedis();
            jedis.select(0);
            return jedis.exists(key);
        } catch (exception e) {
            isbroken = true;
        } finally {
            closeresource(jedis, isbroken);
        }
        return false;
    }

    /**
     * 删除key
     * @param key
     */
    public static void delkey(string key) {
        jedis jedis = null;
        boolean isbroken = false;
        try {
            jedis = getjedis();
            jedis.select(0);
            jedis.del(key);
        } catch (exception e) {
            isbroken = true;
        } finally {
            closeresource(jedis, isbroken);
        }
    }

    /**
     * 取得key的值
     * @param key
     */
    public static string stringget(string key) {
        jedis jedis = null;
        boolean isbroken = false;
        string lastval = null;
        try {
            jedis = getjedis();
            jedis.select(0);
            lastval = jedis.get(key);
            jedis.expire(key, expiretime);
        } catch (exception e) {
            isbroken = true;
        } finally {
            closeresource(jedis, isbroken);
        }
        return lastval;
    }

    /**
     * 添加string数据
     * @param key
     * @param value
     */
    public static string stringset(string key, string value) {
        jedis jedis = null;
        boolean isbroken = false;
        string lastval = null;
        try {
            jedis = getjedis();
            jedis.select(0);
            lastval = jedis.set(key, value);
            jedis.expire(key, expiretime);
        } catch (exception e) {
            e.printstacktrace();
            isbroken = true;
        } finally {
            closeresource(jedis, isbroken);
        }
        return lastval;
    }

    /**
     *  添加hash数据
     * @param key
     * @param field
     * @param value
     */
    public static void hashset(string key, string field, string value) {
        boolean isbroken = false;
        jedis jedis = null;
        try {
            jedis = getjedis();
            if (jedis != null) {
                jedis.select(0);
                jedis.hset(key, field, value);
                jedis.expire(key, expiretime);
            }
        } catch (exception e) {
            isbroken = true;
        } finally {
            closeresource(jedis, isbroken);
        }
    }
}

至此,我们利用canal进行了mysql数据同步到redis的任务,可以根据不同的需求将代码进行修改置于需要的位置。