欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

python ORM框架SQLAlchemy

程序员文章站 2024-03-03 18:20:04
...

SQLAlchemy是一个基于Python的ORM框架。该框架是建立在DB-API之上,使用关系对象映射进行数据库操作。

简而言之就是,将类和对象转换成SQL,然后使用数据API执行SQL并获取执行结果。

补充:什么是DB-API ? 是Python的数据库接口规范。

在没有DB-API之前,各数据库之间的应用接口非常混乱,实现各不相同,

项目需要更换数据库的时候,需要做大量的修改,非常不方便,DB-API就是为了解决这样的问题。

1
pip install sqlalchemy

组成部分:

-- engine,框架的引擎

-- connection pooling 数据库连接池

-- Dialect 选择链接数据库的DB-API种类(实际选择哪个模块链接数据库)

-- Schema/Types 架构和类型

-- SQL Expression Language SQL表达式语言
连接数据库
SQLAlchemy 本身无法操作数据库,其必须依赖遵循DB-API规范的三方模块,

Dialect 用于和数据API进行交互,根据配置的不同调用不同数据库API,从而实现数据库的操作。

MySQL-PYthon

mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>

pymysql

mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]

MySQL-Connector

mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname>

cx_Oracle

oracle+cx_oracle://user:[email protected]:port/dbname[?key=value&key=value...]

更多

http://docs.sqlalchemy.org/en/latest/dialects/index.html

不同的数据库API
不同的数据库API

from sqlalchemy import create_engine

engine = create_engine(
"mysql+pymysql://root:[email protected]:3306/code_record?charset=utf8",
max_overflow=0, # 超过连接池大小外最多创建的连接数
pool_size=5, # 连接池大小
pool_timeout=30, # 连接池中没有线程最多等待时间,否则报错
pool_recycle=-1, # 多久之后对连接池中的连接进行回收(重置)-1不回收
)
连接数据库
执行原生SQL
from sqlalchemy import create_engine

engine = create_engine(
"mysql+pymysql://root:[email protected]:3306/code_record?charset=utf8",
max_overflow=0, # 超过连接池大小外最多创建的连接数
pool_size=5, # 连接池大小
pool_timeout=30, # 连接池中没有线程最多等待时间,否则报错
pool_recycle=-1, # 多久之后对连接池中的连接进行回收(重置)-1不回收
)

def test():
conn = engine.raw_connection()
cursor = conn.cursor()
cursor.execute("select * from Course")
result = cursor.fetchall()
print(result)
cursor.close()
conn.close()

if name == 'main':
test()

((1, '生物', 1), (2, '体育', 2), (3, '物理', 1))

raw_connection

raw_connection
from sqlalchemy import create_engine
engine = create_engine(
"mysql+pymysql://root:[email protected]:3306/code_record?charset=utf8",
max_overflow=0,
pool_size=5,
)

def test():
conn = engine.contextual_connect()
with conn:
cur = conn.execute(
"select * from Course"
)
result = cur.fetchall()
print(result)

if name == 'main':
test()

[(1, '生物', 1), (2, '体育', 2), (3, '物理', 1)]

contextual_connect

contextual_connect
from sqlalchemy import create_engine
engine = create_engine(
"mysql+pymysql://root:[email protected]:3306/code_record?charset=utf8",
max_overflow=0,
pool_size=5,
)

def test():
cur = engine.execute("select * from Course")
result = cur.fetchall()
print(result)
cur.close()

if name == 'main':
test()

[(1, '生物', 1), (2, '体育', 2), (3, '物理', 1)]

engine.execute

engine.execute
ORM操作
一、创建表

单表的创建 app.py

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, DateTime
from sqlalchemy import Index, UniqueConstraint
import datetime

sqlalchemy要依赖pysysql,用户名,密码,ip,端口号,数据库名字,编码方式

ENGINE = create_engine("mysql+pymysql://root:[email protected]:3306/database111?charset=utf8",)

