欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

SQLAlchemy的使用(一)--基本使用方法

程序员文章站 2024-03-24 17:37:28
...

Python SQLAlchemy Cheatsheet : https://www.pythonsheets.com/notes/python-sqlalchemy.html
官方文档:https://docs.sqlalchemy.org/en/latest/orm/tutorial.html

首先 import sqlalchemy
可以这样检查版本 sqlalchemy.__version__

连接database:

engine = create_engine('mssql+pyodbc://sqlalchemy:[email protected]/SUSDB?driver=ODBC+Driver+13+for+SQL+Server', echo=True)#echo用于显示sql执行信息,设置SQLAlchemy logging

engine代表与database连接的核心接口,可以直接使用engine.execute()或者engine.connect()来直接建立一个DBAPI的连接,但是如果我们要使用ORM, 那么我们不直接使用engine


创建一个schema:

创建表结构的方式

Base = declarative_base() #生成orm基类
class AlchemyTest(Base):
    __tablename__ = 'tbSqlAlchemyTest' #这里不用加上dbo.前缀,默认已经有了

    TestName = Column(String(256), primary_key=True)
    TestNumber = Column(INTEGER)
    
#the table object is a member of a larger collection known as MetaData
Base.metadata.create_all(engine) #通过基类与数据库进行交互创建表结构,此时表内还没有数据

还可以在创建class的末尾加上类似于:

def __repr__(self):
        return "<User(name='%s', fullname='%s', password='%s')>" % (
                                self.name, self.fullname, self.password)

这个为可选项,只是增加对于表的描述,便于以后测试


向表内加入数据:

先创建一个实例:test1 = AlchemyTest(TestName='user1', TestNumber=1995)

再创建一个会话:

from sqlalchemy.orm import sessionmaker

Session_class = sessionmaker(bind=engine) #建立与数据库的会话连接,这里建立的是一个class不是一个实例对象
session = Session_class() #这里创建一个会话实例
test1 = AlchemyTest(TestName='user1', TestNumber=1995)
session.add(test1) #把要创建的数据对象加入到这个会话中,这个时候是pending状态,添加多个对象使用add_all()
session.commit() #统一提交会话中的操作
session.close()

在提交这个会话之前就可以使用:our_user = session.query(User).filter_by(name='ed').first() 来查看刚刚创建的数据
在会话提交之前,还可以这样回滚这个会话,使之前flush的内容失效: session.rollback()
每个session会话才是真正的与数据库的连接


使用query查询表的内容:

session = Session_class()
for row in session.query(AlchemyTest).all():
    print(row.TestName, row.TestNumber)

query()返回的结果是一个tuple

可以使用label()对单独的一列进行命名
for row in session.query(User.name.label('name_label')).all()
这样就对name这一列命名为name_label

也可以用python的方法限定哪几行:
for u in session.query(User).order_by(User.id)[1:3]

使用filter来限定条件
for name, in session.query(User.name).filter(User.fullname=='Ed Jones')
filter_by()可以直接使用关键字进行限制条件,但是filter()能处理更复杂的限制条件
如果有两个限制条件是AND关系,可以直接使用两次filter()处理:
for user in session.query(User).filter(User.name=='ed').filter(User.fullname=='Ed Jones') 下面列表也展示了其他方法

filter operator :

作用 符号
相等 ==
不相等 !=
通配符搜索 like(’%关键字%’)
通配符搜索(不敏感) ilike()
存在于 .in_([‘ed’, ‘wendy’, ‘jack’])
不存在于 ~User.name.in_([‘ed’, ‘wendy’, ‘jack’])
为空 == None
AND .filter(User.name == ‘ed’, User.fullname == ‘Ed Jones’)
OR or_(User.name == ‘ed’, User.name == ‘wendy’)
MATCH .match(‘wendy’)

检索返回的列表,以及列表的标量 :

符号 作用
all() 返回所有
first() 返回第一行
one() 检查是不是只有一行结果
one_or_none() 检查是不是一行或者没有结果
scalar() invokes the one() method, and upon success returns the first column of the row

直接使用textual SQL语句 :
使用text()可以在query中使用literal strings:
session.query(User).filter(text("id<224")).order_by(text("id")).all()

如果要直接使用完整的整条sql,可以使用text()传给from_statement():
session.query(User).from_statement(text("SELECT * FROM users where name=:name")).params(name='ed').all()

计数:
可以直接使用 .count()计算有多少行:
session.query(User).filter(User.name.like('%ed')).count()

当需要将指定的东西进行计数,就使用func.count():
session.query(func.count(User.id)).scalar()



建立表与表之间的关系(一对多,多对一):

多对一:

class Department(Base):
    __tablename__ = 'department'
    id = Column(Integer, primary_key=True)
    name = Column(String(30))

class Employee(Base):
    __tablename__ = 'employee'
    id = Column(Integer, primary_key=True)
    name = Column(String(30))
    dep_id = Column(Integer, ForeignKey('department.id'))
    department = relationship("Department")

employee对于department是多对一的关系

一对多:

class Department(Base):
    __tablename__ = 'department'
    id = Column(Integer, primary_key=True)
    name = Column(String(30))
    employees = relationship("Employee")

class Employee(Base):
    __tablename__ = 'employee'
    id = Column(Integer, primary_key=True)
    name = Column(String(30))
    dep_id = Column(Integer, ForeignKey('department.id'))

department对于employee是一对多的关系

新建一个tbSqlAlchemyTestError的table 一个test可以有多个error

