SQLAlchemy的使用(一)--基本使用方法
Python SQLAlchemy Cheatsheet : https://www.pythonsheets.com/notes/python-sqlalchemy.html
官方文档:https://docs.sqlalchemy.org/en/latest/orm/tutorial.html
首先 import sqlalchemy
可以这样检查版本 sqlalchemy.__version__
连接database:
engine = create_engine('mssql+pyodbc://sqlalchemy:[email protected]/SUSDB?driver=ODBC+Driver+13+for+SQL+Server', echo=True)#echo用于显示sql执行信息,设置SQLAlchemy logging
engine代表与database连接的核心接口,可以直接使用engine.execute()或者engine.connect()来直接建立一个DBAPI的连接,但是如果我们要使用ORM, 那么我们不直接使用engine
创建一个schema:
创建表结构的方式
Base = declarative_base() #生成orm基类
class AlchemyTest(Base):
__tablename__ = 'tbSqlAlchemyTest' #这里不用加上dbo.前缀,默认已经有了
TestName = Column(String(256), primary_key=True)
TestNumber = Column(INTEGER)
#the table object is a member of a larger collection known as MetaData
Base.metadata.create_all(engine) #通过基类与数据库进行交互创建表结构,此时表内还没有数据
还可以在创建class的末尾加上类似于:
def __repr__(self):
return "<User(name='%s', fullname='%s', password='%s')>" % (
self.name, self.fullname, self.password)
这个为可选项,只是增加对于表的描述,便于以后测试
向表内加入数据:
先创建一个实例:test1 = AlchemyTest(TestName='user1', TestNumber=1995)
再创建一个会话:
from sqlalchemy.orm import sessionmaker
Session_class = sessionmaker(bind=engine) #建立与数据库的会话连接,这里建立的是一个class不是一个实例对象
session = Session_class() #这里创建一个会话实例
test1 = AlchemyTest(TestName='user1', TestNumber=1995)
session.add(test1) #把要创建的数据对象加入到这个会话中,这个时候是pending状态,添加多个对象使用add_all()
session.commit() #统一提交会话中的操作
session.close()
在提交这个会话之前就可以使用:our_user = session.query(User).filter_by(name='ed').first()
来查看刚刚创建的数据
在会话提交之前,还可以这样回滚这个会话,使之前flush的内容失效: session.rollback()
每个session会话才是真正的与数据库的连接
使用query查询表的内容:
session = Session_class()
for row in session.query(AlchemyTest).all():
print(row.TestName, row.TestNumber)
query()返回的结果是一个tuple
可以使用label()对单独的一列进行命名for row in session.query(User.name.label('name_label')).all()
这样就对name这一列命名为name_label
也可以用python的方法限定哪几行:for u in session.query(User).order_by(User.id)[1:3]
使用filter来限定条件:for name, in session.query(User.name).filter(User.fullname=='Ed Jones')
filter_by()可以直接使用关键字进行限制条件,但是filter()能处理更复杂的限制条件
如果有两个限制条件是AND关系,可以直接使用两次filter()处理:for user in session.query(User).filter(User.name=='ed').filter(User.fullname=='Ed Jones')
下面列表也展示了其他方法
filter operator :
作用 | 符号 |
---|---|
相等 | == |
不相等 | != |
通配符搜索 | like(’%关键字%’) |
通配符搜索(不敏感) | ilike() |
存在于 | .in_([‘ed’, ‘wendy’, ‘jack’]) |
不存在于 | ~User.name.in_([‘ed’, ‘wendy’, ‘jack’]) |
为空 | == None |
AND | .filter(User.name == ‘ed’, User.fullname == ‘Ed Jones’) |
OR | or_(User.name == ‘ed’, User.name == ‘wendy’) |
MATCH | .match(‘wendy’) |
检索返回的列表,以及列表的标量 :
符号 | 作用 |
---|---|
all() | 返回所有 |
first() | 返回第一行 |
one() | 检查是不是只有一行结果 |
one_or_none() | 检查是不是一行或者没有结果 |
scalar() | invokes the one() method, and upon success returns the first column of the row |
直接使用textual SQL语句 :
使用text()可以在query中使用literal strings:session.query(User).filter(text("id<224")).order_by(text("id")).all()
如果要直接使用完整的整条sql,可以使用text()传给from_statement():session.query(User).from_statement(text("SELECT * FROM users where name=:name")).params(name='ed').all()
计数:
可以直接使用 .count()计算有多少行:session.query(User).filter(User.name.like('%ed')).count()
当需要将指定的东西进行计数,就使用func.count():session.query(func.count(User.id)).scalar()
建立表与表之间的关系(一对多,多对一):
多对一:
class Department(Base):
__tablename__ = 'department'
id = Column(Integer, primary_key=True)
name = Column(String(30))
class Employee(Base):
__tablename__ = 'employee'
id = Column(Integer, primary_key=True)
name = Column(String(30))
dep_id = Column(Integer, ForeignKey('department.id'))
department = relationship("Department")
employee对于department是多对一的关系
一对多:
class Department(Base):
__tablename__ = 'department'
id = Column(Integer, primary_key=True)
name = Column(String(30))
employees = relationship("Employee")
class Employee(Base):
__tablename__ = 'employee'
id = Column(Integer, primary_key=True)
name = Column(String(30))
dep_id = Column(Integer, ForeignKey('department.id'))
department对于employee是一对多的关系
新建一个tbSqlAlchemyTestError的table 一个test可以有多个error
class AlchemyTestError(Base):
__tablename__ = 'tbSqlAlchemyTestError'
errorID = Column(INTEGER, primary_key=True)
errorname = Column(String(256), nullable=False)
TestName = Column(String(256), ForeignKey('tbSqlAlchemyTest.TestName'))#这里面关联外键使用表的名字而不是类的名字,表与表之间建立联系
test = relationship("AlchemyTest", back_populates="error")
#告诉ORM,自己这个class与AlchemyTest这个class相关联,并且设定了一个“多对一”的关系
#*在自己这个类中生成一个test属性,然后在AlchemyTest类中创建一个error属性,与之关联,可以反向查询*
AlchemyTest.error = relationship("AlchemyTestError", back_populates="test")
#与上面一个relationship形成双向relationship,意思是在AlchemyTest类中生成一个error属性,然后与AlchemyTestError中的test属性相关联
#所以实际上与上面一个relationship是一样的意思
给一对多的关系表插入数据:
test1 = session.query(AlchemyTest).filter(AlchemyTest.TestName == 'user1').first() #首先提取需要加入error的test,这个已经存在
test1.error = [AlchemyTestError(errorID='553', errorname='cuowu'),
AlchemyTestError(errorID='667', errorname='afsf')] #使用relationship()时建立的error属性来给AlchemyTestError插入数据
session.add(test1)
session.commit()stmt
使用query.join()结合两张表来查询数据表数据:
result = session.query(AlchemyTest).join(AlchemyTestError).filter(AlchemyTestError.errorname == 'cuowu').first() #查询AlchemyTest表内的数据,看哪些user有这个error
if result is not None:
print(result.TestName)
因为这两张表只有一个外键,所以query.join()知道怎么关联这两张表,但是如果没有或者不止一个外键的时候,使用其他的形式:result = session.query(AlchemyTest).join((AlchemyTestError, AlchemyTest.TestName==AlchemyTestError.TestName)).first()
这里有一个join的reminder:
join | 如果表中有至少一个匹配,则返回行 |
left join | 即使右表中没有匹配,也从左表返回所有的行 |
right join | 即使左表中没有匹配,也从右表返回所有的行 |
full join | 只要其中一个表中存在匹配,就返回行 |
outerjoin: query.outerjoin(User.addresses) # LEFT OUTER JOIN
使用别名:
当一个table需要被call多次的时候,使用别名aliased()可以弄清:
name1 = aliased(AlchemyTest)
name2 = aliased(AlchemyTest)
使用subquery()来做子查询:
这个例子用于查询每个AlchemyTest的TestName对应了几个error:
stmt = session.query(AlchemyTestError.TestName, func.count('*').label('errorcount')).group_by(AlchemyTestError.TestName).subquery() #一定要使用subquery()才能使这个临时表生效
#相当于生成了一个临时表,stmt它具有表结构,这个表结构的数据我们通过"c"这个属性来获取
for user, count in session.query(AlchemyTest, stmt.c.errorcount).outerjoin(stmt, AlchemyTest.TestName==stmt.c.TestName).order_by(AlchemyTest.TestName):
#这里相当于SELECT user FROM AlchemyTest, SELECT count FROM stmt.c.errorcount LEFT OUTER JOIN stmt ON (AlchemyTest.TestName==stmt.c.TestName)
print(user.TestName, count)
使用exists():
stmt = exists().where(Address.user_id==User.id)
for name, in session.query(User.name).filter(stmt):
print(name)
一些描述关系的方法:query.filter(User.addresses.contains(someaddress))
query.filter(User.addresses.any(Address.email_address == 'bar'))
query.filter(Address.user.has(name='ed'))
session.query(Address).with_parent(someuser, 'addresses')
删除数据:
删除可以直接使用session.delete()
但是与之关联的table并没有删除数据,因为SQLAlchemy并没有设定级联删除!!!
解决方案:
在一对多关系的table里的relationship设定时增加cascade:cascade="all, delete, delete-orphan”
建立多对多关系的表结构:
例子中有两个tables,分别是博客表和关键字表,一篇博客可以拥有多个关键字,一个关键字也可以对应多篇博客。
所以我们要建立一个未映射的关系表:
post_keywords = Table('post_keywords', Base.metadata,
... Column('post_id', ForeignKey('posts.id'), primary_key=True),
... Column('keyword_id', ForeignKey('keywords.id'), primary_key=True)
... )
这个表建立的方式与建立一个拥有映射关系的class不同。接下来创建博客表和关键字表分别与这个中间关系表用relationship()关联起来:
>>> class BlogPost(Base):
... __tablename__ = 'posts'
...
... id = Column(Integer, primary_key=True)
... user_id = Column(Integer, ForeignKey('users.id'))
... headline = Column(String(255), nullable=False)
... body = Column(Text)
...
... # many to many BlogPost<->Keyword
... keywords = relationship('Keyword',
... secondary=post_keywords,
... back_populates='posts')
...
... def __init__(self, headline, body, author):
... self.author = author
... self.headline = headline
... self.body = body
...
... def __repr__(self):
... return "BlogPost(%r, %r, %r)" % (self.headline, self.body, self.author)
>>> class Keyword(Base):
... __tablename__ = 'keywords'
...
... id = Column(Integer, primary_key=True)
... keyword = Column(String(50), nullable=False, unique=True)
... posts = relationship('BlogPost',
... secondary=post_keywords,
... back_populates='keywords')
...
... def __init__(self, keyword):
... self.keyword = keyword
上一篇: 整合SSM
下一篇: maven的安装、配置环境变量以及使用