欢迎您访问程序员文章站!本站旨在为大家提供、分享程序员计算机编程知识!
您现在的位置是: 首页

Python3中sqlalchemy模块学习之查询操作

程序员文章站 2024-03-04 12:21:59
...

起步

#!/usr/bin/python3
# -*- coding: utf-8 -*-
"""表结构"""

from collections.abc import Iterable
from collections.abc import Iterator
from sqlalchemy import create_engine
from sqlalchemy import Column, Integer, String, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker


# Database engine shared by every snippet below.
# NOTE: "password" and "127.0.0.1" are placeholders -- replace them with the
# password and host of your own MySQL server before running.
# echo=True asks SQLAlchemy to log the SQL statements it emits.
engine = create_engine(
    'mysql+pymysql://root:password@127.0.0.1:3306/sqlalchemy_db?charset=utf8mb4',
    max_overflow=10,
    echo=True,
)

# Declarative base class that the ORM models below inherit from.
Base = declarative_base()

# 多对一:多个员工可以属于同一个部门,但一个员工不能同时属于多个部门


class Dep(Base):
    """Department model mapped to the ``dep`` table."""

    __tablename__ = 'dep'

    # Auto-incrementing integer primary key.
    id = Column(Integer, autoincrement=True, primary_key=True)
    # Department name: required, up to 64 characters, indexed.
    dname = Column(String(length=64), index=True, nullable=False)


class Emp(Base):
    """Employee model mapped to the ``emp`` table."""

    __tablename__ = 'emp'

    # Auto-incrementing integer primary key.
    id = Column(Integer, autoincrement=True, primary_key=True)
    # Employee name: required, up to 32 characters, indexed.
    ename = Column(String(length=32), index=True, nullable=False)
    # Foreign key pointing at the owning department's ``dep.id``.
    dep_id = Column(Integer, ForeignKey('dep.id'))


def init_db():
    """Create all tables registered on ``Base.metadata`` using ``engine``."""
    metadata = Base.metadata
    metadata.create_all(bind=engine)


def drop_db():
    """Drop all tables registered on ``Base.metadata`` using ``engine``."""
    metadata = Base.metadata
    metadata.drop_all(bind=engine)


# A session is needed before any insert/delete/update/query work can be done.
_session_factory = sessionmaker(bind=engine)
session = _session_factory()

query() / select_from()

# Querying
# query() / select_from()
# Fetch every row.
# 01. Emits: SELECT dep.id AS dep_id, dep.dname AS dep_dname FROM dep;
res = session.query(Dep)
# Alternative with an explicit select_from():
# res = session.query(Dep).select_from(Dep)
print('type:', type(res), res)
# <class 'sqlalchemy.orm.query.Query'>
# SELECT dep.id AS dep_id, dep.dname AS dep_dname FROM dep

# A Query can be looped over, but it is not itself an iterator.
is_iterable = isinstance(res, Iterable)
is_iterator = isinstance(res, Iterator)
print(is_iterable, is_iterator)  # True, False

for dep in res:
    print(dep)  # every row is an object

# Selecting specific columns
# Either form works:
# res = session.query(Dep.id, Dep.dname)
# res = session.query(Dep.id, Dep.dname).select_from(Dep)


# for each_row in res:
#     print(each_row.id, each_row.dname)


# ==================================================================

all()

# 02. .all() returns a list; every element is an object representing one row.
dep_query = session.query(Dep)
res = dep_query.all()
print('type:', res.__class__, res)
# <class 'list'>
# [<__main__.Dep object at 0x00000010A7F67668>,
# <__main__.Dep object at 0x00000010A7F676D8>,
# <__main__.Dep object at 0x00000010A7F67748>,
# <__main__.Dep object at 0x00000010A7F677B8>]

for dep in res:
    print(dep.id, dep.dname)

filter()

# 03. SELECT dep.dname FROM dep WHERE dep.id > 1;
dname_query = session.query(Dep.dname)
res = dname_query.filter(Dep.id > 1)
for row in res:
    print(row.dname)

filter_by()

# 04. filter_by
criteria = {'id': 130}
res = session.query(Dep).filter_by(**criteria)
for dep in res:
    print(dep.dname)

# Unlike filter(), filter_by() takes its conditions as keyword arguments
# and can therefore only express equality (=) comparisons.
#     filter() takes expressions, which makes it more flexible.

order_by()

# 05. SELECT dep.id FROM dep ORDER BY dep.id ASC;
id_ascending = Dep.id.asc()
res = session.query(Dep.id).order_by(id_ascending)
for row in res:
    print(row.id)

# 05. SELECT dep.id FROM dep ORDER BY dep.id DESC;
id_descending = Dep.id.desc()
res = session.query(Dep.id).order_by(id_descending)
for row in res:
    print(row.id)

# 05. SELECT dep.id, dep.dname FROM dep ORDER BY dep.dname ASC, dep.id DESC;
ordering = (Dep.dname.asc(), Dep.id.desc())
res = session.query(Dep.id, Dep.dname).order_by(*ordering)
for row in res:
    print(row.id, row.dname)

first()

# 06. SELECT dep.id, dep.dname FROM dep LIMIT 1;
# first() returns None when the query matches no rows, so check before
# reading attributes to avoid an AttributeError on an empty table.
res = session.query(Dep).first()
if res is None:
    print('no rows in dep')
