欢迎您访问程序员文章站,本站旨在为大家提供、分享程序员计算机编程知识!
您现在的位置是: 首页

Python 操作 MySQL 数据库:读取一个数据库的表并写入另一个数据库

程序员文章站 2024-03-21 20:12:52
...

写这个肯定是因为工作需要,不啰嗦,直接说事。
我现在有两台主机,一台是公司主机,一台是客户主机,要求把公司主机上的三个表同步到客户主机上的数据库。
注意是“同步”,所以首先要考虑用 Linux 定时任务或者 MySQL 主从复制。因为我没有权限在主机上配置主从复制,所以只能选择写脚本,再配合定时任务定期执行。
涉及的三个表的建表语句如下:

# DDL for table `schedule_building`: drops the table if it exists, then
# recreates it.
# NOTE(review): the string holds two statements (DROP + CREATE) and is not
# referenced by the code shown here; confirm the code path that executes it
# accepts multi-statement strings.
create_sql_schedule_building = """
    DROP table IF EXISTS schedule_building ;
    CREATE TABLE `schedule_building` (
      `id` int(11) NOT NULL AUTO_INCREMENT,
      `uuid` varchar(32) COLLATE utf8mb4_unicode_ci DEFAULT NULL,
      `proj_id` int(11) DEFAULT NULL,
      `team_id` int(11) DEFAULT NULL,
      `Building` varchar(32) COLLATE utf8mb4_unicode_ci DEFAULT NULL,
      `status` tinyint(4) DEFAULT '1',
      `cid` int(11) DEFAULT NULL,
      `cusrname` varchar(50) CHARACTER SET utf8 DEFAULT NULL COMMENT '建立人',
      `ctime` datetime DEFAULT NULL,
      `uid` int(11) DEFAULT NULL,
      `uusrname` varchar(50) CHARACTER SET utf8 DEFAULT NULL COMMENT '修改人',
      `utime` datetime DEFAULT NULL,
      `random_no` varchar(50) COLLATE utf8mb4_unicode_ci DEFAULT NULL,
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=91 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
"""

# DDL for table `schedule_floor`: drops the table if it exists, then
# recreates it.
# NOTE(review): the string holds two statements (DROP + CREATE) and is not
# referenced by the code shown here; confirm the code path that executes it
# accepts multi-statement strings.
create_sql_schedule_floor = """
    DROP table IF EXISTS schedule_floor ;
    CREATE TABLE `schedule_floor` (
      `id` int(11) NOT NULL AUTO_INCREMENT,
      `m_id` int(11) DEFAULT NULL,
      `sort` int(11) DEFAULT NULL,
      `cname` varchar(100) CHARACTER SET utf8 DEFAULT NULL,
      `ctime` datetime DEFAULT NULL,
      `cid` int(11) DEFAULT NULL,
      `utime` datetime DEFAULT NULL,
      `uid` int(11) DEFAULT NULL,
      `status` tinyint(4) DEFAULT '1',
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=3249 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci ;
"""

# DDL for table `schedule_room`: drops the table if it exists, then
# recreates it.
# NOTE(review): the string holds two statements (DROP + CREATE) and is not
# referenced by the code shown here; confirm the code path that executes it
# accepts multi-statement strings.
create_sql_schedule_room = """
    DROP table IF EXISTS schedule_room ;
    CREATE TABLE `schedule_room` (
      `id` int(11) NOT NULL AUTO_INCREMENT,
      `m_id` int(11) DEFAULT NULL,
      `ilevel` int(11) DEFAULT NULL,
      `parent_id` int(11) DEFAULT NULL,
      `cname` varchar(50) DEFAULT NULL,
      `mark` varchar(50) DEFAULT NULL,
      `status` tinyint(4) DEFAULT '1',
      `sort` int(11) DEFAULT NULL,
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=926 DEFAULT CHARSET=utf8;
"""

核心代码如下:

from mysql_base import DataBaseParent_local, DataBaseParent_remote, DataBaseParent_test
import MySQLdb

