SqlAlchemy归纳总结
程序员文章站
2024-03-02 23:04:28
...
使用sqlalchemy有一段时间了,基本操作都熟悉了,所以今天把关于Sqlalchemy的使用归纳总结一下。
创表
Sqlalchemy是操作数据库的库,所以首先要创建数据库表,在这里我使用的是sqlite3。
首先在你的配置文件里面配置数据库位置
config.py
import os
# 项目根目录
basedir = os.path.abspath(os.path.dirname(__file__))
# 配置数据库路径
os.environ['DATABASE_URL] = 'sqlite:///' + os.path.join(basedir, 'database', 'test.sqlite')
接下来就是在模型类中创建表了
models.py
import os
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine
from sqlalchemy import Column, Integer, String, schema, Boolean, Enum, DateTime, Text, ForeignKey, LargeBinary
engine = create_engine(os.environ['SQL_PATH'])
Base = declarative_base()
class User(Base):
'''用户信息管理'''
__tablename__ = 'user'
Id = Column(Integer, primary_key=True, autoincrement='auto')
username = Column(String(32), nullable=False, comment='用户名')
password = Column(String(64), nullable=False, comment='密码')
email = Column(String(100), default=None, comment='邮箱')
# 创建表
Base.metadata.create_all(engine)
创建模型后,接下来再创建一个操作数据库的api类
databaseApi.py
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session
from sqlalchemy.orm import sessionmaker
class API(object):
"""
自定义orm框架的api接口
"""
def __init__(self, engine):
# 创建与数据库的会话session class ,这里返回给session的是个class,不是实例
session_factory = sessionmaker(bind=engine)
Session = scoped_session(session_factory)
# 生成session 实例
self.session = Session()
def addone(self, obj, data):
'''添加单条数据'''
# 实例化对象
emp = obj(**data)
try:
# 添加数据
self.session.add(emp)
# 提交修改
self.session.commit()
# 返回结果
self.session.close()
return True, 'ok'
except Exception as e:
# 添加失败后回滚数据
self.session.rollback()
self.session.close()
# 返回状态和错误信息
return False, e
def addall(self, data):
'''添加多条数据, 参数data应该是一个对象列表'''
try:
self.session.add_all(**data)
self.session.commit()
self.session.close()
return True, 'ok'
except Exception as e:
self.session.rollback()
self.session.close()
return False, e
def delete(self, obj, kwargs):
'''删除指定数据,kwargs是一个删除条件的字典'''
emp = obj()
# 判断传入的删除条件是否正确
if kwargs is not None:
# 循环字典的键,即是表的属性,判断是否存在表中
for lib in kwargs.keys():
# 如果emp中存在此属性就返回None,不存在就返回notexists
resu = getattr(emp, lib, 'notexists')
if resu == 'notexists':
self.session.close()
return False, '{0} is not defind'.format(lib)
else:
self.session.close()
return False, '请指定删除条件, 如果要删除全部内容请使用deall()函数'
# 查询判断要删除的值是否存在, 如果不存在会返回None
res = self.session.query(obj).filter_by(**kwargs).all()
if res:
try:
# 删除数据
for re in res:
self.session.delete(re)
self.session.commit()
self.session.close()
return True, 'ok'
except Exception as e:
self.session.rollback()
self.session.close()
return False,e
else:
self.session.close()
return False,'notexists'
def deall(self, obj):
'''删除表中所有数据的方法'''
res = self.session.query(obj).all()
if res:
try:
for re in res:
self.session.delete(re)
self.session.commit()
self.session.close()
return True, 'ok'
except Exception as e:
self.session.rollback()
self.session.close()
return False, e
else:
self.session.close()
return False, 'table is not content'
def change(self, obj, contents, kwargs):
'''修改数据, contents是修改的列和修改后值的字典,kwargs是条件字典'''
emp = obj()
# 对传递进来的不定长参数进行判断
if kwargs is not None:
# 循环字典的键,即是表的属性,判断是否存在表中
for lib in kwargs.keys():
# 如果emp中存在此属性就返回None,不存在就返回notexists
resu = getattr(emp, lib, 'notexists')
if resu == 'notexists':
self.session.close()
return False, '{0} is not defind'.format(lib)
else:
self.session.close()
return False, '请指定修改条件, 不允许一次修改全部内容'
# 对更改数据的查询和更改进行异常判断
try:
res = self.session.query(obj).filter_by(**kwargs).all()
for re in res:
for content in contents:
# 更新对象属性内容
setattr(re, content, contents[content])
self.session.commit()
self.session.close()
return True, 'ok'
except Exception as e:
self.session.rollback()
self.session.close()
return False, e
def query(self, obj, kwargs=None, nuique=False):
'''查询指定对象, 返回列表包含的对象字典,可能是单个或多个'''
emp = obj()
res = None
# 对传递进来的不定长参数进行判断
if kwargs is not None:
# 循环字典的键,即是表的属性,判断是否存在表中
for lib in kwargs.keys():
# 如果emp中存在此属性就返回None,不存在就返回notexists
resu = getattr(emp, lib, 'notexists')
if resu == 'notexists':
return None # 查询失败就返回None
# 查询指定内容
if nuique: # 如果指定了唯一参数就只返回一条数据
res = self.session.query(obj).filter_by(**kwargs).first()
else:
res = self.session.query(obj).filter_by(**kwargs).all()
else:
# 没有指定参数就默认查询所有内容
res = self.session.query(obj).all()
self.session.close()
return res
def fuzzy_query(self, obj, kwargs, nuique=False):
'''模糊查询'''
res = None
if nuique:
res = self.session.query(obj).filter(*kwargs).first()
else:
res = self.session.query(obj).filter(*kwargs).all()
self.session.close()
return res
定义好模块和接口类后,接下来就可以在操作数据库了
views.py
from models import engine, User
from databaseApi import API
# 初始化数据库操作引擎
db = API(engine)
# 添加数据
data = {
'username' : 'zhangsan',
'password' : '123456',
'email' : '[email protected]'
}
res, e = db.addone(User, data)
# 如果添加成功res为True,e为'ok', 如果添加失败res为False,e 为报错信息下面的类同
# 完全匹配查询
res = db.query(User) # 不带任何参数查询表中的所有内容
res = db.query(User, {'name': 'zhangsan'}) # 指定姓名
res = db.query(User, {'name': 'zhangsan'}, True) # 指定返回单条数据
# 不完全匹配查询
res = db.fuzzy_query(User, {User.name.like('zhang%')}) # 查询所有以zhang开头的
res = db.fuzzy_query(User, {User.name.like('zhang%'), User.eamil != None}) # 添加条件,性zhang并且邮箱不为空的
# 删除数据
res, e = db.delete(User, {'name' : 'lisi'})
res, e = db.deall(User) # 这个方法删除表中的所有数据,慎用。
# 修改数据
# 第一个参数是修改的表,第二个参数是修改的字段和修改后的值,第三个参数是修改的条件
res, e = db.change(User, {'email': '[email protected]'}, {'name' : 'zhangsan'})
上面的接口类中基本能满足大部分操作数据库的需要了,下面在附上搜集的一些高级查询的使用。
附记(下面是原文)
SQLAlchemy 几种查询方式总结
#带条件查询
kwargs = {User.Id > 10} # 可以将查询条件封装成一个字典
kwargs = {User.Id > 10, User.name='zhangsan'} # 可以有多个条件
kwargs = [User.Id > 10, User.name='zhangsan'] # 可以将条件封装成一个列表
kwargs = {User.name.like('zhang%')} # 模糊匹配以'zhang'开头的
kwargs = {User.name != 'zhangsan'} # 取反
session.query(User).filter(*kwarg1).all() # 执行查询,返回结果
# sql过滤
session.query(User).filter("Id>id").params(id=10).all()
#关联查询
session.query(User, Address).filter(User.Id == Address.user_id).all()
session.query(User).join(User.addresses).all()
session.query(User).outerjoin(User.addresses).all()
#聚合查询
session.query(User.name, func.count('*').label("user_count")).group_by(User.name).all()
session.query(User.name, func.sum(User.Id).label("user_id_sum")).group_by(User.name).all()
#子查询
stmt = session.query(Address.user_id, func.count('*').label("address_count")).group_by(Address.user_id).subquery()
session.query(User, stmt.c.address_count).outerjoin((stmt, User.Id == stmt.c.user_id)).order_by(User.Id).all()
#exists
session.query(User).filter(exists().where(Address.user_id == User.id))
session.query(User).filter(User.addresses.any())
# 限制返回字段
session.query(User.name, User.created_at,
User.updated_at).filter_by(name="zhangsan").order_by(
User.created_at).first()
# 记录总数查询
from sqlalchemy import func
session.query(func.count(User.id))
session.query(func.count(User.id)).group_by(User.name)\
from sqlalchemy import distinct
session.query(func.count(distinct(User.name)))
对Sqlalchemy的学习就到这里啦,后续更加深入学了在更新。
@快乐是一切
上一篇: 【SSL_P1502】校门外的树