欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

python3-开发进阶Flask的基础(5)

程序员文章站 2022-04-09 18:15:11
内容概要: 一、SQLAlchemy 1、概述 SQLAlchemy是一个ORM的框架,ORM就是关系对象映射,具体可以参照Django中的ORM。 作用:帮助我们使用类和对象快速实现数据库操作 数据库: -原生:MYSQLdb pymysql 区别就是 MYSQLdb 不支持python3 pym ......

内容概要:

  1. sqlalchemy
  2. flsak-sqlalchemy
  3. flask-script
  4. flask-migrate
  5. flask的目录结构

 

一、sqlalchemy

1、概述

sqlalchemy是一个orm的框架,orm就是关系对象映射,具体可以参照django中的orm。

作用:帮助我们使用类和对象快速实现数据库操作

 

数据库:

  -原生:mysqldb       pymysql

  区别就是 mysqldb 不支持python3   pymysql 都支持

orm框架

  sqlalchemy

2、sqlalchemy用法  

1、安装

pip3 install sqlalchemy

2、配置

python3-开发进阶Flask的基础(5)
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import column
from sqlalchemy import integer,string,text,date,datetime
from sqlalchemy import create_engine


base = declarative_base()

class users(base):
    __tablename__ = 'users'

    id = column(integer, primary_key=true)
    name = column(string(32), index=true, nullable=false)


def create_all():
    engine = create_engine(
        "mysql+pymysql://root:123@127.0.0.1:3306/duoduo123?charset=utf8",
        max_overflow=0,  # 超过连接池大小外最多创建的连接
        pool_size=5,  # 连接池大小
        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )

    base.metadata.create_all(engine)

def drop_all():
    engine = create_engine(
        "mysql+pymysql://root:123@127.0.0.1:3306/duoduo123?charset=utf8",
        max_overflow=0,  # 超过连接池大小外最多创建的连接
        pool_size=5,  # 连接池大小
        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )
    base.metadata.drop_all(engine)

if __name__ == '__main__':
    create_all()
配置

3、增删改查的演示

python3-开发进阶Flask的基础(5)
from duoduo.test import users
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine


engine = create_engine(
        "mysql+pymysql://root:123@127.0.0.1:3306/duoduo123?charset=utf8",
        max_overflow=0,  # 超过连接池大小外最多创建的连接
        pool_size=5,  # 连接池大小
        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )



sessionfactory = sessionmaker(bind=engine)

#根据users类对user表进行增删改查
session=sessionfactory()
#增加
#单个
# obj=users(name='many qian')
# session.add(obj)
# session.commit()
#多个
# session.add_all([
#         users(name='大娃'),
#         users(name='二娃')
# ])
# session.commit()

#查  所有
# tes=session.query(users).all()
# print(tes) #拿到的对象
# for row in tes:
#         print(row.id,row.name)

# tes=session.query(users).filter(users.id==3) #> < >= <=
#
# for row in tes:
#     print(row.id,row.name)

#
# tes=session.query(users).filter(users.id<=3).first()#> < >= <=
#
# print(tes.id,tes.name)

#删除
# tes=session.query(users).filter(users.id==3).delete() #> < >= <=
# session.commit()

#改
# session.query(users).filter(users.id ==4).update({users.name:'三娃'})
# session.query(users).filter(users.id ==4).update({'name':'四娃'})
# session.query(users).filter(users.id ==4).update({'name':users.name+'nice'},synchronize_session=false)
# session.commit()
#
#
# session.close()
增删改查

 4、单表常用操作

python3-开发进阶Flask的基础(5)
# ############################## 其他常用 ###############################
# 1. 指定列
# select id,name as cname from users;
# result = session.query(users.id,users.name.label('cname')).all()
# for item in result:
#         print(item[0],item.id,item.cname)
# 2. 默认条件and
# session.query(users).filter(users.id > 1, users.name == 'duoduo').all()
# 3. between
# session.query(users).filter(users.id.between(1, 3), users.name == 'duoduo').all()
# 4. in
# session.query(users).filter(users.id.in_([1,3,4])).all()
# session.query(users).filter(~users.id.in_([1,3,4])).all()
# 5. 子查询
# session.query(users).filter(users.id.in_(session.query(users.id).filter(users.name=='duoduo'))).all()
# 6. and 和 or
# from sqlalchemy import and_, or_
# session.query(users).filter(users.id > 3, users.name == 'duoduo').all()
# session.query(users).filter(and_(users.id > 3, users.name == 'duoduo')).all()
# session.query(users).filter(or_(users.id < 2, users.name == 'duoduo')).all()
# session.query(users).filter(
#     or_(
#         users.id < 2,
#         and_(users.name == 'duoduo', users.id > 3),
#         users.extra != ""
#     )).all()

