Python 封装DBUtils 和pymysql实例
程序员文章站
2022-04-27 20:44:29
...
之前一篇Python 封装DBUtils 和pymysql 中写过一个basedao.py,最近几天又重新整理了下思绪,优化了下 basedao.py,目前支持的方法还不多,后续会进行改进、添加。
View Code
主要功能:
1.查询单个对象:
所需参数:表名,过滤条件
2.查询多个对象:
所需参数:表名,过滤条件
3.按主键查询:
所需参数:表名,值
4.分页查询:
所需参数:表名,页码,每页记录数,过滤条件
具体代码如下:
1 import json, os, sys, time 2 3 import pymysql 4 from DBUtils import PooledDB 5 6 class BaseDao(object): 7 """ 8 简便的数据库操作基类 9 """ 10 __config = {} # 数据库连接配置 11 __conn = None # 数据库连接 12 __cursor = None # 数据库游标 13 __database = None # 用于临时村塾查询数据库 14 __tableName = None # 用于临时存储查询表名 15 __fields = [] # 用于临时存储查询表的字段列表 16 __primaryKey_dict = {} # 用于存储配置中的数据库中所有表的主键 17 18 def __init__(self, creator=pymysql, host="localhost", user=None, password="", database=None, port=3306, charset="utf8"): 19 if host is None: 20 raise Exception("Parameter [host] is None.") 21 if user is None: 22 raise Exception("Parameter [user] is None.") 23 if password is None: 24 raise Exception("Parameter [password] is None.") 25 if database is None: 26 raise Exception("Parameter [database] is None.") 27 if port is None: 28 raise Exception("Parameter [port] is None.") 29 self.__config = dict({ 30 "creator" : creator, "charset":charset, 31 "host":host, "port":port, 32 "user":user, "password":password, "database":database 33 }) 34 self.__conn = PooledDB.connect(**self.__config) 35 self.__cursor = self.__conn.cursor() 36 self.__database = self.__config["database"] 37 self.__init_primaryKey() 38 print(get_time(), "数据库连接初始化成功。") 39 40 def __del__(self): 41 '重写类被清除时调用的方法' 42 if self.__cursor: 43 self.__cursor.close() 44 print(get_time(), "游标关闭") 45 if self.__conn: 46 self.__conn.close() 47 print(get_time(), "连接关闭") 48 49 def select_one(self, tableName=None, filters={}): 50 ''' 51 查询单个对象 52 @tableName 表名 53 @filters 过滤条件 54 @return 返回字典集合,集合中以表字段作为 key,字段值作为 value 55 ''' 56 self.__check_params(tableName) 57 sql = self.__query_util(filters) 58 self.__cursor.execute(sql) 59 result = self.__cursor.fetchone() 60 return self.__parse_result(result) 61 62 def select_pk(self, tableName=None, primaryKey=None): 63 ''' 64 按主键查询 65 @tableName 表名 66 @primaryKey 主键值 67 ''' 68 self.__check_params(tableName) 69 filters = {} 70 filters.setdefault(str(self.__primaryKey_dict[tableName]), primaryKey) 71 sql = self.__query_util(filters) 72 self.__cursor.execute(sql) 73 result = self.__cursor.fetchone() 74 return self.__parse_result(result) 75 76 def select_all(self, tableName=None, filters={}): 77 ''' 78 查询所有 79 @tableName 表名 80 @filters 过滤条件 81 @return 返回字典集合,集合中以表字段作为 key,字段值作为 value 82 ''' 83 self.__check_params(tableName) 84 sql = self.__query_util(filters) 85 self.__cursor.execute(sql) 86 results = self.__cursor.fetchall() 87 return self.__parse_results(results) 88 89 def count(self, tableName=None): 90 ''' 91 统计记录数 92 ''' 93 self.__check_params(tableName) 94 sql = "SELECT count(*) FROM %s"%(self.__tableName) 95 self.__cursor.execute(sql) 96 result = self.__cursor.fetchone() 97 return result[0] 98 99 def select_page(self, tableName=None, pageNum=1, limit=10, filters={}):100 '''101 分页查询102 @tableName 表名103 @return 返回字典集合,集合中以表字段作为 key,字段值作为 value104 '''105 self.__check_params(tableName)106 totalCount = self.count()107 if totalCount / limit == 0 :108 totalPage = totalCount / limit109 else:110 totalPage = totalCount // limit + 1111 if pageNum > totalPage:112 print("最大页数为%d"%totalPage)113 pageNum = totalPage114 elif pageNum < 1:115 print("页数不能小于1")116 pageNum = 1117 beginindex = (pageNum-1) * limit118 filters.setdefault("_limit_", (beginindex, limit))119 sql = self.__query_util(filters)120 self.__cursor.execute(sql)121 results = self.__cursor.fetchall()122 return self.__parse_results(results)123 124 def __parse_result(self, result):125 '用于解析单个查询结果,返回字典对象'126 obj = {}127 for k,v in zip(self.__fields, result):128 obj[k] = v129 return obj130 131 def __parse_results(self, results):132 '用于解析多个查询结果,返回字典列表对象'133 objs = []134 for result in results:135 obj = self.__parse_result(result)136 objs.append(obj)137 return objs138 139 def __init_primaryKey(self):140 '根据配置中的数据库读取该数据库中所有表的主键集合'141 sql = """SELECT TABLE_NAME, COLUMN_NAME142 FROM Information_schema.columns143 WHERE COLUMN_KEY='PRI' AND TABLE_SCHEMA='%s'"""%(self.__database)144 self.__cursor.execute(sql)145 results = self.__cursor.fetchall()146 for result in results:147 self.__primaryKey_dict[result[0]] = result[1]148 149 def __query_fields(self, tableName=None, database=None):150 '查询表的字段列表, 将查询出来的字段列表存入 __fields 中'151 sql = """SELECT column_name152 FROM Information_schema.columns153 WHERE table_Name = '%s' AND TABLE_SCHEMA='%s'"""%(tableName, database)154 self.__cursor.execute(sql)155 fields_tuple = self.__cursor.fetchall()156 self.__fields = [fields[0] for fields in fields_tuple]157 158 def __query_util(self, filters=None):159 """160 SQL 语句拼接方法161 @filters 过滤条件162 """163 sql = r'SELECT #{FIELDS} FROM #{TABLE_NAME} WHERE 1=1 #{FILTERS}'164 # 拼接查询表165 sql = sql.replace("#{TABLE_NAME}", self.__tableName)166 # 拼接查询字段167 self.__query_fields(self.__tableName, self.__database)168 FIELDS = ""169 for field in self.__fields:170 FIELDS += field + ", "171 FIELDS = FIELDS[0: len(FIELDS)-2]172 sql = sql.replace("#{FIELDS}", FIELDS)173 # 拼接查询条件(待优化)174 if filters is None:175 sql = sql.replace("#{FILTERS}", "")176 else:177 FILTERS = ""178 if not isinstance(filters, dict):179 raise Exception("Parameter [filters] must be dict type. ")180 isPage = False181 if filters.get("_limit_"):182 isPage = True183 beginindex, limit = filters.get("_limit_")184 for k, v in filters.items():185 if k.startswith("_in_"): # 拼接 in186 FILTERS += "AND %s IN (" %(k[4:])187 values = v.split(",")188 for value in values:189 FILTERS += "%s,"%value190 FILTERS = FILTERS[0:len(FILTERS)-1] + ") "191 elif k.startswith("_nein_"): # 拼接 not in192 FILTERS += "AND %s NOT IN (" %(k[4:])193 values = v.split(",")194 for value in values:195 FILTERS += "%s,"%value196 FILTERS = FILTERS[0:len(FILTERS)-1] + ") "197 elif k.startswith("_like_"): # 拼接 like198 FILTERS += "AND %s like '%%%s%%' " %(k[6:], v)199 elif k.startswith("_ne_"): # 拼接不等于200 FILTERS += "AND %s != '%s' " %(k[4:], v)201 elif k.startswith("_lt_"): # 拼接小于202 FILTERS += "AND %s < '%s' " %(k[4:], v)203 elif k.startswith("_le_"): # 拼接小于等于204 FILTERS += "AND %s <= '%s' " %(k[4:], v)205 elif k.startswith("_gt_"): # 拼接大于206 FILTERS += "AND %s > '%s' " %(k[4:], v)207 elif k.startswith("_ge_"): # 拼接大于等于208 FILTERS += "AND %s >= '%s' " %(k[4:], v)209 elif k in self.__fields: # 拼接等于210 FILTERS += "AND %s = '%s' "%(k, v)211 sql = sql.replace("#{FILTERS}", FILTERS)212 if isPage:213 sql += "LIMIT %d,%d"%(beginindex, limit)214 215 print(get_time(), sql)216 return sql217 218 def __check_params(self, tableName):219 '''220 检查参数221 '''222 if tableName:223 self.__tableName = tableName224 else:225 if self.__tableName is None:226 raise Exception("Parameter [tableName] is None.")227 228 def get_time():229 return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())230 231 if __name__ == "__main__":232 config = {233 # "creator": pymysql,234 # "host" : "127.0.0.1", 235 "user" : "root", 236 "password" : "root",237 "database" : "test", 238 # "port" : 3306,239 # "charset" : 'utf8'240 }241 base = BaseDao(**config)242 ########################################################################243 # user = base.select_one("user")244 # print(user)245 ########################################################################246 # users = base.select_all("user")247 # print(users)248 ########################################################################249 # filter1 = {250 # "sex":0,251 # "_in_id":"1,2,3,4,5",252 # "_like_name":"zhang",253 # "_ne_name":"wangwu"254 # }255 # user_filters = base.select_all(tableName="user", filters=filter1)256 # print(user_filters)257 ########################################################################258 # menu = base.select_one(tableName="menu")259 # print(menu)260 ########################################################################261 # user_pk = base.select_pk("user", 2)262 # print(user_pk)263 ########################################################################264 # filter2 = {265 # "_in_id":"1,2,3,4",266 # "_like_name":"test"267 # }268 # user_limit = base.select_page("user", 2, 10, filter2) #未实现269 # print(user_limit)270 ########################################################################
代码中已经给出了几个具体示例,大家可以参考使用。
以上就是Python 封装DBUtils 和pymysql实例的详细内容,更多请关注其它相关文章!
上一篇: 详解c#类的构造方法及示例代码
下一篇: office破解激活安装