# Helper instances used by this script: `db1` is what read() queries and
# `db3` is what the write_* functions write to. `db2` is created but not used
# anywhere in the code shown here.
# NOTE(review): DataBaseParent_remote is imported from mysql_base but does not
# appear in the mysql_base listing shown in this article; confirm it exists.
db1 = DataBaseParent_local()
db2 = DataBaseParent_remote()
db3 = DataBaseParent_test()


def read(tb_name):
    """Return every row of table ``tb_name`` as a list.

    The query is issued through the module-level ``db1`` helper. The table
    name is interpolated directly into the SQL text, so it must come from
    trusted code, never from user input.

    Args:
        tb_name: name of the table to read.

    Returns:
        A list holding each row returned by ``db1.select``.
    """
    query = "SELECT * FROM {0};".format(tb_name)
    fetched, _row_count = db1.select(query)
    return list(fetched)


def write_building():
    """Mirror ``schedule_building``: read all rows, then replace the target's rows.

    The delete statement and the insert statement are handed to
    ``db3.insert_many`` together with the rows obtained from ``read``.
    """
    clear_stmt = "delete from schedule_building ;"
    insert_stmt = "insert into schedule_building values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s);"
    source_rows = read("schedule_building")
    db3.insert_many(insert_stmt, clear_stmt, source_rows)


def write_floor():
    """Mirror ``schedule_floor``: read all rows, then replace the target's rows.

    The delete statement and the insert statement are handed to
    ``db3.insert_many`` together with the rows obtained from ``read``.
    """
    clear_stmt = "delete from schedule_floor ;"
    insert_stmt = "insert into schedule_floor values(%s,%s,%s,%s,%s,%s,%s,%s,%s);"
    source_rows = read("schedule_floor")
    db3.insert_many(insert_stmt, clear_stmt, source_rows)


def write_room():
    """Mirror ``schedule_room``: read all rows, then replace the target's rows.

    The delete statement and the insert statement are handed to
    ``db3.insert_many`` together with the rows obtained from ``read``.
    """
    clear_stmt = "delete from schedule_room ;"
    insert_stmt = "insert into schedule_room values(%s,%s,%s,%s,%s,%s,%s,%s);"
    source_rows = read("schedule_room")
    db3.insert_many(insert_stmt, clear_stmt, source_rows)


# Script entry point: when run directly, mirror the three tables one after
# another (floor, building, room).
if __name__ == '__main__':
    write_floor()
    write_building()
    write_room()

数据库共享基类(即上面导入的 mysql_base 模块):

#!/usr/bin/python
# -*- coding: UTF-8 -*-
"""Shared database helper library."""
# Usage: instantiate a DataBaseParent_local object first, then call the
# methods you need on that object.
# from django.db import connection

import MySQLdb
# Module-level connections, opened once when this module is imported.
# db1 is used by DataBaseParent_local and db3 by DataBaseParent_test.
# NOTE(review): host, user and password are hard-coded here. The third
# argument of the first call looks like an e-mail-obfuscation placeholder
# rather than a real password; confirm the value and consider loading
# credentials from configuration instead of source code.
db1 = MySQLdb.connect("www.shdfshajd.cn", "db_user", "[email protected]", "xcx", charset='utf8')
db3 = MySQLdb.connect("localhost", "root", "root", "apollo", charset='utf8')