# 7. filter_by
# session.query(users).filter_by(name='duoduo').all()

# 8. 通配符
# ret = session.query(users).filter(users.name.like('d%')).all()   #%任何的东西
# ret = session.query(users).filter(~users.name.like('d_')).all()   #_ 只有一个字符

# 9. 切片
# result = session.query(users)[1:2]

# 10.排序
# ret = session.query(users).order_by(users.name.desc()).all()
# ret = session.query(users).order_by(users.name.desc(), users.id.asc()).all()  #desc从大到小  asc从小到大排

# 11. group by
from sqlalchemy.sql import func

# ret = session.query(
#         users.depart_id,
#         func.count(users.id),
# ).group_by(users.depart_id).all()
# for item in ret:
#         print(item)
#
# from sqlalchemy.sql import func
#进行二次筛选,只能用having
# ret = session.query(
#         users.depart_id,
#         func.count(users.id),
# ).group_by(users.depart_id).having(func.count(users.id) >= 2).all()
# for item in ret:
#         print(item)

# 12.union 去重 和 union all 上下拼接不去重
"""
select id,name from users
union
select id,name from users;
"""
# q1 = session.query(users.name).filter(users.id > 2)
# q2 = session.query(users.name).filter(users.id > 2)
# ret = q1.union(q2).all()
#
# q1 = session.query(users.name).filter(users.id > 2)
# q2 = session.query(users.name).filter(users.id > 2)
# ret = q1.union_all(q2).all()


session.close()
单表常用操作

5、连表常用操作

在上面的配置里,重新搞两给表

from sqlalchemy import foreignkey
from sqlalchemy.orm import relationship
#按照上面配置的代码加上这些东西,配置的表删除 
class depart(base):
    __tablename__ = 'depart'
    id = column(integer, primary_key=true)
    title = column(string(32), index=true, nullable=false)

class users(base):
    __tablename__ = 'users'

    id = column(integer, primary_key=true)
    name = column(string(32), index=true, nullable=false)
    depart_id = column(integer,foreignkey("depart.id"))
    #跟表结构无关
    dp = relationship("depart", backref='pers')

下面是实例演示:之前自己添加一点数据:

python3-开发进阶Flask的基础(5)
#django的manytomany  在flask中两个foreignkey完成
from duoduo.test import users,depart,student,course,student2course
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine


engine = create_engine(
        "mysql+pymysql://root:123@127.0.0.1:3306/duoduo123?charset=utf8",
        max_overflow=0,  # 超过连接池大小外最多创建的连接
        pool_size=5,  # 连接池大小
        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )



sessionfactory = sessionmaker(bind=engine)

#根据users类对user表进行增删改查
session=sessionfactory()

# #1、查询所有用户
# # ret =session.query(users).all()
# #
# # for i in ret:
# #     print(i.id,i.name,i.depart_id)

#2、查询所有用户,所属部门名称
# ret=session.query(users.id,users.name,depart.title).join(depart,user.depart_id==depart.id).all()
#  #这里有默认foreignkey ,user.depart_id==depart.id 可以不加也行
# for i in ret:
#     print(i.id ,i.name,i.title)

#select users.id as users_id, users.name as users_name, depart.title as depart_title
#from users inner join depart on depart.id = users.depart_id
# q=session.query(users.id,users.name,depart.title).join(depart)
# print(q)

#3、relation字段:查询所有用户+所有部门名称
# ret=session.query(users).all()
# for row in ret:
#     print(row.id,row.name,row.depart_id,row.dp.title)

#4、relation字段:查询销售部所有人员
# obj=session.query(depart).filter(depart.title =='大娃').first()
# # for i in obj.pers:
# #     print(i.id,i.name,obj.title)

#5、创建一个名称叫:it部门,再在该部门中添加一个员工叫:多多
#方式一:
# d1=depart(title='it')
# session.add(d1)
# session.commit()
#
# u1=users(name='duoduo',depart_id=d1.id)
# session.add(u1)
# session.commit()
#方式二:
# u1=users(name='多多1',dp=depart(title='it'))
# session.add(u1)
# session.commit()


#6、创建一个部门叫王者荣耀,这个部门添加多个员工:亚瑟,后裔,貂蝉

# d1=depart(title='王者荣耀')
# d1.pers=[users(name='亚瑟'),users(name='后裔'),users(name='貂蝉')]
#
# session.add(d1)
# session.commit()


