基于Python的SQL Server数据库实现对象同步轻量级
缘由
日常工作中经常遇到类似的问题:把某个服务器上的某些指定的表同步到另外一台服务器。
类似需求用ssis或者其他etl工作很容易实现,比如用ssis的话,就会会存在相当一部分反复的手工操作。
建源的数据库信息,目标的数据库信息,如果是多个表,需要一个一个地拉source和target,然后一个一个地mapping,然后运行实现数据同步。
然后很可能,这个workflow使用也就这么一次,就寿终正寝了,却一样要浪费时间去做这个etl。
快速数据同步实现
于是在想,可不可能快速实现类似需求,尽最大程度减少重复的手工操作?类似基于命令行的方式,简单快捷,不需要太多的手动操作。
于是就有了本文,基于python(目的是顺便熟悉一下python的语法),快速实现sql server的数据库之间的数据同步操作,后面又稍微扩展了一下,可以实现不同服务器的数据库之间的表结构,表对应的数据,存储过程,函数,用户自定义类型表(user define table type)的同步
目前支持在两个sql server数据源之间:每次同步一张或者多张表/存储过程,也可以同步整个数据库的所有表/存储过程(以及表/存储过程依赖的其他数据库对象)。
支持sqlserver2012以上版本
需要考虑到一些基本的校验问题:在源服务器上,需要同步的对象是否存在,或者输入的对象是否存在于源服务器的数据库里。
在目标服务器上,对于表的同步:
1,表的存在依赖于schema,需要考虑到表的schema是否存在,如果不存在先在target库上创建表对应的schema
2,target表中是否有数据?如果有数据,是否以覆盖的方式执行
对于存储过程的同步:
1,类似于表,需要考虑存储过程的schema是否存在,如果不存在先在target库上创建表对应的schema
2,类似于表,arget数据库中是否已经存在对应的存储过程,是否以覆盖的方式执行
3,存储过程可能依赖于b表,某些函数,用户自定义表变量等等,同步存储过程的时候需要先同步依赖的对象,这一点比较复杂,实现过程中遇到在很多很多的坑
可能存在对象a依赖于对象b,对象b依赖于对象c……,这里有点递归的意思
这一点导致了重构大量的代码,一开始都是直来直去的同步,无法实现这个逻辑,切实体会到代码的“单一职责”原则
参数说明
参数说明如下,大的包括四类:
1,源服务器信息 (服务器地址,实例名,数据库名称,用户名,密码),没有用户名密码的情况下,使用windows身份认证模式
2,目标服务器信息(服务器地址,实例名,数据库名称,用户名,密码),没有用户名密码的情况下,使用windows身份认证模式
3,同步的对象类型以及对象
4,同步的对象在目标服务器上存在的情况下,是否强制覆盖
其实在同步数据的时候,也可以把需要同步的行数提取出来做参数,比较简单,这里暂时没有做。
比如需要快速搭建一个测试环境,需要同步所有的表结构和每个表的一部分数据即可。
表以及数据同步
表同步的原理是,创建目标表,遍历源数据的表,生成insert into values(***),(***),(***)格式的sql,然后插入目标数据库,这里大概步骤如下:
1,表依赖于schema,所以同步表之前先同步schema
2,强制覆盖的情况下,会drop掉目标表(如果存在的话),防止目标表与源表结构不一致,非强制覆盖的情况下,如果字段不一致,则抛出异常
3,同步表结构,包括字段,索引,约束等等,但是无法支持外键,刻意去掉了外键,想想为什么?因吹斯汀。
4,需要筛选出来非计算列字段,insert语句只能是非计算列字段(又导致重构了部分代码)
5,转义处理,在拼凑sql的时候,需要进行转义处理,否则会导致sql语句错误,目前处理了字符串中的'字符,二进制字段,时间字段的转义处理(最容易发生问题的地方)
6,鉴于insert into values(***),(***),(***)
语法上允许的最大值是1000,因此每生成1000条数据,就同步一次
7,自增列的identity_insert
标识打开与关闭处理
使用如下参数,同步源数据库的三张表到目标数据库,因为这里是在本机命名实例下测试,因此实例名和端口号输入
执行同步的效果
说明:
1,如果输入obj_type="tab" 且-obj=为none的情况下,会同步源数据库中的所有表。
2,这个效率取决于机器性能和网络传输,本机测试的话,每秒中可以提交3到4次,也就是每秒钟可以提交3000~4000行左右的数据。
已知的问题:
1,当表的索引为filter index的时候,无法生成包含where条件的索引创建语句,那个看起来蛋疼的表结构导出语句,暂时没时间改它。
2,暂时不支持其他少用的类型字段,比如地理空间字段什么的。
存储过程对象的同步
存储过程同步的原理是,在源数据库上生成创建存储过程的语句,然后写入目标库,这里大概步骤如下:
1,存储过程依赖于schema,所以同步存储过程之前先同步schema(同表)
2,同步的过程会检查依赖对象,如果依赖其他对象,暂停当前对象同步,先同步依赖对象
3,重复第二步骤,直至完成
4,对于存储过程的同步,如果是强制覆盖的话,强制覆盖仅仅对存储过程自己生效(删除&重建),对依赖对象并不生效,如果依赖对象不存在,就创建,否则不做任何事情
使用如下参数,同步源数据库的两个存储过程到目标数据库,因为这里是在本机命名实例下测试,因此实例名和端口号输入
说明:测试要同步的存储过程之一为[dbo].[sp_test01],它依赖于其他两个对象:dbo.table01和dbo.fn_test01()
create proc [dbo].[sp_test01] as begin set no count on; delete from dbo.table01 where id = 1000 select dbo.fn_test01() end
而dbo.fn_test01()的如下,依赖于另外一个对象:dbo.table02
create function [dbo].[fn_test01] ( ) returns int as begin declare @count int = 0 select @count = count(1) from dbo.table02 return @count end
因此,这个测试的[dbo].[sp_test01]就依赖于其他对象,如果其依赖的对象不存在,同步的时候,仅仅同步这个存储过程本身,是没有意义的
同步某一个对象的依赖对象,使用如下sql查出来对象依赖信息,因此这里就层层深入,同步依赖对象。
这里就类似于同步a的时候,a依赖于b和c,然后停止同步a,先同步b和c,同步b或者c的时候,可能又依赖于其他对象,然后继续先同步其依赖对象。
效果如下
如果输入obj_type="sp" 且-obj=为none的情况下,会同步源数据库中的所有存储过程以及其依赖对象
已知的问题:
1,加密的存储过程或者函数是无法实现同步的,因为无法生成创建对象的脚本
1,table type的同步也是一个蛋疼的过程,目前支持,但是支持的并不好,原因是创建table type之前,先删除依赖于table type的对象,否则无法删除与创建。
特别说明
依赖对象的解决,还是比较蛋疼的
如果在默认schema为dbo的对象,在存储过程或者函数中没有写schema(参考如下修改后的sp,不写相关表的schema dbo,dbo.test01==>test01),
使用 sys.dm_sql_referenced_entities这个系统函数是无法找到其依赖的对象的,奇葩的是可以找到schema的类型,却没有返回对象本身。
这一点导致在代码中层层深入,进行了长时间的debug,完全没有想到这个函数是这个鸟样子,因为这里找到依赖对象的类型,却找不到对象本身,次奥!!!
另外一种情况就是动态sql了,无法使用 sys.dm_sql_referenced_entities这个系统函数找到其依赖的对象。
其他对象的同步
支持其他数据库对象的同步,比如function,table type等,因为可以在同步其他存储过程对象的时候附带的同步function,table type,这个与表或者存储过程类似,不做过多说明。
已知问题:
1,201906122030:经测试,目前暂时不支持sequence对象的同步。
需要改进的地方
1,代码结构优化,更加清晰和条例的结构(一开始用最直接简单粗暴的方式快速实现,后面重构了很多代码,现在自己看起来还有很多不舒服的痕迹)
2,数据同步的效率问题,对于多表的导入导出操作,依赖于单线程,多个大表导出串行的话,可能存在效率上的瓶颈,如何根据表的数据量,尽可能平均地分配多多个线程中,提升效率
3,更加友好清晰的异常提示以及日志记录,生成导出日志信息。
4,异构数据同步,mysql《==》sql server《==》oracle《==》pgsql
代码端午节写好了,这几天抽空进行了一些测试以及bug fix,应该还潜在不少未知的bug,工作量比想象中的大的多了去了。
# -*- coding: utf-8 -*- # !/usr/bin/env python3 __author__ = 'mssql123' __date__ = '2019-06-07 09:36' import os import sys import time import datetime import pymssql from decimal import decimal usage = ''' -----parameter explain----- source database parameter -s_h : soure database host ----- must require parameter -s_i : soure database instace name ----- default instance name mssql -s_d : soure database name ----- must require parameter -s_u : soure database login ----- default windows identifier -s_p : soure database login password ----- must require when s_u is not null -s_p : soure database instance port ----- default port 1433 target database parameter -t_h : target database host ----- must require parameter -t_i : target database instace name ----- default instance name mssql -t_d : target database name ----- must require parameter -t_u : target database login ----- default windows identifier -t_p : target database login password ----- must require when s_u is not null -t_p : target database instance port ----- default port 1433 sync object parameter -obj_type : table or sp or function or other databse object ----- tab or sp or fn or tp -obj : table|sp|function|type name ----- whick table or sp sync overwirte parameter -f : force overwirte target database object ----- f or n --help: help document example: python datatransfer.py -s_h=127.0.0.1 -s_p=1433 -s_i="mssql" -s_d="db01" -obj_type="tab" -obj="dbo.t1,dbo.t2" -t_h=127.0.0.1 -t_p=1433 -t_i="mssql" -t_d="db02" -f="y" python datatransfer.py -s_h=127.0.0.1 -s_p=1433 -s_i="mssql" -s_d="db01" -obj_type="sp" -obj="dbo.sp1,dbo.sp2" -t_h=127.0.0.1 -t_p=1433 -t_i="mssql" -t_d="db02" -f="y" ''' class syncdatabaseobject(object): # source databse s_h = none s_i = none s_p = none s_u = none s_p = none s_d = none # obj type s_obj_type = none # sync objects s_obj = none # target database t_h = none t_i = none t_p = none t_u = none t_p = none t_d = none f = none file_path = none def __init__(self, *args, **kwargs): for k, v in kwargs.items(): setattr(self, k, v) # connect to sqlserver def get_connect(self, _h, _i, _p, _u, _p, _d): cursor = false try: if (_u) and (_p): conn = pymssql.connect(host=_h, server=_i, port=_p, user=_u, password=_p, database=_d) else: conn = pymssql.connect(host=_h, server=_i, port=_p, database=_d) if (conn): return conn except: raise return conn # check connection def validated_connect(self, _h, _i, _p, _u, _p, _d): if not (self.get_connect(_h, _i, _p, _u, _p, _d)): print("connect to " + str(_h) + " failed,please check you parameter") exit(0) ''' this is supposed to be a valid object name just like xxx_name,or dbo.xxx_name,or [schema].xxx_name or schema.[xxx_name] then transfer this kind of valid object name to format object name like [dbo].[xxx_name](give a default dbo schema name when no schema name) other format object name consider as unvalid,will be rasie error in process format object name 1,xxx_name ======> [dbo].[xxx_name] 2,dbo.xxx_name ======> [dbo].[xxx_name] 3,[schema].xxx_name ======> [dbo].[xxx_name] 3,schema.xxx_name ======> [schema].[xxx_name] 4,[schema].[xxx_name] ======> [schema].[xxx_name] 5,[schema].[xxx_name ======> rasie error format message ''' @staticmethod def format_object_name(name): format_name = "" if ("." in name): schema_name = name[0:name.find(".")] object_name = name[name.find(".") + 1:] if not ("[" in schema_name): schema_name = "[" + schema_name + "]" if not ("[" in object_name): object_name = "[" + object_name + "]" format_name = schema_name + "." + object_name else: if ("[" in name): format_name = "[dbo]." + name else: format_name = "[dbo]." + "[" + name + "]" return format_name ''' check user input object is a valid object ''' def exits_object(self, conn, name): conn = conn cursor_source = conn.cursor() # get object by name from source db sql_script = r'''select top 1 1 from ( select concat(quotename(schema_name(schema_id)),'.',quotename(name)) as obj_name from sys.objects union all select concat(quotename(schema_name(schema_id)),'.',quotename(name)) as obj_name from sys.types )t where obj_name = '{0}' '''.format(self.format_object_name(name)) cursor_source.execute(sql_script) result = cursor_source.fetchall() if not result: return 0 else: return 1 conn.cursor.close() conn.close() # table variable sync def sync_table_variable(self, tab_name, is_reference): conn_source = self.get_connect(self.s_h, self.s_i, self.s_p, self.s_u, self.s_p, self.s_d) conn_target = self.get_connect(self.t_h, self.t_i, self.t_p, self.t_u, self.t_p, self.t_d) cursor_source = conn_source.cursor() cursor_target = conn_target.cursor() if (self.exits_object(conn_source, self.format_object_name(tab_name))) > 0: pass else: print("----------------------- warning message -----------------------") print("--------warning: object " + tab_name + " not existing in source database ------------") print("----------------------- warning message -----------------------") print() return exists_in_target = 0 sql_script = r'''select top 1 1 from sys.table_types tp where is_user_defined = 1 and concat(quotename(schema_name(tp.schema_id)),'.',quotename(tp.name)) = '{0}' ''' \ .format((self.format_object_name(tab_name))) # if the table schema exists in target server,skip cursor_target.execute(sql_script) exists_in_target = cursor_target.fetchone() # weather exists in target server database if (self.f == "y"): if (is_reference != "y"): # skiped,table type can not drop when used by sp sql_script = r''' if object_id('{0}') is not null drop type {0} '''.format(self.format_object_name(tab_name)) cursor_target.execute(sql_script) conn_target.commit() else: if exists_in_target: print("----------------------- warning message -----------------------") print("the target table type " + tab_name + " exists ,skiped sync table type from source") print("----------------------- warning message -----------------------") print() return sql_script = r''' declare @sql nvarchar(max) = '' select @sql = 'create type ' + '{0}' + 'as table' + char(13) + '(' + char(13) + stuff(( select char(13) + ' , [' + c.name + '] ' + case when c.is_computed = 1 then 'as ' + object_definition(c.[object_id], c.column_id) else case when c.system_type_id != c.user_type_id then '[' + schema_name(tp.[schema_id]) + '].[' + tp.name + ']' else '[' + upper(y.name) + ']' end + case when y.name in ('varchar', 'char', 'varbinary', 'binary') then '(' + case when c.max_length = -1 then 'max' else cast(c.max_length as varchar(5)) end + ')' when y.name in ('nvarchar', 'nchar') then '(' + case when c.max_length = -1 then 'max' else cast(c.max_length / 2 as varchar(5)) end + ')' when y.name in ('datetime2', 'time2', 'datetimeoffset') then '(' + cast(c.scale as varchar(5)) + ')' when y.name = 'decimal' then '(' + cast(c.[precision] as varchar(5)) + ',' + cast(c.scale as varchar(5)) + ')' else '' end + case when c.collation_name is not null and c.system_type_id = c.user_type_id then ' collate ' + c.collation_name else '' end + case when c.is_nullable = 1 then ' null' else ' not null' end + case when c.default_object_id != 0 then ' constraint [' + object_name(c.default_object_id) + ']' + ' default ' + object_definition(c.default_object_id) else '' end end from sys.table_types tp inner join sys.columns c on c.object_id = tp.type_table_object_id inner join sys.types y on y.system_type_id = c.system_type_id where tp.is_user_defined = 1 and y.name<>'sysname' and concat(quotename(schema_name(tp.schema_id)),'.',quotename(tp.name)) = '{0}' order by c.column_id for xml path(''), type).value('.', 'nvarchar(max)'), 1, 7, ' ') + ');' select @sql as script '''.format(self.format_object_name(self.format_object_name((tab_name)))) cursor_target = conn_target.cursor() cursor_source.execute(sql_script) row = cursor_source.fetchone() try: if not exists_in_target: # execute the script on target server cursor_target.execute(str(row[0])) # drop current stored_procudre if exists conn_target.commit() print("*************table type " + self.format_object_name(tab_name) + " synced *********************") print() # give a blank row when finish except: print("----------------------- error message -----------------------") print("-----------table type " + self.format_object_name(tab_name) + " synced error ---------------") print("----------------------- error message -----------------------") print() # raise cursor_source.close() conn_source.close() cursor_target.close() conn_target.close() # schema sync def sync_schema(self): conn_source = self.get_connect(self.s_h, self.s_i, self.s_p, self.s_u, self.s_p, self.s_d) conn_target = self.get_connect(self.t_h, self.t_i, self.t_p, self.t_u, self.t_p, self.t_d) cursor_source = conn_source.cursor() cursor_target = conn_target.cursor() arr_schema = [] # get all table in database when not define table name schema_result = cursor_source.execute(r''' select name from sys.schemas where schema_id>4 and schema_id<16384 ''') for row in cursor_source.fetchall(): cursor_target.execute(r''' if not exists(select * from sys.schemas where name = '{0}') begin exec('create schema [{0}]') end '''.format(str(row[0]))) conn_target.commit() cursor_source.close() conn_source.close() cursor_target.close() conn_target.close() def sync_table_schema_byname(self, tab_name, is_reference): conn_source = self.get_connect(self.s_h, self.s_i, self.s_p, self.s_u, self.s_p, self.s_d) conn_target = self.get_connect(self.t_h, self.t_i, self.t_p, self.t_u, self.t_p, self.t_d) cursor_source = conn_source.cursor() cursor_target = conn_target.cursor() if (self.exits_object(conn_source, self.format_object_name(tab_name)) == 0): print("----------------------- warning message -----------------------") print("---------------warning: object " + tab_name + " not existing in source database ----------------") print("----------------------- warning message -----------------------") print() return # if exists a reference table for sp,not sync the table agagin if (self.exits_object(conn_target, self.format_object_name(tab_name)) > 0): if (self.f != "y"): print("----------------------- warning message -----------------------") print("---------------warning: object " + tab_name + " existing in target database ----------------") print("----------------------- warning message -----------------------") print() return sql_script = r''' select top 1 1 from sys.tables where type_desc = 'user_table' and concat(quotename(schema_name(schema_id)),'.',quotename(name)) = '{0}' '''.format((self.format_object_name(tab_name))) # if the table schema exists in target server,skip cursor_target.execute(sql_script) exists_in_target = cursor_target.fetchone() if exists_in_target: if (self.f == "y"): if (is_reference != "y"): cursor_target.execute("drop table {0}".format(tab_name)) else: print("----------------------- warning message -----------------------") print("the target table " + tab_name + " exists ,skiped sync table schema from source") print("----------------------- warning message -----------------------") print() return sql_script = r''' declare @object_name sysname , @object_id int select @object_name = '[' + s.name + '].[' + o.name + ']' , @object_id = o.[object_id] from sys.objects o with (nowait) join sys.schemas s with (nowait) on o.[schema_id] = s.[schema_id] where quotename(s.name) + '.' + quotename(o.name) = '{0}' and o.[type] = 'u' and o.is_ms_shipped = 0 declare @sql nvarchar(max) = '' ;with index_column as ( select ic.[object_id] , ic.index_id , ic.is_descending_key , ic.is_included_column , c.name from sys.index_columns ic with (nowait) join sys.columns c with (nowait) on ic.[object_id] = c.[object_id] and ic.column_id = c.column_id where ic.[object_id] = @object_id ), fk_columns as ( select k.constraint_object_id , cname = c.name , rcname = rc.name from sys.foreign_key_columns k with (nowait) join sys.columns rc with (nowait) on rc.[object_id] = k.referenced_object_id and rc.column_id = k.referenced_column_id join sys.columns c with (nowait) on c.[object_id] = k.parent_object_id and c.column_id = k.parent_column_id where k.parent_object_id = @object_id ) select @sql = 'create table ' + @object_name + '' + '(' + '' + stuff(( select '' + ', [' + c.name + '] ' + case when c.is_computed = 1 then 'as ' + cc.[definition] else upper(tp.name) + case when tp.name in ('varchar', 'char', 'varbinary', 'binary', 'text') then '(' + case when c.max_length = -1 then 'max' else cast(c.max_length as varchar(5)) end + ')' when tp.name in ('nvarchar', 'nchar') then '(' + case when c.max_length = -1 then 'max' else cast(c.max_length / 2 as varchar(5)) end + ')' when tp.name in ('datetime2', 'time2', 'datetimeoffset') then '(' + cast(c.scale as varchar(5)) + ')' when tp.name = 'decimal' then '(' + cast(c.[precision] as varchar(5)) + ',' + cast(c.scale as varchar(5)) + ')' else '' end + case when c.collation_name is not null then ' collate ' + c.collation_name else '' end + case when c.is_nullable = 1 then ' null' else ' not null' end + case when dc.[definition] is not null then ' default' + dc.[definition] else '' end + case when ic.is_identity = 1 then ' identity(' + cast(isnull( /*ic.seed_value*/ 1, '0') as char(1)) + ',' + cast(isnull(ic.increment_value, '1') as char(1)) + ')' else '' end end + '' from sys.columns c with (nowait) join sys.types tp with (nowait) on c.user_type_id = tp.user_type_id left join sys.computed_columns cc with (nowait) on c.[object_id] = cc.[object_id] and c.column_id = cc.column_id left join sys.default_constraints dc with (nowait) on c.default_object_id != 0 and c.[object_id] = dc.parent_object_id and c.column_id = dc.parent_column_id left join sys.identity_columns ic with (nowait) on c.is_identity = 1 and c.[object_id] = ic.[object_id] and c.column_id = ic.column_id where c.[object_id] = @object_id order by c.column_id for xml path(''), type).value('.', 'nvarchar(max)'), 1, 2, '' + ' ') + isnull((select '' + ', constraint [' + k.name + '] primary key (' + (select stuff(( select ', [' + c.name + '] ' + case when ic.is_descending_key = 1 then 'desc' else 'asc' end from sys.index_columns ic with (nowait) join sys.columns c with (nowait) on c.[object_id] = ic.[object_id] and c.column_id = ic.column_id where ic.is_included_column = 0 and ic.[object_id] = k.parent_object_id and ic.index_id = k.unique_index_id for xml path(n''), type).value('.', 'nvarchar(max)'), 1, 2, '')) + ')' + '' from sys.key_constraints k with (nowait) where k.parent_object_id = @object_id and k.[type] = 'pk'), '') + ')' + '' + isnull((select ( select '' + 'alter table ' + @object_name + ' with' + case when fk.is_not_trusted = 1 then ' nocheck' else ' check' end + ' add constraint [' + fk.name + '] foreign key(' + stuff(( select ', [' + k.cname + ']' from fk_columns k where k.constraint_object_id = fk.[object_id] and 1=2 for xml path(''), type).value('.', 'nvarchar(max)'), 1, 2, '') + ')' + ' references [' + schema_name(ro.[schema_id]) + '].[' + ro.name + '] (' + stuff(( select ', [' + k.rcname + ']' from fk_columns k where k.constraint_object_id = fk.[object_id] for xml path(''), type).value('.', 'nvarchar(max)'), 1, 2, '') + ')' + case when fk.delete_referential_action = 1 then ' on delete cascade' when fk.delete_referential_action = 2 then ' on delete set null' when fk.delete_referential_action = 3 then ' on delete set default' else '' end + case when fk.update_referential_action = 1 then ' on update cascade' when fk.update_referential_action = 2 then ' on update set null' when fk.update_referential_action = 3 then ' on update set default' else '' end + '' + 'alter table ' + @object_name + ' check constraint [' + fk.name + ']' + '' from sys.foreign_keys fk with (nowait) join sys.objects ro with (nowait) on ro.[object_id] = fk.referenced_object_id where fk.parent_object_id = @object_id for xml path(n''), type).value('.', 'nvarchar(max)')), '') + isnull(((select '' + 'create' + case when i.is_unique = 1 then ' unique' else '' end + ' nonclustered index [' + i.name + '] on ' + @object_name + ' (' + stuff(( select ', [' + c.name + ']' + case when c.is_descending_key = 1 then ' desc' else ' asc' end from index_column c where c.is_included_column = 0 and c.index_id = i.index_id for xml path(''), type).value('.', 'nvarchar(max)'), 1, 2, '') + ')' + isnull('' + 'include (' + stuff(( select ', [' + c.name + ']' from index_column c where c.is_included_column = 1 and c.index_id = i.index_id for xml path(''), type).value('.', 'nvarchar(max)'), 1, 2, '') + ')', '') + '' from sys.indexes i with (nowait) where i.[object_id] = @object_id and i.is_primary_key = 0 and i.[type] = 2 for xml path(''), type).value('.', 'nvarchar(max)') ), '') select @sql as script '''.format(self.format_object_name(tab_name)) cursor_target = conn_target.cursor() cursor_source.execute(sql_script) row = cursor_source.fetchone() if not row[0]: return try: cursor_target.execute(row[0]) # drop current table schema if exists conn_target.commit() print("*************schema " + self.format_object_name(tab_name) + " synced *************") print() # give a blank row when finish except: print("----------------------- warning message -----------------------") print("-----------schema " + self.format_object_name(tab_name) + " synced failed---------------") print("----------------------- warning message -----------------------") print() cursor_source.close() conn_source.close() cursor_target.close() conn_target.close() def get_table_column(self, conn, tab_name): column_names = "" conn = conn cursor_source = conn.cursor() # get object by name from source db sql_script = r'''select name from sys.columns where object_id = object_id('{0}') and is_computed=0 order by object_id '''.format(self.format_object_name(tab_name)) cursor_source.execute(sql_script) result = cursor_source.fetchall() for row in result: column_names = column_names + row[0] + "," return column_names[0:len(column_names) - 1] conn.cursor.close() conn.close() def sync_table_schema(self): #default not sync by referenced other object is_reference = "n" conn_source = self.get_connect(self.s_h, self.s_i, self.s_p, self.s_u, self.s_p, self.s_d) conn_target = self.get_connect(self.t_h, self.t_i, self.t_p, self.t_u, self.t_p, self.t_d) cursor_source = conn_source.cursor() cursor_target = conn_target.cursor() arr_table = [] if (self.s_obj): for tab_name in self.s_obj.split(","): if (tab_name) and (self.exits_object(conn_source, tab_name)>0): self.sync_table_schema_byname(tab_name, is_reference) else: print("----------------------- warning message -----------------------") print("-----------schema " + self.format_object_name(tab_name) + " not existing in source database---------------") print("----------------------- warning message -----------------------") print() else: # sync all tables # get all table in database when not define table name sql_script = ''' select quotename(s.name)+'.'+ quotename(o.name) from sys.objects o with (nowait) join sys.schemas s with (nowait) on o.[schema_id] = s.[schema_id] where o.[type] = 'u' and o.is_ms_shipped = 0 ''' cursor_source.execute(sql_script) for row in cursor_source.fetchall(): self.sync_table_schema_byname(str(row[0]), is_reference) # sync data from soure table to target table def sync_table_data(self): conn_source = self.get_connect(self.s_h, self.s_i, self.s_p, self.s_u, self.s_p, self.s_d) conn_target = self.get_connect(self.t_h, self.t_i, self.t_p, self.t_u, self.t_p, self.t_d) cursor_source = conn_source.cursor() cursor_target = conn_target.cursor() arr_table = [] if (self.s_obj): arr_table = self.s_obj.split(',') for tab_name in arr_table: if (self.exits_object(conn_target, self.format_object_name(tab_name)) == 0): arr_table.remove(tab_name) print("----------------- warning message -----------------------") print("----------------- warning: table " + tab_name + " not existing in target database ---------------------") print("----------------- warning message -----------------------") else: # get all table in database when not define table name tab_result = cursor_source.execute(r''' select quotename(s.name)+'.'+ quotename(o.name) from sys.objects o with (nowait) join sys.schemas s with (nowait) on o.[schema_id] = s.[schema_id] where o.[type] = 'u' and o.is_ms_shipped = 0 ''') for row in cursor_source.fetchall(): arr_table.append(str(row[0])) insert_columns = "" insert_columns = self.get_table_column(conn_source, tab_name) for tab_name in arr_table: if (self.f != "y"): sql_script = "select top 1 {0} from {1} ".format(insert_columns, tab_name) # if exists data in target table,break cursor_target.execute(sql_script) exists = cursor_target.fetchone() if exists: print("----------------------- warning message -----------------------") print("the target table " + tab_name + " exists data,skiped sync table type from source") print("----------------------- warning message -----------------------") print() continue else: sql_script = "truncate table {0} ".format(tab_name) # if exists data in target table,break cursor_target.execute(sql_script) conn_target.commit() insert_columns = "" insert_columns = self.get_table_column(conn_source, tab_name) insert_prefix = "" # weather has identity column cursor_source.execute(r'''select 1 from sys.columns where object_id = object_id('{0}') and is_identity =1 '''.format(tab_name)) exists_identity = none exists_identity = cursor_source.fetchone() if (exists_identity): insert_prefix = "set identity_insert {0} on; ".format(tab_name) # data source insert_sql = "" values_sql = "" current_row = "" counter = 0 sql_script = r''' select {0} from {1} '''.format(insert_columns, tab_name) cursor_source.execute(sql_script) # create insert columns ''' for field in cursor_source.description: insert_columns = insert_columns + str(field[0]) + "," insert_columns = insert_columns[0:len(insert_columns) - 1] ''' insert_prefix = insert_prefix + "insert into {0} ({1}) values ".format(tab_name, insert_columns) for row in cursor_source.fetchall(): counter = counter + 1 for key in row: if (str(key) == "none"): current_row = current_row + r''' null, ''' else: if (type(key) is datetime.datetime): current_row = current_row + r''' '{0}', '''.format(str(key)[0:23]) elif (type(key) is str): # 我槽!!!,这里又有一个坑:https://blog.csdn.net/dadaowuque/article/details/81016127 current_row = current_row + r''' '{0}', '''.format( key.replace("'", "''").replace('\u0000', '').replace('\x00', '')) elif (type(key) is decimal): d = decimal(key) s = '{0:f}'.format(d) current_row = current_row + r''' '{0}', '''.format(s) elif (type(key) is bytes): # print(hex(int.from_bytes(key, 'big', signed=true) )) current_row = current_row + r''' {0}, '''.format( hex(int.from_bytes(key, 'big', signed=false))) else: current_row = current_row + r''' '{0}', '''.format(key) current_row = current_row[0:len(current_row) - 2] # remove the the last one char "," values_sql = values_sql + "(" + current_row + ")," current_row = "" # execute the one batch when if (counter == 1000): insert_sql = insert_prefix + values_sql insert_sql = insert_sql[0:len(insert_sql) - 1] # remove the the last one char "," if (exists_identity): insert_sql = insert_sql + " ;set identity_insert {0} off;".format(tab_name) try: cursor_target.execute(insert_sql) except: print( "----------------------error " + tab_name + " data synced failed-------------------------") raise conn_target.commit() insert_sql = "" values_sql = "" current_row = "" counter = 0 print(time.strftime("%y-%m-%d %h:%m:%s", time.localtime()) + "*************** " + self.format_object_name( tab_name) + " " + str(1000) + " rows synced *************") if (values_sql): insert_sql = insert_prefix + values_sql insert_sql = insert_sql[0:len(insert_sql) - 1] # remove the the last one char "," if (exists_identity): insert_sql = insert_sql + " ; set identity_insert {0} off;".format(tab_name) # execute the last batch try: cursor_target.execute(insert_sql) except: print("------------------error " + tab_name + " data synced failed------------------------") raise conn_target.commit() insert_sql = "" values_sql = "" current_row = "" print(time.strftime("%y-%m-%d %h:%m:%s", time.localtime()) + "*************** " + self.format_object_name( tab_name) + " " + str( counter) + " rows synced *************") print(time.strftime("%y-%m-%d %h:%m:%s", time.localtime()) + "----------------synced " + self.format_object_name( tab_name) + " data finished---------------") print() cursor_source.close() conn_source.close() cursor_target.close() conn_target.close() def sync_dependent_object(self, obj_name): # 强制覆盖,不需要对依赖对象生效,如果是因为属于依赖对象而被同步的,先检查target中是否存在,如果存在就不继续同步,这里打一个标记来实现 is_refernece = "y" conn_source = self.get_connect(self.s_h, self.s_i, self.s_p, self.s_u, self.s_p, self.s_d) conn_target = self.get_connect(self.t_h, self.t_i, self.t_p, self.t_u, self.t_p, self.t_d) cursor_source = conn_source.cursor() cursor_target = conn_target.cursor() ''' find dependent objects if exists dependent objects,sync dependent objects objects in advance ''' sql_check_dependent = r''' select * from ( select distinct rtrim(lower(s.type)) collate chinese_prc_ci_as as obj_type, quotename(d.referenced_schema_name)+'.'+quotename(d.referenced_entity_name) collate chinese_prc_ci_as as obj from sys.dm_sql_referenced_entities('{0}','object') as d inner join sys.sysobjects s on s.id = d.referenced_id union all select distinct rtrim(lower(d.referenced_class_desc)) collate chinese_prc_ci_as as obj_type, quotename(d.referenced_schema_name)+'.'+quotename(d.referenced_entity_name) collate chinese_prc_ci_as as obj from sys.dm_sql_referenced_entities('{0}','object') as d inner join sys.types s on s.user_type_id = d.referenced_id )t '''.format(self.format_object_name(obj_name)) cursor_source.execute(sql_check_dependent) result = cursor_source.fetchall() for row in result: if row[1]: if (row[0] == "u"): if (row[1]): self.sync_table_schema_byname(row[1], is_refernece) elif (row[0] == "fn" or row[0] == "if"): if (row[1]): self.sync_procudre_by_name("f", row[1], is_refernece) elif (row[0] == "type"): if (row[1]): self.sync_table_variable(row[1], is_refernece) def sync_procudre_by_name(self, type, obj_name, is_reference): conn_source = self.get_connect(self.s_h, self.s_i, self.s_p, self.s_u, self.s_p, self.s_d) conn_target = self.get_connect(self.t_h, self.t_i, self.t_p, self.t_u, self.t_p, self.t_d) cursor_source = conn_source.cursor() cursor_target = conn_target.cursor() if (self.exits_object(conn_source, self.format_object_name(obj_name)) == 0): print("---------------warning message----------------") print("---------------warning: object " + obj_name + " not existing in source database ----------------") print("---------------warning message----------------") print() return if (self.exits_object(conn_target, self.format_object_name(obj_name)) > 0): if (self.f != "y"): print("---------------warning message----------------") print("---------------warning: object " + obj_name + " existing in target database ----------------") print("---------------warning message----------------") print() return ''' 本来想直接生成删除语句的: 这里有一个该死的转义,怎么都弄不好,中午先去吃饭吧, 下午回来想了一下,换一种方式,不要死磕转义问题了 sql_script = select 'if object_id('+''''+quotename(schema_name(uid))+ '' + quotename(name)+''''+') is not null ' +' drop proc '+quotename(schema_name(uid))+ '.' + quotename(name) , object_definition(id) from sys.sysobjects where xtype = 'p' and uid not in (16,19) ''' sql_script = r''' select quotename(schema_name(uid))+'.'+quotename(name), object_definition(id) from sys.sysobjects where xtype in ('p','if','fn') and uid not in (16,19) ''' if (obj_name): sql_script = sql_script + " and quotename(schema_name(uid))+ '.' + quotename(name) ='{0}' ".format( self.format_object_name(obj_name)) cursor_source.execute(sql_script) row = cursor_source.fetchone() try: if type == "f": sql_script = r''' if object_id('{0}') is not null drop function {0} '''.format(self.format_object_name(row[0])) elif type == "p": sql_script = r''' if object_id('{0}') is not null drop proc {0} '''.format(self.format_object_name(row[0])) cursor_target.execute(sql_script) # drop current stored_procudre if exists conn_target.commit() # sync dependent object if (is_reference != "n"): self.sync_dependent_object(self.format_object_name(row[0])) # sync object it self cursor_target.execute(str(row[1])) # execute create stored_procudre script conn_target.commit() print("*************sync sp: " + self.format_object_name(row[0]) + " finished *****************") print() except: print("---------------error message----------------") print("------------------ sync " + row[0] + "sp error --------------------------") print("---------------error message----------------") print() cursor_source.close() conn_source.close() cursor_target.close() conn_target.close() def sync_procudre(self, type): is_reference = "n" conn_source = self.get_connect(self.s_h, self.s_i, self.s_p, self.s_u, self.s_p, self.s_d) conn_target = self.get_connect(self.t_h, self.t_i, self.t_p, self.t_u, self.t_p, self.t_d) cursor_source = conn_source.cursor() cursor_target = conn_target.cursor() if (self.s_obj): for proc_name in self.s_obj.split(","): self.sync_dependent_object(proc_name) self.sync_procudre_by_name(type, proc_name, is_reference) # sync all sp and function else: sql_script = r''' select quotename(schema_name(uid))+'.'+quotename(name), object_definition(id) from sys.sysobjects where xtype = upper('{0}') and uid not in (16,19) '''.format(type) cursor_source.execute(sql_script) for row in cursor_source.fetchall(): self.sync_dependent_object(row[0]) self.sync_procudre_by_name(type, row[0], is_reference) if __name__ == "__main__": ''' sync = syncdatabaseobject(s_h="127.0.0.1", s_i = "sql2017", s_p = 49744, s_d="db01", t_h="127.0.0.1", t_i="sql2017", t_p=49744, t_d="db02", s_obj_type = "sp", s_obj = "dbo.sp_test01", f="y") sync.sync_procudre("p") ''' p_s_h = "" p_s_i = "mssql" p_s_p = 1433 p_s_d = "" p_s_u = none p_s_p = none p_s_obj = "" p_type = "" p_t_s = "" p_t_i = "mssql" p_t_p = "1433" p_t_d = "" p_t_u = none p_t_p = none # force conver target database object,default not force cover target database object p_f = "n" # sync obj type table|sp p_obj_type = none # sync whick database object p_obj = none if len(sys.argv) == 1: print(usage) sys.exit(1) elif sys.argv[1] == '--help': print(usage) sys.exit() elif len(sys.argv) >= 2: for i in sys.argv[1:]: _argv = i.split('=') # source server name if _argv[0] == '-s_h': p_s_h = _argv[1] # source server instance name if _argv[0] == '-s_i': if (_argv[1]): p_s_i = _argv[1] # source server instance port if _argv[0] == '-s_p': if (_argv[1]): p_s_p = _argv[1] # source database name if _argv[0] == '-s_d': p_s_d = _argv[1] if _argv[0] == '-s_u': p_s_u = _argv[1] if _argv[0] == '-s_p': p_s_p = _argv[1] if _argv[0] == '-t_h': p_t_h = _argv[1] if _argv[0] == '-t_i': if (_argv[1]): p_t_i = _argv[1] if _argv[0] == '-t_p': if (_argv[1]): p_t_p = _argv[1] if _argv[0] == '-t_d': p_t_d = _argv[1] if _argv[0] == '-t_u': p_t_u = _argv[1] if _argv[0] == '-t_p': p_t_p = _argv[1] if _argv[0] == '-f': if (_argv[1]): p_f = _argv[1] # object type if _argv[0] == '-obj_type': if not (_argv[1]): print("-obj_type can not be null (-obj=tab|-obj=sp|-obj=fn|-obj=type)") exit(0) else: p_obj_type = _argv[1] # object name if _argv[0] == '-obj': if (_argv[1]): p_obj = _argv[1] # require para if p_s_h.strip() == "": print("source server host cannot be null") exit(0) if p_s_d.strip() == "": print("source server host database name cannot be null") exit(0) if p_t_h.strip() == "": print("target server host cannot be null") exit(0) if p_t_d.strip() == "": print("target server host database name cannot be null") exit(0) sync = syncdatabaseobject(s_h=p_s_h, s_i=p_s_i, s_p=p_s_p, s_d=p_s_d, s_u=p_s_u, s_p=p_s_p, s_obj=p_obj, t_h=p_t_h, t_i=p_t_i, t_p=p_t_p, t_d=p_t_d, t_u=p_t_u, t_p=p_t_p, f=p_f) sync.validated_connect(p_s_h, p_s_i, p_s_p, p_s_d, p_s_u, p_s_p) sync.validated_connect(p_t_h, p_t_i, p_t_p, p_t_d, p_t_u, p_t_p) if (p_f.upper() == "y"): confirm = input("confirm you want to overwrite the target object? ") if confirm.upper() != "y": exit(0) print("-------------------------- sync begin ----------------------------------") print() if (p_obj_type == "tab"): # sync schema sync.sync_schema() # sync table schema sync.sync_table_schema() # sync data sync.sync_table_data() elif (p_obj_type == "sp"): # sync schema sync.sync_schema() # sync sp sync.sync_procudre("p") elif (p_obj_type == "fn"): # sync schema sync.sync_schema() # sync sp sync.sync_procudre("fn") elif (p_obj_type == "tp"): # sync schema sync.sync_schema() # sync sp sync.sync_table_variable() else: print("-obj_type is not validated") print() print("-------------------------- sync finish ----------------------------------")
总结
以上所述是小编给大家介绍的基于python的sql server数据库实现对象同步轻量级,希望对大家有所帮助