class DataBaseParent_local:
    """Query helper bound to one DB-API connection (module-level ``db1`` by default).

    Every method opens its own short-lived cursor and always closes it, so
    methods can be called repeatedly and in any order.
    """

    def __init__(self, conn=None):
        """Bind the helper to a connection.

        Args:
            conn: optional DB-API 2.0 connection object. When omitted, the
                module-level ``db1`` connection is used.
        """
        self._conn = db1 if conn is None else conn
        # Kept as a public attribute for backward compatibility only; the
        # methods below never share (or close) this cursor.
        self.cursor = self._conn.cursor()
        # Column metadata of the most recent select() call.
        self.description = None

    def select(self, sqlstr):
        """Run a query and return all rows.

        Args:
            sqlstr: complete SQL text to execute.

        Returns:
            ``(rows, row_count)`` where ``rows`` is whatever the driver's
            ``fetchall()`` returns, e.g.
            ``(('apollo', 'male', '164.jpeg'), ('apollo', 'male', ''))``.
        """
        cur = self._conn.cursor()
        try:
            cur.execute(sqlstr)
            rows = cur.fetchall()
            self.description = cur.description
        finally:
            cur.close()
        return rows, len(rows)

    def select_include_name(self, sqlstr):
        """Run a query and return the rows as dictionaries keyed by column name.

        Args:
            sqlstr: complete SQL text to execute.

        Returns:
            ``(rows, row_count)`` where ``rows`` looks like
            ``[{'name': 'apollo', 'age': 28}, {'name': 'jack', 'age': 27}]``.
            Every selected column is included.
        """
        cur = self._conn.cursor()
        try:
            cur.execute(sqlstr)
            # description is None for statements that produce no result set.
            names = [column[0] for column in (cur.description or ())]
            rows = cur.fetchall()
        finally:
            cur.close()
        result = [dict(zip(names, row)) for row in rows]
        return result, len(rows)

    def select_for_grid(self, sqlstr, pageNo=1, select_size=5):
        """Run a query and return a single page of its rows.

        Args:
            sqlstr: complete SQL text to execute.
            pageNo: 1-based page number.
            select_size: number of rows per page; must be at least 1.

        Returns:
            ``(page_rows, total_rows, total_pages, pageNo, select_size)``.
            For 10 rows and ``select_size=5``, page 1 yields the first five
            rows with ``total_rows == 10`` and ``total_pages == 2``. A page
            outside the available range yields ``[]`` as ``page_rows``.

        Raises:
            ValueError: if ``select_size`` is smaller than 1.
        """
        if select_size < 1:
            raise ValueError("select_size must be a positive integer")
        rows, total = self.select(sqlstr)
        # Ceiling division that stays an int on both Python 2 and Python 3.
        total_pages = (total + select_size - 1) // select_size
        start = (pageNo - 1) * select_size
        end = min(pageNo * select_size, total)
        if start < 0 or start >= total:
            return [], total, total_pages, pageNo, select_size
        return rows[start:end], total, total_pages, pageNo, select_size

    def executesql(self, sqlstr):
        """Execute one statement and commit.

        Args:
            sqlstr: complete SQL text to execute.

        Returns:
            Whatever the driver's ``cursor.execute`` returns.
        """
        cur = self._conn.cursor()
        try:
            outcome = cur.execute(sqlstr)
            self._conn.commit()
        finally:
            cur.close()
        return outcome

    def insert(self, sql, param):
        """Execute one parameterized statement and commit.

        Args:
            sql: SQL text containing the driver's parameter placeholders.
            param: parameter values for the placeholders.

        Returns:
            Whatever the driver's ``cursor.execute`` returns.
        """
        # A fresh cursor per call, so insert() can be called more than once.
        cur = self._conn.cursor()
        try:
            outcome = cur.execute(sql, param)
            self._conn.commit()
        finally:
            cur.close()
        return outcome

    def release(self):
        """No-op kept for interface compatibility; always returns 0."""
        return 0


