欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

mysql-replication实时读取binlog

程序员文章站 2023-01-18 13:43:10
mysql-replication是纯Python的MySQL复制协议的实现基于PyMYSQL构建。 可以接收插入,更新,删除数据和原始SQL查询等事件。 作用: MySQL到NoSQL数据库复制MySQL到搜索引擎的复制数据库中发生更改时缓存无效审计实时分析 使用环境: MySQL 5.5,5.6 ......

mysql-replication是纯Python的MySQL复制协议的实现基于PyMYSQL构建。 可以接收插入,更新,删除数据和原始SQL查询等事件。

作用:

MySQL到NoSQL数据库复制
MySQL到搜索引擎的复制
数据库中发生更改时缓存无效
审计
实时分析

使用环境:

MySQL 5.5,5.6和5.7
Python> = 2.7
Python 3.3,3.4,3.5和3.6(不支持3.2)
PyPy(确实比标准的Python解释器更快)
该项目被用于一些中型互联网公司的关键业务的生产。 但是所有的用例在现实世界中都没有完全测试过。

先决条件:

MySQL服务器配置:
[mysqld] server-id = 1 log_bin = /var/log/mysql/mysql-bin.log expire_logs_days = 10 max_binlog_size = 100M binlog-format = row #Very important if you want to receive write, update and delete row events

文档地址:

https://github.com/noplay/python-mysql-replication

https://python-mysql-replication.readthedocs.io/en/latest/

安装:

pip install mysql-replication

使用示例:

#!/usr/bin/env python3
# _*_ coding:utf8 _*_
import json
import sys,os
import pymysqlreplication
from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import (DeleteRowsEvent,UpdateRowsEvent,WriteRowsEvent,TableMapEvent)
MYSQL_SETTINGS = {"host": "192.168.1.238","port": 3306,"user": "root","passwd": "123456"}   ##配置链接地址
def main():
    stream = BinLogStreamReader(connection_settings=MYSQL_SETTINGS,server_id=5,blocking=False,only_schemas = ["masterdb"],freeze_schema=True) 
    for binlogevent in stream:
        try:
            if binlogevent.schema == "masterdb":
                if isinstance(binlogevent, WriteRowsEvent):
                    print("类型:insert;库表:`%s`.`%s`;%s"%(binlogevent.schema,binlogevent.table,binlogevent.rows[0]["values"]))
                elif isinstance(binlogevent, DeleteRowsEvent):
                    if binlogevent.primary_key : ##如果有主键
                        print("类型:delete;库表:`%s`.`%s`;主键:%s;%s"%(binlogevent.schema,binlogevent.table,binlogevent.primary_key,binlogevent.rows[0]["values"]))
                elif isinstance(binlogevent, UpdateRowsEvent):
                    if binlogevent.primary_key:
                        print("类型:update;库表:`%s`.`%s`;主键:%s;%s"%(binlogevent.schema,binlogevent.table,binlogevent.primary_key,binlogevent.rows[0]))
                elif isinstance(binlogevent,TableMapEvent):
                    pass
                else:
                    binlogevent.dump()
        except Exception:
            pass
    stream.close()
if __name__ == "__main__":
    main()

 

BinLogStreamReader参数说明:
ctl_connection_settings:集群保存模式信息的连接设置
resume_stream:从位置或binlog的最新事件或旧的可用事件开始
log_file:设置复制开始日志文件
log_pos:设置复制开始日志pos(resume_stream应该为true)
auto_position:使用master_auto_position gtid设置位置
blocking:在流上读取被阻止
only_events:允许的事件数组
ignored_events:被忽略的事件数组
only_tables:包含要观看的表的数组(仅适用于binlog_format ROW)
ignored_tables:包含要跳过的表的数组
only_schemas:包含要观看的模式的数组
ignored_schemas:包含要跳过的模式的数组
freeze_schema:如果为true,则不支持ALTER TABLE。速度更快。
skip_to_timestamp:在达到指定的时间戳之前忽略所有事件。
report_slave:在SHOW SLAVE HOSTS中报告奴隶。
slave_uuid:在SHOW SLAVE HOSTS中报告slave_uuid。
fail_on_table_metadata_unavailable:如果我们无法获取有关row_events的表信息,应该引发异常
slave_heartbeat:(秒)主站应主动发送心跳连接。这也减少了复制恢复时GTID复制的流量(在许多事件在binlog中跳过的情况下)。请参阅mysql文档中的MASTER_HEARTBEAT_PERIOD以了解语义

上一篇: 男高音

下一篇: 追MM秘笈~!上