欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

python制作mysql数据迁移脚本

程序员文章站 2024-01-16 08:30:28
用python写了个数据迁移脚本,主要是利用从库将大的静态表导出表空间,载导入到目标实例中。 #!/usr/bin/env python3 #-*- codin...

用python写了个数据迁移脚本,主要是利用从库将大的静态表导出表空间,载导入到目标实例中。

#!/usr/bin/env python3
#-*- coding:utf8 -*-
#author:zhanbin.liu
#!!!!!db必须同版本
#python3环境  pip3 install pymysql paramiko

import os
#from pathlib import path
import sys
import pymysql
import paramiko

#每次只能迁移一个db下的表,到指定db
#grant select, create, reload, alter, lock tables on *.* to 'data_migration'@'192.168.%' identified by 'data_migration@123';
tables='sqlauto_cluster,sqlauto_user'    #以,分割的字符串,如a,b,c
tablelist = tables.split(',')
sourceip = '192.168.1.101'
sourcedatabase = '/data/mysql/3306/data'
sourcedbname = 'inception_web'
sourcedatadir = os.path.join(sourcedatabase,sourcedbname)
desip = '192.168.1.102'
desdatabase = '/data/mysql/3306/data'
desdbname = 'inception_web'
desdatadir = os.path.join(desdatabase,desdbname)

# for table in tablelist:
#   desfile = path("%s/%s.ibd" %(desdatadir,table))
#   print(desfile)
#   if desfile.is_file():
#     print("ok")
#   else:
#     print("no")

comuser = 'data_migration'
compwd = 'data_migration@123'
comport = 3306

client = paramiko.sshclient()
client.set_missing_host_key_policy(paramiko.autoaddpolicy())

def table_judge():
  print("table_judge")
  sourcetableexist = pymysql.connect(sourceip,comuser,compwd,sourcedbname,comport,charset='utf8')
  destableexist = pymysql.connect(desip,comuser,compwd,desdbname,comport,charset='utf8')
  sourcetables = []
  destables = []
  cursor_source = sourcetableexist.cursor()
  cursor_des = destableexist.cursor()

  for table in tablelist:
    #print(table)
    cursor_source.execute("select table_name from information_schema.tables where table_schema='%s' and table_name='%s';" % (sourcedbname,table))
    sourcetable_tmp = cursor_source.fetchall()
    cursor_des.execute("select table_name from information_schema.tables where table_schema='%s' and table_name='%s';" % (desdbname,table))
    destable_tmp = cursor_des.fetchall()
    #print(destable_tmp)
    if sourcetable_tmp is ():
      sourcetables.append(table)
    if destable_tmp is not ():
      destables.append(destable_tmp[0][0])
  sourcetableexist.close()
  destableexist.close()

  s=d=0
  if sourcetables != []:
    print('迁移源不存在将要迁移的表:',sourceip,sourcedbname, sourcetables,' 请检查')
    s=1
  if destables != []:
    print('目标库存在将要迁移的表:',desip,desdbname,destables,' 请移除')
    d=1
  if s == 1 or d == 1:
    sys.exit()

def data_sync():
  print('data_sync')
  db_source = pymysql.connect(sourceip,comuser,compwd,sourcedbname,comport,charset='utf8')
  db_des = pymysql.connect(desip,comuser,compwd,desdbname,comport,charset='utf8')
  cursor_db_source = db_source.cursor()
  cursor_db_des = db_des.cursor()

  for table in tablelist:
    print("正在同步表:",table)
    cursor_db_source.execute("show create table %s;" % (table))
    createtablesql = cursor_db_source.fetchall()[0][1]
    print(createtablesql)
    try:
      cursor_db_des.execute(createtablesql)
    except exception as error:
      print(error)
    cursor_db_source.execute("flush table %s with read lock;" % (table))
    cursor_db_des.execute("alter table %s discard tablespace;" % (table))

    client.connect(sourceip, 22, 'root')
    stdin1, stdout1, stderr1 = client.exec_command("scp %s %s:%s " % (sourcedatadir+"/"+table+".ibd", desip, desdatadir))
    stdin2, stdout2, stderr2 = client.exec_command("scp %s %s:%s " % (sourcedatadir+"/"+table+".cfg", desip, desdatadir))
    a_e_1 = stderr1.readlines()
    a_e_2 = stderr2.readlines()
    if a_e_1 != [] or a_e_2 != []:
      print(a_e_1,a_e_2)
      sys.exit()
    client.close()

    client.connect(desip, 22, 'root')
    stdin3, stdout3, stderr3 = client.exec_command("chown -r mysql.mysql %s*" % (desdatadir+"/"+table))
    a_e_3 = stderr3.readlines()
    if a_e_3 != []:
      print(a_e_1, a_e_2)
      sys.exit()
    client.close()
    #cursor_db_source.execute("select sleep(10);")
    cursor_db_source.execute("unlock tables;")
    cursor_db_des.execute("alter table %s import tablespace;" % (table))
    print("同步完成")

  cursor_db_source.close()
  cursor_db_des.close()

def data_checksum():
  print('data_checksum')
  db_source = pymysql.connect(sourceip,comuser,compwd,sourcedbname,comport,charset='utf8')
  db_des = pymysql.connect(desip,comuser,compwd,desdbname,comport,charset='utf8')
  cursor_db_source = db_source.cursor()
  cursor_db_des = db_des.cursor()

  for table in tablelist:
    print("正在校验表:", table)
    cursor_db_source.execute("checksum table %s;" % (table))
    ck_s = cursor_db_source.fetchall()[0][1]
    cursor_db_des.execute("checksum table %s;" % (table))
    ck_d = cursor_db_des.fetchall()[0][1]
    if ck_s != ck_d:
      print("表不一致:",table)
    else:
      print("表一致:",table)

  cursor_db_source.close()
  cursor_db_des.close()

if __name__ == "__main__":
  table_judge()
  data_sync()
  data_checksum()
  print('haha')