欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

SqlAlchemy归纳总结

程序员文章站 2024-03-02 23:04:28
...

使用sqlalchemy有一段时间了,基本操作都熟悉了,所以今天把关于Sqlalchemy的使用归纳总结一下。

创表

Sqlalchemy是操作数据库的库,所以首先要创建数据库表,在这里我使用的是sqlite3。
首先在你的配置文件里面配置数据库位置
config.py

import os
# 项目根目录
basedir = os.path.abspath(os.path.dirname(__file__))
# 配置数据库路径
os.environ['DATABASE_URL] = 'sqlite:///' + os.path.join(basedir, 'database', 'test.sqlite')

接下来就是在模型类中创建表了
models.py

import os
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine
from sqlalchemy import Column, Integer, String, schema, Boolean, Enum, DateTime, Text, ForeignKey, LargeBinary

engine = create_engine(os.environ['SQL_PATH'])

Base = declarative_base()

class User(Base):
    '''用户信息管理'''
    __tablename__ = 'user'

    Id = Column(Integer, primary_key=True, autoincrement='auto')
    username = Column(String(32), nullable=False, comment='用户名')
    password = Column(String(64), nullable=False, comment='密码')
    email = Column(String(100), default=None, comment='邮箱')
 
 # 创建表
Base.metadata.create_all(engine)

创建模型后,接下来再创建一个操作数据库的api类
databaseApi.py

from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session
from sqlalchemy.orm import sessionmaker

class API(object):
    """
    自定义orm框架的api接口
    """

    def __init__(self, engine):
        # 创建与数据库的会话session class ,这里返回给session的是个class,不是实例
        session_factory  = sessionmaker(bind=engine)
        Session = scoped_session(session_factory)
        # 生成session 实例
        self.session = Session()

    def addone(self, obj, data):
        '''添加单条数据'''
        # 实例化对象
        emp = obj(**data)
        try:
            # 添加数据
            self.session.add(emp)
            # 提交修改
            self.session.commit()
            # 返回结果
            self.session.close()
            return True, 'ok'
        except Exception as e:
            # 添加失败后回滚数据
            self.session.rollback()
            self.session.close()
            # 返回状态和错误信息
            return False, e
    def addall(self, data):
        '''添加多条数据, 参数data应该是一个对象列表'''

        try:
            self.session.add_all(**data)
            self.session.commit()
            self.session.close()
            return True, 'ok'
        except Exception as e:
            self.session.rollback()
            self.session.close()
            return False, e
    def delete(self, obj, kwargs):
        '''删除指定数据,kwargs是一个删除条件的字典'''
        emp = obj()
        # 判断传入的删除条件是否正确
        if kwargs is not None:
            # 循环字典的键,即是表的属性,判断是否存在表中
            for lib in kwargs.keys():
                # 如果emp中存在此属性就返回None,不存在就返回notexists
                resu = getattr(emp, lib, 'notexists')
                if resu == 'notexists':
                    self.session.close()
                    return False, '{0} is not defind'.format(lib)
        else:
            self.session.close()
            return False, '请指定删除条件, 如果要删除全部内容请使用deall()函数'
        # 查询判断要删除的值是否存在, 如果不存在会返回None
        res = self.session.query(obj).filter_by(**kwargs).all()
        if res:
            try:
                # 删除数据
                for re in res:
                    self.session.delete(re)
                    self.session.commit()
                self.session.close()
                return True, 'ok'
            except Exception as e:
                self.session.rollback()
                self.session.close()
                return False,e
        else:
            self.session.close()
            return False,'notexists'
    def deall(self, obj):
        '''删除表中所有数据的方法'''
        res = self.session.query(obj).all()
        if res:
            try:
                for re in res:
                    self.session.delete(re)
                    self.session.commit()
                self.session.close()
                return True, 'ok'
            except Exception as e:
                self.session.rollback()
                self.session.close()
                return False, e
        else:
            self.session.close()
            return False, 'table is not content'
    def change(self, obj, contents, kwargs):
        '''修改数据, contents是修改的列和修改后值的字典,kwargs是条件字典'''
        emp = obj()
        # 对传递进来的不定长参数进行判断
        if kwargs is not None:
            # 循环字典的键,即是表的属性,判断是否存在表中
            for lib in kwargs.keys():
                # 如果emp中存在此属性就返回None,不存在就返回notexists
                resu = getattr(emp, lib, 'notexists')
                if resu == 'notexists':
                    self.session.close()
                    return False, '{0} is not defind'.format(lib)
        else:
            self.session.close()
            return False, '请指定修改条件, 不允许一次修改全部内容'
        # 对更改数据的查询和更改进行异常判断
        try:
            res = self.session.query(obj).filter_by(**kwargs).all()
            for re in res:
                for content in contents:
                    # 更新对象属性内容
                    setattr(re, content, contents[content])
            self.session.commit()
            self.session.close()
            return True, 'ok'
        except Exception as e:
            self.session.rollback()
            self.session.close()
            return False, e


    def query(self, obj, kwargs=None, nuique=False):
        '''查询指定对象, 返回列表包含的对象字典,可能是单个或多个'''
        emp = obj()
        res = None
         # 对传递进来的不定长参数进行判断
        if kwargs is not None:
            # 循环字典的键,即是表的属性,判断是否存在表中
            for lib in kwargs.keys():
                # 如果emp中存在此属性就返回None,不存在就返回notexists
                resu = getattr(emp, lib, 'notexists')
                if resu == 'notexists':
                    return None # 查询失败就返回None
            # 查询指定内容
            if nuique: # 如果指定了唯一参数就只返回一条数据
                res = self.session.query(obj).filter_by(**kwargs).first()
            else:
                res = self.session.query(obj).filter_by(**kwargs).all()
        else:
            # 没有指定参数就默认查询所有内容
            res = self.session.query(obj).all()
        self.session.close()
        return res

    def fuzzy_query(self, obj, kwargs, nuique=False):
        '''模糊查询'''
        res = None
        if nuique:
            res = self.session.query(obj).filter(*kwargs).first()
        else:
            res = self.session.query(obj).filter(*kwargs).all()
        self.session.close()
        return res

定义好模块和接口类后,接下来就可以在操作数据库了
views.py

from models import engine, User
from databaseApi import API

# 初始化数据库操作引擎
db = API(engine)

# 添加数据
data = {
	'username' : 'zhangsan',
	'password' : '123456', 
	'email' : '[email protected]'
}
res, e = db.addone(User, data)
# 如果添加成功res为True,e为'ok', 如果添加失败res为False,e 为报错信息下面的类同
# 完全匹配查询
res = db.query(User) # 不带任何参数查询表中的所有内容
res = db.query(User, {'name': 'zhangsan'}) # 指定姓名
res = db.query(User, {'name': 'zhangsan'}, True) # 指定返回单条数据
# 不完全匹配查询
res = db.fuzzy_query(User, {User.name.like('zhang%')}) # 查询所有以zhang开头的
res = db.fuzzy_query(User, {User.name.like('zhang%'), User.eamil != None}) # 添加条件,性zhang并且邮箱不为空的

# 删除数据
res, e = db.delete(User, {'name' : 'lisi'})
res, e = db.deall(User) # 这个方法删除表中的所有数据,慎用。

# 修改数据
# 第一个参数是修改的表,第二个参数是修改的字段和修改后的值,第三个参数是修改的条件
res, e = db.change(User, {'email': '[email protected]'}, {'name' : 'zhangsan'})

上面的接口类中基本能满足大部分操作数据库的需要了,下面在附上搜集的一些高级查询的使用。
附记(下面是原文)
SQLAlchemy 几种查询方式总结

#带条件查询
kwargs = {User.Id > 10} # 可以将查询条件封装成一个字典
kwargs = {User.Id > 10, User.name='zhangsan'}  # 可以有多个条件
kwargs = [User.Id > 10, User.name='zhangsan'] # 可以将条件封装成一个列表
kwargs = {User.name.like('zhang%')} # 模糊匹配以'zhang'开头的
kwargs = {User.name != 'zhangsan'} # 取反
session.query(User).filter(*kwarg1).all() # 执行查询,返回结果

# sql过滤
session.query(User).filter("Id>id").params(id=10).all()
#关联查询 
session.query(User, Address).filter(User.Id == Address.user_id).all()
session.query(User).join(User.addresses).all()
session.query(User).outerjoin(User.addresses).all()
#聚合查询
session.query(User.name, func.count('*').label("user_count")).group_by(User.name).all()
session.query(User.name, func.sum(User.Id).label("user_id_sum")).group_by(User.name).all()
#子查询
stmt = session.query(Address.user_id, func.count('*').label("address_count")).group_by(Address.user_id).subquery()
session.query(User, stmt.c.address_count).outerjoin((stmt, User.Id == stmt.c.user_id)).order_by(User.Id).all()
#exists
session.query(User).filter(exists().where(Address.user_id == User.id))
session.query(User).filter(User.addresses.any())
# 限制返回字段
session.query(User.name, User.created_at,                     
             User.updated_at).filter_by(name="zhangsan").order_by(            
             User.created_at).first()
            
# 记录总数查询
from sqlalchemy import func

session.query(func.count(User.id))
session.query(func.count(User.id)).group_by(User.name)\

from sqlalchemy import distinct

session.query(func.count(distinct(User.name)))

对Sqlalchemy的学习就到这里啦,后续更加深入学了在更新。
@快乐是一切

相关标签: sqlalchemy