else:
    print(res.id, res.dname)

and

# 07. SELECT * FROM dep WHERE dep.id > 100 AND dep.id < 150;
# Several criteria passed to one filter() call are combined with AND.
above_lower = Dep.id > 100
below_upper = Dep.id < 150
res = session.query(Dep).filter(above_lower, below_upper)
for dep in res:
    print(dep.id, dep.dname)

整体代码

#!/usr/bin/python3
# -*- coding: utf-8 -*-
"""表结构"""

from collections.abc import Iterable
from collections.abc import Iterator
from sqlalchemy import create_engine
from sqlalchemy import Column, Integer, String, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker


# Database engine shared by the whole script.
# NOTE: "password" and "127.0.0.1" are placeholders -- replace them with the
# password and host of your own MySQL server before running.
# echo=True asks SQLAlchemy to log the SQL statements it emits.
engine = create_engine(
    'mysql+pymysql://root:password@127.0.0.1:3306/sqlalchemy_db?charset=utf8mb4',
    max_overflow=10,
    echo=True,
)

# Declarative base class that the ORM models below inherit from.
Base = declarative_base()

# 多对一:多个员工可以属于同一个部门,但一个员工不能同时属于多个部门


class Dep(Base):
    """Department model mapped to the ``dep`` table."""

    __tablename__ = 'dep'

    # Auto-incrementing integer primary key.
    id = Column(Integer, autoincrement=True, primary_key=True)
    # Department name: required, up to 64 characters, indexed.
    dname = Column(String(length=64), index=True, nullable=False)


class Emp(Base):
    """Employee model mapped to the ``emp`` table."""

    __tablename__ = 'emp'

    # Auto-incrementing integer primary key.
    id = Column(Integer, autoincrement=True, primary_key=True)
    # Employee name: required, up to 32 characters, indexed.
    ename = Column(String(length=32), index=True, nullable=False)
    # Foreign key pointing at the owning department's ``dep.id``.
    dep_id = Column(Integer, ForeignKey('dep.id'))


def init_db():
    """Create all tables registered on ``Base.metadata`` using ``engine``."""
    metadata = Base.metadata
    metadata.create_all(bind=engine)


def drop_db():
    """Drop all tables registered on ``Base.metadata`` using ``engine``."""
    metadata = Base.metadata
    metadata.drop_all(bind=engine)


# A session is needed before any insert/delete/update/query work can be done.
_session_factory = sessionmaker(bind=engine)
session = _session_factory()

# 查
# query() / select_from
# 查询全部
# 01. 相当于 SELECT dep.id AS dep_id, dep.dname AS dep_dname FROM dep;
# res = session.query(Dep)
# res = session.query(Dep).select_from(Dep)
# print('type:', type(res), res)
# <class 'sqlalchemy.orm.query.Query'>
# SELECT dep.id AS dep_id, dep.dname AS dep_dname FROM dep

# print(isinstance(res, Iterable), isinstance(res, Iterator))  # True, False

# for each_row in res:
#     print(each_row)  # 每一行都是一个对象

# 查询指定
# 都可以
# res = session.query(Dep.id, Dep.dname)
# res = session.query(Dep.id, Dep.dname).select_from(Dep)


# for each_row in res:
#     print(each_row.id, each_row.dname)


# ==================================================================

# 02. .all()返回一个列表,每一个元素都是一个对象,代表一行
# res = session.query(Dep).all()
# print('type:', res.__class__, res)
# <class 'list'>
# [<__main__.Dep object at 0x00000010A7F67668>,
# <__main__.Dep object at 0x00000010A7F676D8>,
# <__main__.Dep object at 0x00000010A7F67748>,
# <__main__.Dep object at 0x00000010A7F677B8>]

# for each_row in res:
#     print(each_row.id, each_row.dname)

# =======================================================================

# 03. select dep.dname from dep where dep.id > 1;
# res = session.query(Dep.dname).filter(Dep.id > 1)
# for each_row in res:
#     print(each_row.dname)

# ======================================================================

# 04. filter_by
# res = session.query(Dep).filter_by(id=130)
# for each_row in res:
#     print(each_row.dname)

# 与filter的区别就是filter_by是以传参的形式进行的,而且只能做=操作
#     filter 是表达式,丰富一点

# =====================================================================

# 05. select dep.id from dep order by dep.id asc;
# 05. select dep.id from dep order by dep.id desc;
# 05. select dep.id, dep.dname from dep order by dep.dname asc, dep.id desc;
# res = session.query(Dep.id).order_by(Dep.id.asc())
# for each_row in res:
#     print(each_row.id)
#
# res = session.query(Dep.id).order_by(Dep.id.desc())
# for each_row in res:
#     print(each_row.id)
#
# res = session.query(Dep.id, Dep.dname).order_by(Dep.dname.asc(), Dep.id.desc())
# for each_row in res:
#     print(each_row.id, each_row.dname)

# =======================================================================================

# 06. select dep.id, dep.dname from dep limit 1;
# res = session.query(Dep).first()
# print(res.id, res.dname)

# ========================================================================================

# 07. select * from dep where dep.id > 100 and dep.id < 150;
# res = session.query(Dep).filter(Dep.id > 100, Dep.id < 150)
# for each_row in res:
#     print(each_row.id, each_row.dname)

# =========================================================================================

# Close the session once all of the queries above are finished.
session.close()

相关标签: Python3