欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

maxwell实时读取MySQL二进制日志binlog同步到kafka

程序员文章站 2022-03-27 10:24:33
...

Mysql的binlog日志是用来记录mysql内部增删等对mysql数据库有更新的内容的记录(对数据库 的改动),对数据库的查询select或show等不会被binlog日志记录;主要用于数据库的主从复制以及增量恢复。

mysql的binlog日志必须打开log-bin功能才能生产binlog日志

1、开启MySQL的binlog日志

     修改/etc/my.cnf

[mysqld]
log-bin=/var/lib/mysql/mysql-bin 【binlog⽇日志存放路路径】 
binlog-format=ROW 【⽇日志中会记录成每⼀一⾏行行数据被修改的形式】 
server_id=1 【指定当前机器器的服务ID(如果是集群,不不能重复)】

   重启MySQL并通过命令验证

mysql> show variables like '%log_bin%';

  maxwell实时读取MySQL二进制日志binlog同步到kafka

  进入指定的binlog路径查看是否生产binlog

cd /var/lib/mysql/

 maxwell实时读取MySQL二进制日志binlog同步到kafka

2、安装Maxwell

Maxwell是一个能实时读取MySQL二进制日志binlog,并生成 JSON 格式的消息,作为生产者发送给 Kafka,Kinesis、RabbitMQ、Redis、Google Cloud Pub/Sub、文件或其它平台的应用程序。它的常见应用场景有ETL、维护缓存、收集表级别的dml指标、增量到搜索引擎、数据分区迁移、切库binlog回滚方案等。

官网(http://maxwells-daemon.io)、GitHub(https://github.com/zendesk/maxwell)

Maxwell主要提供了下列功能:

支持 SELECT * FROM table 的方式进行全量数据初始化 支持在主库发生failover后,自动恢复binlog位置(GTID) 可以对数据进行分区,解决数据倾斜问题,发送到kafka的数据支持database、table、column等级别的数据分区 工作方式是伪装为Slave,接收binlog events,然后根据schemas信息拼装,可以接受ddl、xid、row等各种event 除了Maxwell外,目前常用的MySQL Binlog解析工具主要有阿里的canal、mysql_streamer,三个工具对比如下:

maxwell实时读取MySQL二进制日志binlog同步到kafka

canal 由Java开发,分为服务端和客户端,拥有众多的衍生应用,性能稳定,功能强大;canal 需要自己编写客户端来消费canal解析到的数据。

maxwell相对于canal的优势是使用简单,它直接将数据变更输出为json字符串,不需要再编写客户端

将压缩包上传到MySQL节点解压

tar -zxvf maxwell-1.21.1.tar.gz

3、配置maxwell,解析binlog到kafka

编写配置文件,主要用来指定解析业务库的binlog和发送数据到kafka

[aaa@qq.com maxwell-1.21.1]# vim driver.properties 
######### binlog ###############
log_level=INFO
producer=kafka
host = node02
user = maxwell
password = !Qaz123456
producer_ack_timeout = 600000
######### binlog ###############

######### output format stuff ###############
output_binlog_position=ture
output_server_id=true
output_thread_id=ture
output_commit_info=true
output_row_query=true
output_ddl=false
output_nulls=true
output_xoffset=true
output_schema_id=true
######### output format stuff ###############
############ kafka stuff #############
kafka.bootstrap.servers=node01:9092,node02:9092,node03:9092,node04:9092,node05:9092,node06:9092
kafka_topic=veche
kafka_partition_hash=murmur3
kafka_key_format=hash
kafka.compression.type=snappy
kafka.retries=5
kafka.acks=all
producer_partition_by=primary_key
############ kafka stuff #############
############## misc stuff ###########
bootstrapper=async
############## misc stuff ##########
############## filter ###############
filter=exclude:*.*, include: car.order_info_201904,include: car.order_info_201905,include: car.order_info_201906,include: car.order_info_201907,include: car.order_info_201908,include: car.order_info_201906,include: car.order_info_201910,include: car.order_info_201911,include: car.order_info_201912,include: car.renter_info,include: car.driver_info ,include: car.opt_alliance_business
############## filter ###############

以上参数解释:http://maxwells-daemon.io/config/

######### binlog ###############
用于指定解析目标机器的binlog(业务库所在机器地址)

######### output format stuff ###############
配置输出格式


############ kafka stuff #############
指定kafka地址

############## misc stuff ###########
在处理bootstrap时,是否会阻塞正常的binlog解析  async不会阻塞

############## filter ###############
用于指定需要监控哪些库、哪些表

4、创建Maxwell数据库用户并赋予权限

创建Maxwell用户,并赋予 maxwell 库的一些权限

CREATE USER 'maxwell'@'%' IDENTIFIED BY '!Qaz123456';
GRANT ALL ON maxwell.* TO 'maxwell'@'%';
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on *.* to 'maxwell'@'%'; 
flush privileges; 

5、编写Maxwell启动脚本

#! /bin/bash
nohup bin/maxwell --daemon --config driver.properties 2>&1 >> /opt/cdh/maxwell-1.21.1/logs/maxwell.log &

启动之后,通过jps -m查看进程

maxwell实时读取MySQL二进制日志binlog同步到kafka

相关标签: 环境搭建