#1、录入数据
# session.add_all([
#     student(name='大娃'),
#     student(name='二娃'),
#     course(title='物理'),
#     course(title='化学'),
#
# ])
# session.add_all([
#     student2course(student_id=2,course_id=1),
#     # student2course(student_id=1,course_id=2)
# ]
# )

#2、查每个人选了课程的名称,三张表进行关联
# ret=session.query(student2course.id,student.name,course.title).join(student,student2course.student_id==student.id,isouter=true).join(course,student2course.course_id==course.id,isouter=true).order_by(student2course.id.asc())
# for i in ret:
#     print(i)

#3、'大娃'所选的所有课

# ret=session.query(student2course.id,student.name,course.title).join(student,student2course.student_id==student.id,isouter=true).join(course,student2course.course_id==course.id,isouter=true).filter(student.name=='大娃').order_by(student2course.id.asc())
#
# for i in ret:
#     print(i)

# obj=session.query(student).filter(student.name=='大娃').first()
# for item in obj.course_list:
#     print(item.title)
#4选了'化学'课程的所有人的名字
# obj=session.query(course).filter(course.title=='化学').first()
# for item in obj.student_list:
#     print(item.name)


#创建一个课程,创建2个学,两个学生选新创建的课程
# obj=course(title='体育')
# obj.student_list=[student(name='五娃'),student(name='六娃')]
#
# session.add(obj)
# session.commit()








# session.close()
foreignkey
python3-开发进阶Flask的基础(5)
class student(base):
    __tablename__ = 'student'
    id = column(integer, primary_key=true)
    name = column(string(32), index=true, nullable=false)

    course_list = relationship('course', secondary='student2course', backref='student_list')

class course(base):
    __tablename__ = 'course'
    id = column(integer, primary_key=true)
    title = column(string(32), index=true, nullable=false)

class student2course(base):
    __tablename__ = 'student2course'
    id = column(integer, primary_key=true, autoincrement=true)
    student_id = column(integer, foreignkey('student.id'))
    course_id = column(integer, foreignkey('course.id'))

    __table_args__ = (
        uniqueconstraint('student_id', 'course_id', name='uix_stu_cou'), # 联合唯一索引
        # index('ix_id_name', 'name', 'extra'),        # 联合索引
    )
后面新建的三个表

 连接的方式:

from duoduo.test import users,depart,student,course,student2course
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine


engine = create_engine(
        "mysql+pymysql://root:123@127.0.0.1:3306/duoduo123?charset=utf8",
        max_overflow=0,  # 超过连接池大小外最多创建的连接
        pool_size=5,  # 连接池大小
        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )



sessionfactory = sessionmaker(bind=engine)


#连接
#第一种方式
# #并发
# def task():
#     #去连接池获取一个连接
#     session=sessionfactory()
#     ret=session.query(student).all()
#     print(ret)
#     #将连接交还给连接池
#     session.close()
#
#
# from threading import thread
#
# for i in range(20):
#     t=thread(target=task)
#     t.start()
#

#第二种方式

from sqlalchemy.orm import scoped_session
session=scoped_session(sessionfactory)


def task():
    ret=session.query(student).all()
    print(ret)
    session.remove()  #连接断开


from threading import thread
for i in range(20):
    t=thread(target=task)
    t.start()

执行原生sq:

from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session
from models import student,course,student2course

engine = create_engine(
        "mysql+pymysql://root:123@127.0.0.1:3306/duoduo123?charset=utf8",
        max_overflow=0,  # 超过连接池大小外最多创建的连接
        pool_size=5,  # 连接池大小
        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )
sessionfactory = sessionmaker(bind=engine)
session = scoped_session(sessionfactory)


def task():
    """"""
    # 方式一:
    """
    # 查询
    # cursor = session.execute('select * from users')
    # result = cursor.fetchall()

    # 添加
    cursor = session.execute('insert into users(name) values(:value)', params={"value": 'duoduo'})
    session.commit()
    print(cursor.lastrowid)
    """
    # 方式二:
    """
    # conn = engine.raw_connection()
    # cursor = conn.cursor()
    # cursor.execute(
    #     "select * from t1"
    # )
    # result = cursor.fetchall()
    # cursor.close()
    # conn.close()
    """

    # 将连接交还给连接池
    session.remove()


from threading import thread

for i in range(20):
    t = thread(target=task)
    t.start()

 

二、flask的第三方组件

1、flask-sqlalchemy

a、先下载安装

pip3 install flask-sqlalchemy

b、项目下的__init__.py导入