Base = declarative_base()

class UserInfo(Base):
tablename = "user_info" # 表的名字就叫user_info

id = Column(Integer, primary_key=True, autoincrement=True)  # 整数,默认主键,自增
name = Column(String(32), index=True, nullable=False)  # 字符串
extra = Column(String(32), unique=True)  # 字符串def create_db():  # 创建表
Base.metadata.create_all(ENGINE)  # 就是将继承的Base的类的所有的表都创建,创建到ENGINE数据库,就上上面那个mysql+pymysql数据库

def drop_db(): # 删除表
Base.metadata.drop_all(ENGINE)

if name == 'main':
create_db()
单表的增加数据

ad.py

import app
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session

创建连接

ENGINE = create_engine("mysql+pymysql://root:[email protected]:3306/db111?charset=utf8",)
Session = sessionmaker(bind=ENGINE)

每次执行数据库操作的时候,都需要创建一个session,就会开辟内存空间放这个session

session = Session()

单条数据增加

obj1 = app.UserInfo(name="xiaoming", extra="bangbangbang") # 先实例化一个对象,这样就拿到一个对象
obj2 = app.UserInfo(name="xiaojun", extra="bangbang") # 先实例化一个对象,这样就拿到一个对象

session.add(obj1) # 把这两个对象传给session了
session.add(obj2) # 把这俩对象放内存了
session.commit() # 就把这个数据提交到数据库了

session.close() # 关闭连接
基于SQLAlchemy操作原生SQL

from sqlalchemy import create_engine

engine = create_engine("mysql+pymysql://root:[email protected]:3306/db111?charset=utf8")
cur = engine.execute('select * from user_info') # 执行原生sql
result = cur.fetchall() # 拿到所有的数据

print(result)

打印结果 [(5, 'xiaoming', 'bangbangbang'), (6, 'xiaojun', 'bangbang')]

一对多和多对多表的创建

一对多和对对多表的创建

app1.py

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, DateTime, ForeignKey
from sqlalchemy import Index, UniqueConstraint
import datetime

sqlalchemy要依赖pysysql,用户名,密码,ip,端口号,数据库名字,编码方式

ENGINE = create_engine("mysql+pymysql://root:[email protected]:3306/db111?charset=utf8",)

Base = declarative_base()

创建班级表

class Classes(Base):
tablename = "classes"
id = Column(Integer, primary_key=True, autoincrement=True) # 整数,默认主键,自增
name = Column(String(32), nullable=False, unique=True)

创建学生表

class Student(Base):
tablename = "student"
id = Column(Integer, primary_key=True, autoincrement=True) # 整数,默认主键,自增
username = Column(String(32), nullable=False, unique=True) # 字符串,不能为空,唯一
password = Column(String(64), nullable=False) # 字符串,不能为空,可以不唯一
ctime = Column(DateTime, default=datetime.datetime.now) # 注意这里的new不加括号,如果加了括号,时间一直都是这个程序的启动时间
class_id = Column(Integer, ForeignKey("classes.id")) # 外键关联,要关联它的别名,关联id

创建爱好表

class Hobby(Base):
tablename = 'hobby'
id = Column(Integer, primary_key=True) # 整数,默认主键
caption = Column(String(50), default='篮球') # 字符串,默认值是篮球

创建学生表和爱好表的多对多关系表,第三张表

class Student2Hobby(Base): # 要创建多对多关系,需要自己创建第三张表
tablename = 'student2hobby'
id = Column(Integer, primary_key=True, autoincrement=True)
student_id = Column(Integer, ForeignKey('student.id')) # 外键关联关系
hobby_id = Column(Integer, ForeignKey('hobby.id')) # 外键关联关系

__table_args__ = (
    UniqueConstraint('student_id', 'hobby_id', name='uix_student_id_hobby_id'),  # 创建联合唯一索引
    # Index('ix_student_id_hobby_id', 'student_id', 'hobby_id')  # 普通的联合索引,不约束唯一
)

