欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

使用maxwell实时同步mysql数据到消息队列(rabbitMQ)

程序员文章站 2024-03-21 09:08:22
...

maxwell简介

maxwell是一个由Java编写的守护进程,可以实时读取mysql binlog并将行更新以JSON格式写入Kafka,Kinesis,RabbitMQ,Google Cloud Pub / Sub或Redis(Pub / Sub或LPUSH)。(以上内容摘自maxwell官网)。可以想象,有了mysql增量数据流,使用场景就很多了,比如:实时同步数据到缓存,同步数据到ElasticSearch,数据迁移等等。与canal(ali)相比,更加轻量

maxwell还提供以下功能:

  • 使用SELECT * FROM table 的方式做全量数据初始化
  • 支持主库发生failover后,自动恢复binlog位置
  • 对数据进行分区,解决数据倾斜的问题
  • 伪装成mysql从库,接收binlog

maxwell官网:http://maxwells-daemon.io/

maxwell源码:https://github.com/zendesk/maxwell

maxwell使用

mysql配置

需要mysql开启binlog,而binlog默认是关闭的,需要开启,并且为了保证同步数据的一致性,使用的日志格式为row-based replication(RBR),新建或修改my.conf开启binlog。

$ vim /etc/my.cnf

添加内容

[mysqld]
log-bin=mysql-bin #添加这一行就ok
binlog-format=ROW #选择row模式
server_id=1 #随机指定一个不能和其他集群中机器重名的字符串,如果只有一台机器,那就可以随便指定了

重启mysql, 查询是否已开启bin

show variables like '%log_bin%'

使用maxwell实时同步mysql数据到消息队列(rabbitMQ)

配置jdk环境(略)

maxwell 依赖java sdk,所以需要先配置JDK环境。

下载maxwell:https://github.com/zendesk/maxwell/releases/download/v1.17.1/maxwell-1.17.1.tar.gz

解压后

使用maxwell实时同步mysql数据到消息队列(rabbitMQ)

修改config.properties.example为config.properties

到目前为止,前期准备已完成

实战项目:配置maxwell发送消息到rabbitMQ, 监听MQ队列,处理消息

mysql db如下:

使用maxwell实时同步mysql数据到消息队列(rabbitMQ)

修改maxwell的config.properties

# tl;dr config 生产环境配置为info级别
log_level=DEBUG

producer=rabbitmq


# mysql login info, mysql用户必须拥有读取binlog权限和新建库表的权限
host=127.0.0.1
user=xiehd
password=xiehd2018

output_nulls=true
# options to pass into the jdbc connection, given as opt=val&opt2=val2
#jdbc_options=opt1=100&opt2=hello
jdbc_options=autoReconnet=true

#需要同步的数据库,表,及不包含的字段
include_dbs=xhd-sso
#include_tables=SYS_USER,SYS_ROLE
#exclude_columns=password

metrics_type=http
metrics_slf4j_interval=60
http_port=8111
http_diagnostic=true # default false


#rabbitmq
rabbitmq_host=192.168.50.184
rabbitmq_port=5672
rabbitmq_user=maxwell
aaa@qq.com
rabbitmq_virtual_host=/
rabbitmq_exchange=maxwell
rabbitmq_exchange_type=topic
rabbitmq_exchange_durable=false
rabbitmq_exchange_autodelete=false
rabbitmq_routing_key_template=%db%.%table%
rabbitmq_message_persistent=false
rabbitmq_declare_exchange=true

启动maxwell

./bin/maxwell

启动成功

使用maxwell实时同步mysql数据到消息队列(rabbitMQ)

此时会自动生成maxwell库,该库记录了maxwell同步的状态,最后一次同步的id等等信息,在主库失败或同步异常后,只要maxwell库存在,下次同步会根据最后一次同步的id。如果没有生成maxwell库或报错,可能config.properties中配置的mysql用户权限不够

使用maxwell实时同步mysql数据到消息队列(rabbitMQ)

此时修改mysql中的SYS_ROLE表数据,maxwell控制台会打印相应的json格式的日志

使用maxwell实时同步mysql数据到消息队列(rabbitMQ)

rabbitMQ控制台中会自动创建名称为maxwell的exchange(该exchange为config.properties中配置的)

使用maxwell实时同步mysql数据到消息队列(rabbitMQ)

新建Queue并绑定exchange,并设置routingkey为%db%.%table%(该routingkey为config.properties中配置)

使用maxwell实时同步mysql数据到消息队列(rabbitMQ)

修改表SYS_USER中数据,此时修改的数据会被发送到rabbitMQ

使用maxwell实时同步mysql数据到消息队列(rabbitMQ) 一般情况下,一张表对应一个queue, 也可以多张表共用一个queue, 根据实际吞吐量灵活使用。

MQ消费端

新建一个MaxwellData

public class MaxwellData implements Serializable {
    private String type;
    private String database;
    private String table;
    private Map<String, Object> data;
    private Map<String, Object> old;

    public MaxwellData() {
    }

    public String getType() {
        return this.type;
    }

    public void setType(String type) {
        this.type = type;
    }

    public String getDatabase() {
        return this.database;
    }

    public void setDatabase(String database) {
        this.database = database;
    }

    public String getTable() {
        return this.table;
    }

    public void setTable(String table) {
        this.table = table;
    }

    public Map<String, Object> getData() {
        return this.data;
    }

    public void setData(Map<String, Object> data) {
        this.data = data;
    }

    public Map<String, Object> getOld() {
        return this.old;
    }

    public void setOld(Map<String, Object> old) {
        this.old = old;
    }

解析maxwell数据

    @RabbitHandler
    @RabbitListener(queues = "SYS_USER")
    public void process(byte[] data) {
        String s = new String(data);
        MaxwellData maxwellData = decodeMsg(s);
        logger.info("maxwellData:" + maxwellData);
        //to do
    }
    private MaxwellData decodeMsg(String msg) {
        if (msg == null) {
            return null;
        }
        return JSON.parseObject(msg, MaxwellData.class);
    }

当然也可以同步到Redis, kafka, rocketMQ(rocketmq需要自行实现producer)等