SQLAlchemy查询
程序员文章站
2022-03-11 19:02:23
...
目录
据表结构及导入模块
from app import db
from sqlalchemy import Column
from sqlalchemy import and_, or_
from sqlalchemy import asc, desc
from sqlalchemy import exists
from sqlalchemy import func
class Department(db.Model):
"""用户管理"""
__tablename__ = 'department_info'
id = Column(db.Integer, nullable=False, primary_key=True, autoincrement=True)
departmentName = Column(db.String(25), nullable=False, comment="部门名称")
parentid = Column(db.String(25), nullable=True, comment="")
departmentId = Column(db.String(25), nullable=False, comment="部门id")
createTime = Column(db.DATETIME, nullable=False, comment="创建时间")
updateTime = Column(db.DATETIME, nullable=False, comment="修改时间")
class UserInfo(db.Model):
"""用户管理"""
__tablename__ = 'user_info'
id = Column(db.Integer, nullable=False, primary_key=True, autoincrement=True)
name = Column(db.String(25), nullable=False, comment="姓名")
mobile = Column(db.String(25), nullable=False, comment="手机号")
department = Column(db.String(25), nullable=True, comment="部门")
createTime = Column(db.DATETIME, nullable=False, comment="创建时间")
updateTime = Column(db.DATETIME, nullable=False, comment="修改时间")
def __init__(self):
pass
def to_dict(self):
return {c.name: getattr(self, c.name, None) for c in self.__table__.columns}
@staticmethod
def create():
return
@staticmethod
def update():
return
@staticmethod
def list():
return
原生查询
sql_origin = 'select * from `user_info` where id <=30;'
res_origin = db.session.execute(sql_origin)
print(res_origin.fetchone())
print(res_origin.fetchmany())
print(res_origin.fetchall())
ORM查询
# get():通过主键查询
res_primary_key = db.session.query(UserInfo).get(141) # get()返回唯一一条记录
print('res_primary_key: ', res_primary_key.to_dict())
# filter():指定字段查询
res_custom_filed = db.session.query(UserInfo).filter(UserInfo.name == "自动化测试").first() # first()返回第一条记录, 无结果时返回None
print('res_custom_filed: ', res_custom_filed.to_dict())
# filter_by()查询方法
res_filter_by = db.session.query(UserInfo).filter_by(department='4d9f58e75204441c9744aa95e2b7fa88', name='张三').all()
print('res_filter_by: ', res_filter_by)
注:filter()与filter_by()的区别
模块 语法 >、<查询 and_和or_查询 filter()
用类名.属性名, 比较用==
支持 支持 filter_by()
直接用属性名, 比较用=
不支持 不支持
查询结果处理
# one():只获取唯一一条记录, 找不到记录或找到多条记录时会报错
res_one = db.session.query(UserInfo).filter(UserInfo.id == 141).one()
print('res_one: ', res_one)
# one_or_none():只获取唯一一条记录, 找不到记录或找到多条记录时会报错
res_one_or_none = db.session.query(UserInfo).filter(UserInfo.id == 141).one_or_none()
print('res_one_or_none: ', res_one_or_none)
# scalar():同one, 没有结果时返回None
res_scalar = db.session.query(UserInfo).filter(UserInfo.id == 141).scalar()
print('res_scalar: ', res_scalar)
# count():返回记录条数
res_count = db.session.query(UserInfo).filter(UserInfo.id < 141).count()
print('res_count: ', res_count)
# limit():限制返回条数
res_limit = db.session.query(UserInfo).filter(UserInfo.id < 141).limit(10).all()
print('res_limit: ', res_limit)
# distinct():过滤掉多余的重复记录只保留一条
res_distinct = db.session.query(UserInfo.department).distinct(UserInfo.department).count()
print('res_distinct: ', res_distinct)
# order_by():查询结果排序
res_order_by_desc = db.session.query(UserInfo).order_by(UserInfo.id.desc()).all() # 倒序
print('res_order_by_desc: ', res_order_by_desc)
res_order_by_desc_1 = db.session.query(UserInfo).order_by(desc(UserInfo.id)).all() # 使用desc
print('res_order_by_desc_1: ', res_order_by_desc_1)
res_order_by_asc = db.session.query(UserInfo).order_by(UserInfo.id.asc()).all() # 正序
print('res_order_by_asc: ', res_order_by_asc)
res_order_by_asc_1 = db.session.query(UserInfo).order_by(asc(UserInfo.id)).all() # 使用asc
print('res_order_by_asc_1: ', res_order_by_asc_1)
查询条件
# exist()
res_exists = db.session.query(exists().where(UserInfo.id < 100)).scalar() # 查看记录是否存在
print('res_exists: ', res_exists)
# 与
res_and = db.session.query(UserInfo).filter(UserInfo.id < 2000, UserInfo.department == '4d9f58e75204441c9744aa95e2b7fa88').all() # all()返回所有记录
print('res_and: ', res_and)
res_and_ = db.session.query(UserInfo).filter(and_(UserInfo.id > 2000, UserInfo.department == '4d9f58e75204441c9744aa95e2b7fa88')).all() # and_实现And查询
print('res_and_: ', res_and_)
print('to_dict: ', [res.to_dict() for res in res_and_]) # 一行代码实现: 返回数据格式化为易读格式
# 或
res_or = db.session.query(UserInfo).filter(or_(UserInfo.id == 141, UserInfo.name=='李四')).all()
print('res_or: ', res_or)
# AND与OR并存
res_and_or = db.session.query(UserInfo).filter(or_(UserInfo.id < 1500, UserInfo.id > 2000), UserInfo.department == '4d9f58e75204441c9744aa95e2b7fa88').all()
print('res_and_or: ', res_and_or)
res_or_and = db.session.query(UserInfo).filter(or_(UserInfo.department == '4d9f58e75204441c9744aa95e2b7fa88', and_(UserInfo.id < 200, UserInfo.name.like('李%')))).all()
print('res_or_and', res_or_and)
# in
res_in = db.session.query(UserInfo).filter(UserInfo.id.in_([10, 141, 1688])).all()
print('res_in: ', res_in)
# like
res_like = db.session.query(UserInfo).filter(UserInfo.name.like('李%')).all()
print('res_like: ', res_like)
# contains
res_contains = db.session.query(UserInfo).filter(UserInfo.name.contains('李')).all()
print('res_contains: ', res_contains)
# NOT
res_not = db.session.query(UserInfo).filter(~UserInfo.id.in_([11, 12])).all()
print('res_not: ', res_not)
# 查询指定列
res_custom_column = db.session.query(UserInfo.id, UserInfo.name, UserInfo.mobile).filter(UserInfo.id < 15).all()
print('res_custom_column: ', res_custom_column)
# 使用列表给filter方法传参
conditions = list()
conditions.append(UserInfo.name.like('李%'))
conditions.append(UserInfo.id < 200)
res_conditions = db.session.query(UserInfo).filter(*conditions).all()
print('res_condition: ', res_conditions)
# 使用字典给filter_by方法传参
condition_dict = dict()
condition_dict['name'] = '李四'
condition_dict['id'] = 1755
res_condition_dict = db.session.query(UserInfo).filter_by(**condition_dict).all()
print('res_condition_dict: ', res_condition_dict)
func
# find_in_set(str1, str2)
res_find_in_set = db.session.query(UserInfo).filter(func.find_in_set('李四', UserInfo.name)).all() # 返回str2中str1所在的位置索引
print('res_find_in_set: ', res_find_in_set)
# count
res_func_count = db.session.query(UserInfo.department, func.count(UserInfo.id)).group_by(UserInfo.department).all()
print('res_func_count: ', res_func_count)
# sum
res_func_sum = db.session.query(UserInfo.department, func.sum(UserInfo.id)).group_by(UserInfo.department).all()
print('res_func_sum: ', res_func_sum)
# max
res_func_max = db.session.query(UserInfo.department, func.max(UserInfo.id)).group_by(UserInfo.department).all()
print('res_func_max: ', res_func_max)
# min
res_func_min = db.session.query(UserInfo.department, func.min(UserInfo.id)).group_by(UserInfo.department).all()
print('res_func_min: ', res_func_min)
关联查询
# 关联
res_relation = db.session.query(UserInfo.name, Department.departmentName).filter(UserInfo.department==Department.departmentId).all()
print('res_relation: ', res_relation)
# 内连接
res_join = db.session.query(UserInfo).join(Department, UserInfo.department==Department.departmentId).all()
print('res_join: ', res_join)
# 外连接
res_outer_join = db.session.query(UserInfo).outerjoin(Department, UserInfo.department==Department.departmentId).all()
print('res_outer_join: ', res_outer_join)
注:SQL提供了多种类型的连接方式,它们之间的区别在于:从相互交叠的不同数据集合中选择用于连接的行时所采用的方法不同。
- 内连接:只连接匹配的行
- 左外连接:包含左边表的全部行(不管右边的表中是否存在与它们匹配的行),以及右边表中全部匹配的行,a.order_id = b.order_id(+)
- 右外连接:包含右边表的全部行(不管左边的表中是否存在与它们匹配的行),以及左边表中全部匹配的行,a.order_id(+) = b.order_id
- 全外连接:包含左、右两个表的全部行,不管另外一边的表中是否存在与它们匹配的行。
上一篇: scala--函数组合子