def create_db():
Base.metadata.create_all(ENGINE) # 就是将继承的Base的类的所有的表都创建,创建到ENGINE数据库,就上上面那个mysql+pymysql数据库

if name == 'main':
create_db()
多条数据增加

ad1.py

import app1
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session

创建连接

ENGINE = create_engine("mysql+pymysql://root:[email protected]:3306/db111?charset=utf8",)
Session = sessionmaker(bind=ENGINE)

每次执行数据库操作的时候,都需要创建一个session,就会开辟内存空间放这个session

session = Session()

多条数据增加

objs = [
app1.Classes(name="1班"),
app1.Classes(name="2班"),
app1.Classes(name="3班"),
app1.Classes(name="4班"),
app1.Classes(name="5班")
]
session.add_all(objs) # 把这两个对象传给session了
session.commit() # 就把这个数据提交到数据库了

session.close() # 关闭连接
查询表数据

index1.py

import app1
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session

创建连接

ENGINE = create_engine("mysql+pymysql://root:[email protected]:3306/db111?charset=utf8",)
Session = sessionmaker(bind=ENGINE)

每次执行数据库操作的时候,都需要创建一个session,就会开辟内存空间放这个session

session = Session()

查询表数据

result = session.query(app1.Classes).all()
print(result) # 打印出的是一个对象列表
for item in result:
print(item.id, item.name)

session.close() # 关闭连接
删除表数据

del.py

import app1
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session

创建连接

ENGINE = create_engine("mysql+pymysql://root:[email protected]:3306/db111?charset=utf8",)
Session = sessionmaker(bind=ENGINE)

每次执行数据库操作的时候,都需要创建一个session,就会开辟内存空间放这个session

session = Session() # 将操作提交到数据库

查询表数据

session.query(app1.Classes).filter(app1.Classes.id > 2).delete() # 删除id>2的班级
session.commit()

session.close() # 关闭连接
修改表数据

import app1
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session

创建连接

ENGINE = create_engine("mysql+pymysql://root:[email protected]:3306/db111?charset=utf8",)
Session = sessionmaker(bind=ENGINE)

每次执行数据库操作的时候,都需要创建一个session,就会开辟内存空间放这个session

session = Session()

修改表数据# session.query(app1.Classes).filter_by(id=1).update({app1.Classes.name: "Ming"}) # 修改成Ming

session.query(app1.Classes).filter_by(id=2).update({"name": "jun"}) # 修改成jun

session.query(app1.Classes).filter_by(id=3).update({"name": app1.Classes.name + "~"}, synchronize_session=False) # 后面加上

synchronize_session="evaluate" 默认值进行数字加减

session.commit()

session.close() # 关闭连接
常用的条件查询

import app1
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session

创建连接

ENGINE = create_engine("mysql+pymysql://root:[email protected]:3306/db111?charset=utf8",)
Session = sessionmaker(bind=ENGINE)

每次执行数据库操作的时候,都需要创建一个session,就会开辟内存空间放这个session

session = Session()

条件查询

ret1 = session.query(app1.Classes).filter_by(id=1).first()

print(ret1.id, ret1.name)

ret2 = session.query(app1.Classes).filter(app1.Classes.id > 4, app1.Classes.name == "7班").first()

print(ret2.id, ret2.name)

ret3 = session.query(app1.Classes).filter(app1.Classes.id.between(1, 10)).all() # 范围内

for i in ret3:

print(i.id)

ret4 = session.query(app1.Classes).filter(app1.Classes.id.in_([1, 2, 6, 10])).all() # id在这个列表里面

for k in ret4:

print(k.name)

from sqlalchemy import and_, or_

ret5 = session.query(app1.Classes).filter(and_(app1.Classes.id > 3, app1.Classes.name == "6班")).first() # 两个条件都要满足

print(ret5.name)