#第一步  :导入并实例化sqlalchemy
from flask_sqlalchemy import sqlalchemy
db=sqlalchemy()
#一定要在蓝图导入的上面,是全局变量
#也要导入表的models.py的所有表

c、初始化

db.init_app(app)  
#在注册蓝图的地方的下面

d、在配置文件中写入配置

# ##### sqlalchemy配置文件 #####
    sqlalchemy_database_uri = "mysql+pymysql://root:123@127.0.0.1:3306/duoduo?charset=utf8"
    sqlalchemy_pool_size = 10
    sqlalchemy_max_overflow = 5

e、创建models.py中的类(对应数据库中的表)

from sqlalchemy import column
from sqlalchemy import integer,string,text,date,datetime
from (项目目录) import db

class users(db.model):  #继承的db.model
    __tablename__ = 'users'

    id = column(integer, primary_key=true)
    name = column(string(32), index=true, nullable=false)
    # depart_id = column(integer)

f、生成表(使用app上下文)

from (项目目录) import db,create_app
app = create_app()
app_ctx = app.app_context() # app_ctx = app/g
with app_ctx: # __enter__,通过localstack放入local中
    db.create_all() # 调用localstack放入local中获取app,再去app中获取配置

补充一个知识点:

class  foo(object):
    def __enter__(self):
        print('进入')

    def __exit__(self, exc_type, exc_val, exc_tb):
        print('出来')
obj =foo()
with obj:  #with foo()
    print('多多')

#结果 
#进入
#多多
#出来

g、基于orm对数据库进行操作

from flask import blueprint
from (项目目录) import db
from (项目目录) import models
us = blueprint('us',__name__)

@us.route('/index')
def index():
    # 使用sqlalchemy在数据库中插入一条数据
    # db.session.add(models.users(name='多多',depart_id=1))     #插入数据
    # db.session.commit()
    # db.session.remove()
    result = db.session.query(models.users).all()
    print(result)
    db.session.remove()
    return 'index'

这里面db.session.add的源码:

python3-开发进阶Flask的基础(5)

 

python3-开发进阶Flask的基础(5)

 

2、 flask-script

下载安装

pip3 install flask-script

功能:

  a、增加runsever

from (项目目录) import create_app
from flask_script import manager

app = create_app()
manager = manager(app)

if __name__ == '__main__':
    # app.run()
    manager.run()

  b、位置传参

from (项目目录) import create_app
from flask_script import manager

app = create_app()
manager = manager(app)

@manager.command
def custom(arg):
    """
    自定义命令
    python manage.py custom 123
    :param arg:
    :return:
    """
    print(arg)

if __name__ == '__main__':
    # app.run()
    manager.run()

  c、关键字传参  

from flask_script import manager
from (项目目录) import create_app

app = create_app()
manager = manager(app)

@manager.option('-n', '--name', dest='name')
@manager.option('-u', '--url', dest='url')
def cmd(name, url):
    """
    自定义命令
    执行: python manage.py  cmd -n qianduoduo -u http://www.baidu.com
    :param name:
    :param url:
    :return:
    """
    print(name, url)

if __name__ == '__main__':
    # app.run()
    manager.run()

 

3、flask-migrate

安装

pip3 install flask-migrate

配置是依赖 flask-script

from flask_script import manager
from flask_migrate import migrate, migratecommand
from (项目目录) import create_app
from (项目目录) import db

app = create_app()
manager = manager(app)
migrate(app, db)
"""
# 数据库迁移命名
    python manage.py db init   #只执行一次
    python manage.py db migrate # makemirations
    python manage.py db upgrade # migrate
"""
manager.add_command('db', migratecommand)

if __name__ == '__main__':
    # app.run()
    manager.run()

补充工具:(协同开发的保证开发的环境一致性)

1、pipreqs

找到项目使用的所有组件的版本

pip3 install  pipreqs

在项目中的命令行   

pipreqs ./ --encoding=utf-8

在项目中找了文件requirements.txt

如何装那些模块?  

pip  install 的时候可以指定一个文件,他会自己读每一行,然后安装

pycharm 打开文件的时候也会提示下载

 

2、虚拟环境 (开发环境版本不能共存的)

安装

pip3 install virtualenv

创建虚拟环境

#找一个存放虚拟环境的文件夹
virtualenv env1 --no-site-packages  创建环境

activate  #激活环境

deactivate # 退出环境


#也可以用pycharm鼠标点点就好了,最新的版本就有虚拟环境goon功能
#当我们需要特定的环境,可以在电脑上有多高开发环境