maxwell实时读取MySQL二进制日志binlog同步到kafka
Mysql的binlog日志是用来记录mysql内部增删等对mysql数据库有更新的内容的记录(对数据库 的改动),对数据库的查询select或show等不会被binlog日志记录;主要用于数据库的主从复制以及增量恢复。
mysql的binlog日志必须打开log-bin功能才能生产binlog日志
1、开启MySQL的binlog日志
修改/etc/my.cnf
[mysqld]
log-bin=/var/lib/mysql/mysql-bin 【binlog⽇日志存放路路径】
binlog-format=ROW 【⽇日志中会记录成每⼀一⾏行行数据被修改的形式】
server_id=1 【指定当前机器器的服务ID(如果是集群,不不能重复)】
重启MySQL并通过命令验证
mysql> show variables like '%log_bin%';
进入指定的binlog路径查看是否生产binlog
cd /var/lib/mysql/
2、安装Maxwell
Maxwell是一个能实时读取MySQL二进制日志binlog,并生成 JSON 格式的消息,作为生产者发送给 Kafka,Kinesis、RabbitMQ、Redis、Google Cloud Pub/Sub、文件或其它平台的应用程序。它的常见应用场景有ETL、维护缓存、收集表级别的dml指标、增量到搜索引擎、数据分区迁移、切库binlog回滚方案等。
官网(http://maxwells-daemon.io)、GitHub(https://github.com/zendesk/maxwell)
Maxwell主要提供了下列功能:
支持 SELECT * FROM table 的方式进行全量数据初始化 支持在主库发生failover后,自动恢复binlog位置(GTID) 可以对数据进行分区,解决数据倾斜问题,发送到kafka的数据支持database、table、column等级别的数据分区 工作方式是伪装为Slave,接收binlog events,然后根据schemas信息拼装,可以接受ddl、xid、row等各种event 除了Maxwell外,目前常用的MySQL Binlog解析工具主要有阿里的canal、mysql_streamer,三个工具对比如下:
canal 由Java开发,分为服务端和客户端,拥有众多的衍生应用,性能稳定,功能强大;canal 需要自己编写客户端来消费canal解析到的数据。
maxwell相对于canal的优势是使用简单,它直接将数据变更输出为json字符串,不需要再编写客户端
将压缩包上传到MySQL节点解压
tar -zxvf maxwell-1.21.1.tar.gz
3、配置maxwell,解析binlog到kafka
编写配置文件,主要用来指定解析业务库的binlog和发送数据到kafka
[aaa@qq.com maxwell-1.21.1]# vim driver.properties
######### binlog ###############
log_level=INFO
producer=kafka
host = node02
user = maxwell
password = !Qaz123456
producer_ack_timeout = 600000
######### binlog ###############
######### output format stuff ###############
output_binlog_position=ture
output_server_id=true
output_thread_id=ture
output_commit_info=true
output_row_query=true
output_ddl=false
output_nulls=true
output_xoffset=true
output_schema_id=true
######### output format stuff ###############
############ kafka stuff #############
kafka.bootstrap.servers=node01:9092,node02:9092,node03:9092,node04:9092,node05:9092,node06:9092
kafka_topic=veche
kafka_partition_hash=murmur3
kafka_key_format=hash
kafka.compression.type=snappy
kafka.retries=5
kafka.acks=all
producer_partition_by=primary_key
############ kafka stuff #############
############## misc stuff ###########
bootstrapper=async
############## misc stuff ##########
############## filter ###############
filter=exclude:*.*, include: car.order_info_201904,include: car.order_info_201905,include: car.order_info_201906,include: car.order_info_201907,include: car.order_info_201908,include: car.order_info_201906,include: car.order_info_201910,include: car.order_info_201911,include: car.order_info_201912,include: car.renter_info,include: car.driver_info ,include: car.opt_alliance_business
############## filter ###############
以上参数解释:http://maxwells-daemon.io/config/
######### binlog ###############
用于指定解析目标机器的binlog(业务库所在机器地址)
######### output format stuff ###############
配置输出格式
############ kafka stuff #############
指定kafka地址
############## misc stuff ###########
在处理bootstrap时,是否会阻塞正常的binlog解析 async不会阻塞
############## filter ###############
用于指定需要监控哪些库、哪些表
4、创建Maxwell数据库用户并赋予权限
创建Maxwell用户,并赋予 maxwell 库的一些权限
CREATE USER 'maxwell'@'%' IDENTIFIED BY '!Qaz123456';
GRANT ALL ON maxwell.* TO 'maxwell'@'%';
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on *.* to 'maxwell'@'%';
flush privileges;
5、编写Maxwell启动脚本
#! /bin/bash
nohup bin/maxwell --daemon --config driver.properties 2>&1 >> /opt/cdh/maxwell-1.21.1/logs/maxwell.log &
启动之后,通过jps -m查看进程
上一篇: C++ 使用 spdlog 操作日志
下一篇: SSM框架基础整合