python3-开发进阶Flask的基础(5)
程序员文章站
2022-07-05 14:43:48
内容概要: 一、SQLAlchemy 1、概述 SQLAlchemy是一个ORM的框架,ORM就是关系对象映射,具体可以参照Django中的ORM。 作用:帮助我们使用类和对象快速实现数据库操作 数据库: -原生:MYSQLdb pymysql 区别就是 MYSQLdb 不支持python3 pym ......
内容概要:
- sqlalchemy
- flsak-sqlalchemy
- flask-script
- flask-migrate
- flask的目录结构
一、sqlalchemy
1、概述
sqlalchemy是一个orm的框架,orm就是关系对象映射,具体可以参照django中的orm。
作用:帮助我们使用类和对象快速实现数据库操作
数据库:
-原生:mysqldb pymysql
区别就是 mysqldb 不支持python3 pymysql 都支持
orm框架
sqlalchemy
2、sqlalchemy用法
1、安装
pip3 install sqlalchemy
2、配置
from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import column from sqlalchemy import integer,string,text,date,datetime from sqlalchemy import create_engine base = declarative_base() class users(base): __tablename__ = 'users' id = column(integer, primary_key=true) name = column(string(32), index=true, nullable=false) def create_all(): engine = create_engine( "mysql+pymysql://root:123@127.0.0.1:3306/duoduo123?charset=utf8", max_overflow=0, # 超过连接池大小外最多创建的连接 pool_size=5, # 连接池大小 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) ) base.metadata.create_all(engine) def drop_all(): engine = create_engine( "mysql+pymysql://root:123@127.0.0.1:3306/duoduo123?charset=utf8", max_overflow=0, # 超过连接池大小外最多创建的连接 pool_size=5, # 连接池大小 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) ) base.metadata.drop_all(engine) if __name__ == '__main__': create_all()
3、增删改查的演示
from duoduo.test import users from sqlalchemy.orm import sessionmaker from sqlalchemy import create_engine engine = create_engine( "mysql+pymysql://root:123@127.0.0.1:3306/duoduo123?charset=utf8", max_overflow=0, # 超过连接池大小外最多创建的连接 pool_size=5, # 连接池大小 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) ) sessionfactory = sessionmaker(bind=engine) #根据users类对user表进行增删改查 session=sessionfactory() #增加 #单个 # obj=users(name='many qian') # session.add(obj) # session.commit() #多个 # session.add_all([ # users(name='大娃'), # users(name='二娃') # ]) # session.commit() #查 所有 # tes=session.query(users).all() # print(tes) #拿到的对象 # for row in tes: # print(row.id,row.name) # tes=session.query(users).filter(users.id==3) #> < >= <= # # for row in tes: # print(row.id,row.name) # # tes=session.query(users).filter(users.id<=3).first()#> < >= <= # # print(tes.id,tes.name) #删除 # tes=session.query(users).filter(users.id==3).delete() #> < >= <= # session.commit() #改 # session.query(users).filter(users.id ==4).update({users.name:'三娃'}) # session.query(users).filter(users.id ==4).update({'name':'四娃'}) # session.query(users).filter(users.id ==4).update({'name':users.name+'nice'},synchronize_session=false) # session.commit() # # # session.close()
4、单表常用操作
# ############################## 其他常用 ############################### # 1. 指定列 # select id,name as cname from users; # result = session.query(users.id,users.name.label('cname')).all() # for item in result: # print(item[0],item.id,item.cname) # 2. 默认条件and # session.query(users).filter(users.id > 1, users.name == 'duoduo').all() # 3. between # session.query(users).filter(users.id.between(1, 3), users.name == 'duoduo').all() # 4. in # session.query(users).filter(users.id.in_([1,3,4])).all() # session.query(users).filter(~users.id.in_([1,3,4])).all() # 5. 子查询 # session.query(users).filter(users.id.in_(session.query(users.id).filter(users.name=='duoduo'))).all() # 6. and 和 or # from sqlalchemy import and_, or_ # session.query(users).filter(users.id > 3, users.name == 'duoduo').all() # session.query(users).filter(and_(users.id > 3, users.name == 'duoduo')).all() # session.query(users).filter(or_(users.id < 2, users.name == 'duoduo')).all() # session.query(users).filter( # or_( # users.id < 2, # and_(users.name == 'duoduo', users.id > 3), # users.extra != "" # )).all() # 7. filter_by # session.query(users).filter_by(name='duoduo').all() # 8. 通配符 # ret = session.query(users).filter(users.name.like('d%')).all() #%任何的东西 # ret = session.query(users).filter(~users.name.like('d_')).all() #_ 只有一个字符 # 9. 切片 # result = session.query(users)[1:2] # 10.排序 # ret = session.query(users).order_by(users.name.desc()).all() # ret = session.query(users).order_by(users.name.desc(), users.id.asc()).all() #desc从大到小 asc从小到大排 # 11. group by from sqlalchemy.sql import func # ret = session.query( # users.depart_id, # func.count(users.id), # ).group_by(users.depart_id).all() # for item in ret: # print(item) # # from sqlalchemy.sql import func #进行二次筛选,只能用having # ret = session.query( # users.depart_id, # func.count(users.id), # ).group_by(users.depart_id).having(func.count(users.id) >= 2).all() # for item in ret: # print(item) # 12.union 去重 和 union all 上下拼接不去重 """ select id,name from users union select id,name from users; """ # q1 = session.query(users.name).filter(users.id > 2) # q2 = session.query(users.name).filter(users.id > 2) # ret = q1.union(q2).all() # # q1 = session.query(users.name).filter(users.id > 2) # q2 = session.query(users.name).filter(users.id > 2) # ret = q1.union_all(q2).all() session.close()
5、连表常用操作
在上面的配置里,重新搞两给表
from sqlalchemy import foreignkey from sqlalchemy.orm import relationship #按照上面配置的代码加上这些东西,配置的表删除 class depart(base): __tablename__ = 'depart' id = column(integer, primary_key=true) title = column(string(32), index=true, nullable=false) class users(base): __tablename__ = 'users' id = column(integer, primary_key=true) name = column(string(32), index=true, nullable=false) depart_id = column(integer,foreignkey("depart.id")) #跟表结构无关 dp = relationship("depart", backref='pers')
下面是实例演示:之前自己添加一点数据:
#django的manytomany 在flask中两个foreignkey完成 from duoduo.test import users,depart,student,course,student2course from sqlalchemy.orm import sessionmaker from sqlalchemy import create_engine engine = create_engine( "mysql+pymysql://root:123@127.0.0.1:3306/duoduo123?charset=utf8", max_overflow=0, # 超过连接池大小外最多创建的连接 pool_size=5, # 连接池大小 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) ) sessionfactory = sessionmaker(bind=engine) #根据users类对user表进行增删改查 session=sessionfactory() # #1、查询所有用户 # # ret =session.query(users).all() # # # # for i in ret: # # print(i.id,i.name,i.depart_id) #2、查询所有用户,所属部门名称 # ret=session.query(users.id,users.name,depart.title).join(depart,user.depart_id==depart.id).all() # #这里有默认foreignkey ,user.depart_id==depart.id 可以不加也行 # for i in ret: # print(i.id ,i.name,i.title) #select users.id as users_id, users.name as users_name, depart.title as depart_title #from users inner join depart on depart.id = users.depart_id # q=session.query(users.id,users.name,depart.title).join(depart) # print(q) #3、relation字段:查询所有用户+所有部门名称 # ret=session.query(users).all() # for row in ret: # print(row.id,row.name,row.depart_id,row.dp.title) #4、relation字段:查询销售部所有人员 # obj=session.query(depart).filter(depart.title =='大娃').first() # # for i in obj.pers: # # print(i.id,i.name,obj.title) #5、创建一个名称叫:it部门,再在该部门中添加一个员工叫:多多 #方式一: # d1=depart(title='it') # session.add(d1) # session.commit() # # u1=users(name='duoduo',depart_id=d1.id) # session.add(u1) # session.commit() #方式二: # u1=users(name='多多1',dp=depart(title='it')) # session.add(u1) # session.commit() #6、创建一个部门叫王者荣耀,这个部门添加多个员工:亚瑟,后裔,貂蝉 # d1=depart(title='王者荣耀') # d1.pers=[users(name='亚瑟'),users(name='后裔'),users(name='貂蝉')] # # session.add(d1) # session.commit() #1、录入数据 # session.add_all([ # student(name='大娃'), # student(name='二娃'), # course(title='物理'), # course(title='化学'), # # ]) # session.add_all([ # student2course(student_id=2,course_id=1), # # student2course(student_id=1,course_id=2) # ] # ) #2、查每个人选了课程的名称,三张表进行关联 # ret=session.query(student2course.id,student.name,course.title).join(student,student2course.student_id==student.id,isouter=true).join(course,student2course.course_id==course.id,isouter=true).order_by(student2course.id.asc()) # for i in ret: # print(i) #3、'大娃'所选的所有课 # ret=session.query(student2course.id,student.name,course.title).join(student,student2course.student_id==student.id,isouter=true).join(course,student2course.course_id==course.id,isouter=true).filter(student.name=='大娃').order_by(student2course.id.asc()) # # for i in ret: # print(i) # obj=session.query(student).filter(student.name=='大娃').first() # for item in obj.course_list: # print(item.title) #4选了'化学'课程的所有人的名字 # obj=session.query(course).filter(course.title=='化学').first() # for item in obj.student_list: # print(item.name) #创建一个课程,创建2个学,两个学生选新创建的课程 # obj=course(title='体育') # obj.student_list=[student(name='五娃'),student(name='六娃')] # # session.add(obj) # session.commit() # session.close()
class student(base): __tablename__ = 'student' id = column(integer, primary_key=true) name = column(string(32), index=true, nullable=false) course_list = relationship('course', secondary='student2course', backref='student_list') class course(base): __tablename__ = 'course' id = column(integer, primary_key=true) title = column(string(32), index=true, nullable=false) class student2course(base): __tablename__ = 'student2course' id = column(integer, primary_key=true, autoincrement=true) student_id = column(integer, foreignkey('student.id')) course_id = column(integer, foreignkey('course.id')) __table_args__ = ( uniqueconstraint('student_id', 'course_id', name='uix_stu_cou'), # 联合唯一索引 # index('ix_id_name', 'name', 'extra'), # 联合索引 )
连接的方式:
from duoduo.test import users,depart,student,course,student2course from sqlalchemy.orm import sessionmaker from sqlalchemy import create_engine engine = create_engine( "mysql+pymysql://root:123@127.0.0.1:3306/duoduo123?charset=utf8", max_overflow=0, # 超过连接池大小外最多创建的连接 pool_size=5, # 连接池大小 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) ) sessionfactory = sessionmaker(bind=engine) #连接 #第一种方式 # #并发 # def task(): # #去连接池获取一个连接 # session=sessionfactory() # ret=session.query(student).all() # print(ret) # #将连接交还给连接池 # session.close() # # # from threading import thread # # for i in range(20): # t=thread(target=task) # t.start() # #第二种方式 from sqlalchemy.orm import scoped_session session=scoped_session(sessionfactory) def task(): ret=session.query(student).all() print(ret) session.remove() #连接断开 from threading import thread for i in range(20): t=thread(target=task) t.start()
执行原生sq:
from sqlalchemy.orm import sessionmaker from sqlalchemy import create_engine from sqlalchemy.orm import scoped_session from models import student,course,student2course engine = create_engine( "mysql+pymysql://root:123@127.0.0.1:3306/duoduo123?charset=utf8", max_overflow=0, # 超过连接池大小外最多创建的连接 pool_size=5, # 连接池大小 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) ) sessionfactory = sessionmaker(bind=engine) session = scoped_session(sessionfactory) def task(): """""" # 方式一: """ # 查询 # cursor = session.execute('select * from users') # result = cursor.fetchall() # 添加 cursor = session.execute('insert into users(name) values(:value)', params={"value": 'duoduo'}) session.commit() print(cursor.lastrowid) """ # 方式二: """ # conn = engine.raw_connection() # cursor = conn.cursor() # cursor.execute( # "select * from t1" # ) # result = cursor.fetchall() # cursor.close() # conn.close() """ # 将连接交还给连接池 session.remove() from threading import thread for i in range(20): t = thread(target=task) t.start()
二、flask的第三方组件
1、flask-sqlalchemy
a、先下载安装
pip3 install flask-sqlalchemy
b、项目下的__init__.py导入
#第一步 :导入并实例化sqlalchemy from flask_sqlalchemy import sqlalchemy db=sqlalchemy() #一定要在蓝图导入的上面,是全局变量
#也要导入表的models.py的所有表
c、初始化
db.init_app(app) #在注册蓝图的地方的下面
d、在配置文件中写入配置
# ##### sqlalchemy配置文件 ##### sqlalchemy_database_uri = "mysql+pymysql://root:123@127.0.0.1:3306/duoduo?charset=utf8" sqlalchemy_pool_size = 10 sqlalchemy_max_overflow = 5
e、创建models.py中的类(对应数据库中的表)
from sqlalchemy import column from sqlalchemy import integer,string,text,date,datetime from (项目目录) import db class users(db.model): #继承的db.model __tablename__ = 'users' id = column(integer, primary_key=true) name = column(string(32), index=true, nullable=false) # depart_id = column(integer)
f、生成表(使用app上下文)
from (项目目录) import db,create_app app = create_app() app_ctx = app.app_context() # app_ctx = app/g with app_ctx: # __enter__,通过localstack放入local中 db.create_all() # 调用localstack放入local中获取app,再去app中获取配置
补充一个知识点:
class foo(object): def __enter__(self): print('进入') def __exit__(self, exc_type, exc_val, exc_tb): print('出来') obj =foo() with obj: #with foo() print('多多') #结果 #进入 #多多 #出来
g、基于orm对数据库进行操作
from flask import blueprint from (项目目录) import db from (项目目录) import models us = blueprint('us',__name__) @us.route('/index') def index(): # 使用sqlalchemy在数据库中插入一条数据 # db.session.add(models.users(name='多多',depart_id=1)) #插入数据 # db.session.commit() # db.session.remove() result = db.session.query(models.users).all() print(result) db.session.remove() return 'index'
这里面db.session.add的源码:
2、 flask-script
下载安装
pip3 install flask-script
功能:
a、增加runsever
from (项目目录) import create_app from flask_script import manager app = create_app() manager = manager(app) if __name__ == '__main__': # app.run() manager.run()
b、位置传参
from (项目目录) import create_app from flask_script import manager app = create_app() manager = manager(app) @manager.command def custom(arg): """ 自定义命令 python manage.py custom 123 :param arg: :return: """ print(arg) if __name__ == '__main__': # app.run() manager.run()
c、关键字传参
from flask_script import manager from (项目目录) import create_app app = create_app() manager = manager(app) @manager.option('-n', '--name', dest='name') @manager.option('-u', '--url', dest='url') def cmd(name, url): """ 自定义命令 执行: python manage.py cmd -n qianduoduo -u http://www.baidu.com :param name: :param url: :return: """ print(name, url) if __name__ == '__main__': # app.run() manager.run()
3、flask-migrate
安装
pip3 install flask-migrate
配置是依赖 flask-script
from flask_script import manager from flask_migrate import migrate, migratecommand from (项目目录) import create_app from (项目目录) import db app = create_app() manager = manager(app) migrate(app, db) """ # 数据库迁移命名 python manage.py db init #只执行一次 python manage.py db migrate # makemirations python manage.py db upgrade # migrate """ manager.add_command('db', migratecommand) if __name__ == '__main__': # app.run() manager.run()
补充工具:(协同开发的保证开发的环境一致性)
1、pipreqs
找到项目使用的所有组件的版本
pip3 install pipreqs
在项目中的命令行
pipreqs ./ --encoding=utf-8
在项目中找了文件requirements.txt
如何装那些模块?
pip install 的时候可以指定一个文件,他会自己读每一行,然后安装
pycharm 打开文件的时候也会提示下载
2、虚拟环境 (开发环境版本不能共存的)
安装
pip3 install virtualenv
创建虚拟环境
#找一个存放虚拟环境的文件夹 virtualenv env1 --no-site-packages 创建环境 activate #激活环境 deactivate # 退出环境 #也可以用pycharm鼠标点点就好了,最新的版本就有虚拟环境goon功能 #当我们需要特定的环境,可以在电脑上有多高开发环境