ret6 = session.query(app1.Classes).filter(or_(app1.Classes.id > 3, app1.Classes.name == "没有这个名字")).first() # 只需要满足一个条件

print(ret6.name)

ret7 = session.query(app1.Classes).filter(or_(

app.Classes.id > 1,

and_(app1.Classes.id > 3, app1.Classes.name == "8班")

)).all()

for g in ret7:

print(g.name)

通配符

ret8 = session.query(app1.Classes).filter(app1.Classes.name.like("%班")).all()

print(ret8)

ret9 = session.query(app1.Classes).filter(app1.Classes.name.like("班%")).all()

限制

ret10 = session.query(app1.Classes).filter(app1.Classes.name.like("%班")).all()[1:10]

for l in ret10:

print(l.name)

# 排序

ret11 = session.query(app1.Classes).order_by(app1.Classes.id.desc()).all() # 倒序

for r in ret11:

print(r.name)

ret12 = session.query(app1.Classes).order_by(app1.Classes.id.asc()).all() # 正序

for a in ret12:

print(a.name)

分组

ret13 = session.query(app1.Classes.name).group_by(app1.Classes.name).all()

print(ret13)

for x in ret13:

print(x.name)

聚合函数

from sqlalchemy.sql import func

ret14 = session.query(

func.max(app1.Classes.id),

func.sum(app1.Classes.id),

func.min(app1.Classes.id)

).group_by(app1.Classes.name).having(func.max(app1.Classes.id > 1)).all()

print(ret14)

连表

ret15 = session.query(app1.Student, app1.Classes).filter(app1.Student.class_id == app1.Classes.id).all()

for m in ret15:

print(m[0].username)

print(ret15) 得到一个列表套元组 元组里是两个对象

ret16 = session.query(app1.Student).join(app1.Classes).all()

print(ret16[2].username) # 得到列表里面是前一个表的对象

相当于inner join

for i in ret16:

print(i[0].username, i[1].username)

ret17 = session.query(Hobby).join(UserInfo, isouter=True).all()

ret17_1 = session.query(UserInfo).join(Hobby, isouter=True).all()

ret18 = session.query(Hobby).outerjoin(UserInfo).all()

ret18_1 = session.query(UserInfo).outerjoin(Hobby).all()

相当于left join

print(ret17)

print(ret17_1)

print(ret18)

print(ret18_1)

session.commit()
session.close() # 关闭连接

基于relationship的FK外键

添加

user_obj = UserInfo(name="提莫", hobby=Hobby(title="种蘑菇"))
session.add(user_obj)

hobby = Hobby(title="弹奏一曲")
hobby.user = [UserInfo(name="琴女"), UserInfo(name="妹纸")]
session.add(hobby)
session.commit()

基于relationship的正向查询

user_obj_1 = session.query(UserInfo).first()
print(user_obj_1.name)
print(user_obj_1.hobby.title)

基于relationship的反向查询

hb = session.query(Hobby).first()
print(hb.title)
for i in hb.user:
print(i.name)

session.close()

基于relationship的FK

添加

book_obj = Book(title="Python源码剖析")
tag_obj = Tag(title="Python")
b2t = Book2Tag(book_id=book_obj.id, tag_id=tag_obj.id)
session.add_all([
book_obj,
tag_obj,
b2t,
])
session.commit()

上面有坑哦~~~~

book = Book(title="测试")
book.tags = [Tag(title="测试标签1"), Tag(title="测试标签2")]
session.add(book)
session.commit()

tag = Tag(title="LOL")
tag.books = [Book(title="大龙刷新时间"), Book(title="小龙刷新时间")]
session.add(tag)
session.commit()

基于relationship的正向查询

book_obj = session.query(Book).filter_by(id=4).first()
print(book_obj.title)
print(book_obj.tags)

基于relationship的反向查询

tag_obj = session.query(Tag).first()
print(tag_obj.title)
print(tag_obj.books)

基于relationship的M2M多对多