class DataBaseParent_test:
    """Query helper bound to one DB-API connection (module-level ``db3`` by default).

    Every method opens its own short-lived cursor and always closes it, so
    methods can be called repeatedly and in any order.
    """

    def __init__(self, conn=None):
        """Bind the helper to a connection.

        Args:
            conn: optional DB-API 2.0 connection object. When omitted, the
                module-level ``db3`` connection is used.
        """
        self._conn = db3 if conn is None else conn
        # Kept as a public attribute for backward compatibility only; the
        # methods below never share (or close) this cursor.
        self.cursor = self._conn.cursor()
        # Column metadata of the most recent select() call.
        self.description = None

    def select(self, sqlstr):
        """Run a query and return all rows.

        Args:
            sqlstr: complete SQL text to execute.

        Returns:
            ``(rows, row_count)`` where ``rows`` is whatever the driver's
            ``fetchall()`` returns, e.g.
            ``(('apollo', 'male', '164.jpeg'), ('apollo', 'male', ''))``.
        """
        cur = self._conn.cursor()
        try:
            cur.execute(sqlstr)
            rows = cur.fetchall()
            self.description = cur.description
        finally:
            cur.close()
        return rows, len(rows)

    def select_include_name(self, sqlstr):
        """Run a query and return the rows as dictionaries keyed by column name.

        Args:
            sqlstr: complete SQL text to execute.

        Returns:
            ``(rows, row_count)`` where ``rows`` looks like
            ``[{'name': 'apollo', 'age': 28}, {'name': 'jack', 'age': 27}]``.
            Every selected column is included.
        """
        cur = self._conn.cursor()
        try:
            cur.execute(sqlstr)
            # description is None for statements that produce no result set.
            names = [column[0] for column in (cur.description or ())]
            rows = cur.fetchall()
        finally:
            cur.close()
        result = [dict(zip(names, row)) for row in rows]
        return result, len(rows)

    def select_for_grid(self, sqlstr, pageNo=1, select_size=5):
        """Run a query and return a single page of its rows.

        Args:
            sqlstr: complete SQL text to execute.
            pageNo: 1-based page number.
            select_size: number of rows per page; must be at least 1.

        Returns:
            ``(page_rows, total_rows, total_pages, pageNo, select_size)``.
            For 10 rows and ``select_size=5``, page 1 yields the first five
            rows with ``total_rows == 10`` and ``total_pages == 2``. A page
            outside the available range yields ``[]`` as ``page_rows``.

        Raises:
            ValueError: if ``select_size`` is smaller than 1.
        """
        if select_size < 1:
            raise ValueError("select_size must be a positive integer")
        rows, total = self.select(sqlstr)
        # Ceiling division that stays an int on both Python 2 and Python 3.
        total_pages = (total + select_size - 1) // select_size
        start = (pageNo - 1) * select_size
        end = min(pageNo * select_size, total)
        if start < 0 or start >= total:
            return [], total, total_pages, pageNo, select_size
        return rows[start:end], total, total_pages, pageNo, select_size

    def executesql(self, sqlstr):
        """Execute one statement and commit it on this helper's own connection.

        Args:
            sqlstr: complete SQL text to execute.

        Returns:
            Whatever the driver's ``cursor.execute`` returns.
        """
        cur = self._conn.cursor()
        try:
            outcome = cur.execute(sqlstr)
            # Commit on the connection the statement ran on (not another one).
            self._conn.commit()
        finally:
            cur.close()
        return outcome

    def insert(self, sql, param):
        """Execute one parameterized statement and commit.

        Args:
            sql: SQL text containing the driver's parameter placeholders.
            param: parameter values for the placeholders.

        Returns:
            Whatever the driver's ``cursor.execute`` returns.
        """
        # A fresh cursor per call, so insert() can be called more than once.
        cur = self._conn.cursor()
        try:
            outcome = cur.execute(sql, param)
            self._conn.commit()
        finally:
            cur.close()
        return outcome

    def release(self):
        """No-op kept for interface compatibility; always returns 0."""
        return 0

    def insert_many(self, sql, sql1, args):
        """Run a preparatory statement, bulk-insert rows, and commit both together.

        Args:
            sql: parameterized insert statement, executed once per row.
            sql1: statement executed first (the callers in this file pass a
                ``delete from ...`` statement).
            args: sequence of parameter tuples, one per row to insert.

        Returns:
            Whatever the driver's ``cursor.executemany`` returns.

        Raises:
            Any driver error is re-raised after the pending work has been
            rolled back, so a failed insert does not leave the preparatory
            statement waiting to be committed by a later call.
        """
        cur = self._conn.cursor()
        try:
            cur.execute(sql1)
            outcome = cur.executemany(sql, args)
            self._conn.commit()
        except Exception:
            self._conn.rollback()
            raise
        finally:
            cur.close()
        return outcome