class AlchemyTestError(Base):
    __tablename__ = 'tbSqlAlchemyTestError'

    errorID = Column(INTEGER, primary_key=True)
    errorname = Column(String(256), nullable=False)
    TestName = Column(String(256), ForeignKey('tbSqlAlchemyTest.TestName'))#这里面关联外键使用表的名字而不是类的名字,表与表之间建立联系

    test = relationship("AlchemyTest", back_populates="error")
    #告诉ORM,自己这个class与AlchemyTest这个class相关联,并且设定了一个“多对一”的关系
    #*在自己这个类中生成一个test属性,然后在AlchemyTest类中创建一个error属性,与之关联,可以反向查询*

AlchemyTest.error = relationship("AlchemyTestError", back_populates="test")
#与上面一个relationship形成双向relationship,意思是在AlchemyTest类中生成一个error属性,然后与AlchemyTestError中的test属性相关联
#所以实际上与上面一个relationship是一样的意思

给一对多的关系表插入数据:

test1 = session.query(AlchemyTest).filter(AlchemyTest.TestName == 'user1').first() #首先提取需要加入error的test,这个已经存在
test1.error = [AlchemyTestError(errorID='553', errorname='cuowu'),
               AlchemyTestError(errorID='667', errorname='afsf')] #使用relationship()时建立的error属性来给AlchemyTestError插入数据
session.add(test1)
session.commit()stmt

使用query.join()结合两张表来查询数据表数据:

result = session.query(AlchemyTest).join(AlchemyTestError).filter(AlchemyTestError.errorname == 'cuowu').first() #查询AlchemyTest表内的数据,看哪些user有这个error
if result is not None:
    print(result.TestName)

因为这两张表只有一个外键,所以query.join()知道怎么关联这两张表,但是如果没有或者不止一个外键的时候,使用其他的形式:
result = session.query(AlchemyTest).join((AlchemyTestError, AlchemyTest.TestName==AlchemyTestError.TestName)).first()

这里有一个join的reminder:

join 如果表中有至少一个匹配,则返回行
left join 即使右表中没有匹配,也从左表返回所有的行
right join 即使左表中没有匹配,也从右表返回所有的行
full join 只要其中一个表中存在匹配,就返回行

outerjoin: query.outerjoin(User.addresses) # LEFT OUTER JOIN

使用别名:
当一个table需要被call多次的时候,使用别名aliased()可以弄清:

name1 = aliased(AlchemyTest)
name2 = aliased(AlchemyTest)

使用subquery()来做子查询:
这个例子用于查询每个AlchemyTest的TestName对应了几个error:

stmt = session.query(AlchemyTestError.TestName, func.count('*').label('errorcount')).group_by(AlchemyTestError.TestName).subquery() #一定要使用subquery()才能使这个临时表生效
#相当于生成了一个临时表,stmt它具有表结构,这个表结构的数据我们通过"c"这个属性来获取
for user, count in session.query(AlchemyTest, stmt.c.errorcount).outerjoin(stmt, AlchemyTest.TestName==stmt.c.TestName).order_by(AlchemyTest.TestName):
#这里相当于SELECT user FROM AlchemyTest, SELECT count FROM stmt.c.errorcount LEFT OUTER JOIN stmt ON (AlchemyTest.TestName==stmt.c.TestName)
    print(user.TestName, count)

使用exists():

stmt = exists().where(Address.user_id==User.id)
for name, in session.query(User.name).filter(stmt):
     print(name)

一些描述关系的方法:
query.filter(User.addresses.contains(someaddress))
query.filter(User.addresses.any(Address.email_address == 'bar'))
query.filter(Address.user.has(name='ed'))
session.query(Address).with_parent(someuser, 'addresses')

删除数据:
删除可以直接使用session.delete()
但是与之关联的table并没有删除数据,因为SQLAlchemy并没有设定级联删除!!!
解决方案:
一对多关系的table里的relationship设定时增加cascade:
cascade="all, delete, delete-orphan”



建立多对多关系的表结构:

例子中有两个tables,分别是博客表和关键字表,一篇博客可以拥有多个关键字,一个关键字也可以对应多篇博客。
所以我们要建立一个未映射的关系表:

post_keywords = Table('post_keywords', Base.metadata,
...     Column('post_id', ForeignKey('posts.id'), primary_key=True),
...     Column('keyword_id', ForeignKey('keywords.id'), primary_key=True)
... )

这个表建立的方式与建立一个拥有映射关系的class不同。接下来创建博客表和关键字表分别与这个中间关系表用relationship()关联起来:

>>> class BlogPost(Base):
...     __tablename__ = 'posts'
...
...     id = Column(Integer, primary_key=True)
...     user_id = Column(Integer, ForeignKey('users.id'))
...     headline = Column(String(255), nullable=False)
...     body = Column(Text)
...
...     # many to many BlogPost<->Keyword
...     keywords = relationship('Keyword',
...                             secondary=post_keywords,
...                             back_populates='posts')
...
...     def __init__(self, headline, body, author):
...         self.author = author
...         self.headline = headline
...         self.body = body
...
...     def __repr__(self):
...         return "BlogPost(%r, %r, %r)" % (self.headline, self.body, self.author)


>>> class Keyword(Base):
...     __tablename__ = 'keywords'
...
...     id = Column(Integer, primary_key=True)
...     keyword = Column(String(50), nullable=False, unique=True)
...     posts = relationship('BlogPost',
...                          secondary=post_keywords,
...                          back_populates='keywords')
...
...     def __init__(self, keyword):
...         self.keyword = keyword