欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

鱼书项目模块化总结

程序员文章站 2023-09-02 16:30:22
鱼书项目模块化总结 项目总体思路 .png) 模型类 视图蓝本 表单验证数据API flask上下文/ajax .png) 1 异步发送邮件模块 邮件发送: 需要进行注册邮件发送或者功能模块需要发送邮件时可以采用: 2 枚举类标识状态 状态标识 状态的变更需要进行标识时(等待,成功,拒绝,撤销) 3 ......

鱼书项目模块化总结

项目总体思路

鱼书项目模块化总结

模型类

鱼书项目模块化总结

视图蓝本

鱼书项目模块化总结

表单验证数据API

鱼书项目模块化总结

flask上下文/ajax

鱼书项目模块化总结

1 异步发送邮件模块

邮件发送:

需要进行注册邮件发送或者功能模块需要发送邮件时可以采用:

from threading import Thread

from app import mail
from flask import current_app,render_template
from flask_mail import Message

'''开启异步线程'''
def send_async_mail(app,msg):
    '''mail.send发送需要获取上下文,因此添加with'''
    with app.app_context():
        try:
            mail.send(msg)
        except Exception as e:
            raise e

def send_mail(to, subject, template,**kwargs):
    '''
    发送邮件
    :param to: 收件人
    :param subject: 标题
    :param template: 渲染模板
    :param kwargs: 关键字参数
    :return:
    '''
    msg = Message('[鱼书]'+ '' +subject,
                  sender=current_app.config['MAIL_USERNAME'],
                  recipients=[to])
    msg.html = render_template(template,**kwargs)
    '''
    current_app是代理对象,开启新的线程时,我们直接获取真实的app核心对象_get_current_object()
    '''
    app = current_app._get_current_object()
    thr = Thread(target=send_async_mail,args=[app,msg])
    thr.start()

2 枚举类标识状态

状态标识

状态的变更需要进行标识时(等待,成功,拒绝,撤销)

from enum import Enum

class PendingStatus(Enum):
    '''
    交易状态: 枚举方法实现
    '''
    Waiting = 1
    Success = 2
    Reject = 3
    Redraw = 4

    @classmethod
    def pending_str(cls,status,key):
        key_map = {
            cls.Waiting:{
                'requester':'等待对方邮寄',
                'gifter':'等待你邮寄'
            },
            cls.Success: {
                'requester': '对方已邮寄',
                'gifter': '你已邮寄,交易完成'
            },
            cls.Reject: {
                'requester': '对方已拒绝',
                'gifter': '你已经拒绝'
            },
            cls.Redraw: {
                'requester': '你已撤销',
                'gifter': '对方已撤销'
            }
        }
        return key_map[status][key]

3 User模型的方法

模型定义的方法:

1 password以hash方式存储

2 用户信息生成token

3 password的私有化

from math import floor

from werkzeug.security import generate_password_hash, check_password_hash
from flask_login import UserMixin
from itsdangerous import  TimedJSONWebSignatureSerializer as Serializer

from app.libs.enums import PendingStatus
from app.models.base import Base, db
from sqlalchemy import Column, Integer, String, Boolean, Float
from app import login_manager
from app.models.drift import Drift
from app.models.gift import Gift
from app.models.wish import Wish
from app.spider.yushu_book import YuShuBook
from app.libs.helper import is_isbn_or_key
from flask import current_app

class User(Base, UserMixin):
    '''
    模型属性设置 , UserMixin 记录用户账号的状态
    '''
    id = Column(Integer, primary_key=True)
    nickname = Column(String(24), nullable=False)
    _password = Column('password', String(128), nullable=True)
    phone_number = Column(String(18), unique=True)
    email = Column(String(50), unique=True, nullable=False)
    confirmed = Column(Boolean, default=False)
    beans = Column(Float, default=0)
    send_counter = Column(Integer, default=0)
    receive_counter = Column(Integer, default=0)
    wx_open_id = Column(String(50))
    wx_name = Column(String(32))

    '''
    将password方法hash加密只读并将其变为属性访问
    '''
    @property
    def password(self):
        return self._password

    @password.setter
    def password(self, raw):
        self._password = generate_password_hash(raw)

    def check_password(self, raw):
        return check_password_hash(self._password, raw)

    def generate_token(self, expiration=600):
        s = Serializer(current_app.config['SECRET_KEY'],expiration)
        return s.dumps({'id':self.id}).decode('utf-8')

    @staticmethod
    def reset_password(token,new_password):
        s = Serializer(current_app.config['SECRET_KEY'])
        try:
            data = s.loads(token.encode('utf-8'))
        except:
            return False
        uid = data.get('id')
        with db.auto_commit():
            user = User.query.get(uid)
            if user:
                user.password = new_password
        return True

    @property
    def summary(self):
        return dict(
            nickname = self.nickname,
            beans = self.beans,
            email = self.email,
            send_receive = str(self.send_counter)+ '/' + str(self.receive_counter)
        )

@login_manager.user_loader
def get_user(uid):
    '''
    继承UserMixin,进行用户的回调
    :param uid:
    :return:
    '''
    return User.query.get(int(uid))

方案二

from app.extensions import db,login_manager
from werkzeug.security import generate_password_hash,check_password_hash
#生成token的模块
from itsdangerous import TimedJSONWebSignatureSerializer as Seralize
from flask_login import UserMixin
from flask import current_app
from .posts import Posts

class User(UserMixin,db.Model):
    __tablename__ = 'user'
    id = db.Column('id',db.Integer,primary_key=True)
    username = db.Column(db.String(12),index=True)
    password_hash = db.Column(db.String(128))
    sex = db.Column(db.Boolean,default=True)
    age = db.Column(db.Integer)
    email = db.Column(db.String(40))
    icon = db.Column(db.String(70),default='default.jpg')
    #当期账户激活状态
    confirm = db.Column(db.Boolean,default=False)
    #参数1模型名称   参数2反向引用的字段名   参数3 加载方式 提供对象
    posts = db.relationship('Posts',backref='user',lazy='dynamic')
    #secondary在多对多关系中指定关联表的名称
    favorite = db.relationship('Posts',secondary='collections',backref=db.backref('users',lazy='dynamic'),lazy='dynamic')
    #添加使用append   删除使用remove

    @property
    def password(self):
        raise ValueError

    @password.setter
    def password(self, password):
        self.password_hash = generate_password_hash(password)

    #生成token的方法
    def generate_token(self):
        s = Seralize(current_app.config['SECRET_KEY'])
        return s.dumps({'id':self.id})

    #检测token的方法
    @staticmethod
    def check_token(token):
        s = Seralize(current_app.config['SECRET_KEY'])
        #从当前的token中拿出字典
        try:
            id = s.loads(token)['id']
        except:
            return False
        #根据用户id取出对应用户的对象
        u = User.query.get(id)
        #判断 当期u对象是否存在
        if not u:
            return False
        #判断当期用户的激活状态 如果没有激活 则激活
        if not u.confirm:
            u.confirm = True
            db.session.add(u)
        return True
    #验证密码
    def check_password_hash(self,password):
        return check_password_hash(self.password_hash,password)

    def is_favorite(self,postsId):
        all = self.favorite.all()
        for p in all:
            if p.id==postsId:
                return True
            #lambda表达式
            if list(filter(lambda p:p.id==int(postsId),all)):
                return True
            return False

#登录认证的回调,保持数据的一致性
@login_manager.user_loader
def user_loader(uid):
    return User.query.get(int(uid))

4 Flask工厂函数管理三方,蓝本

1 extensions.py负责三方模块的导入与

2 app.__init__.py 负责create_app初始化

3 view.__init__.py 负责蓝本的注册

方案一:

manage.py

from app import create_app
from flask_script import Manager
from flask_migrate import MigrateCommand

#通过函数create_app进行包括蓝本/扩展/系统配置的初始化
app = create_app('default')
manager = Manager(app)
#给manage添加迁移命令db
manager.add_command('db',MigrateCommand)

if __name__ == '__main__':
    manager.run()

extensions.py

#extensions.py
from flask_bootstrap import  Bootstrap
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate
from flask_mail import Mail

from flask_login import LoginManager
from flask_uploads import UploadSet,IMAGES,patch_request_class,configure_uploads
from flask_moment import Moment
from flask_cache import Cache

#实例化db
db = SQLAlchemy()
#实例化bootstrap
bootstrap = Bootstrap()
#实例化migrate
migrate = Migrate(db=db)
#实例化邮箱
mail = Mail()
#实例化用户登录模块
login_manager = LoginManager()
#实例化文件对象
file = UploadSet('photos',IMAGES)
moment = Moment()
#simple简单缓存
# cache = Cache(config={'CACHE_TYPE':'simple'})
cache = Cache(config={'CACHE_TYPE': 'simple'})

def config_extensions(app):
    #bootstrap初始化app
    bootstrap.init_app(app)
    #db初始化app
    db.init_app(app)
    #migrate初始化app
    migrate.init_app(app=app)
    #mail初始化
    mail.init_app(app)
    #登录模块初始化
    login_manager.init_app(app)
    #moment时间模块初始化
    moment.init_app(app)
    cache.init_app(app=app)

    #需要指定登录端点
    login_manager.login_view ='user.login'
    #提示信息
    login_manager.login_message = '请登录再访问'
    #设置session保护级别
    login_manager.session_protection = 'strong'

    #配置文件上传
    configure_uploads(app,file)
    patch_request_class(app,size=None)

app.__init__.py

#app.__init__.py

from flask import Flask,render_template
from app.settings import config
from app.extensions import config_extensions
from app.views import config_blueprint

#初始化当前整个应用的函数
def create_app(config_name):
    app = Flask(__name__)
    #导入settings配置信息
    app.config.from_object(config[config_name])
    #第三方库初始化
    config_extensions(app)
    #注册所有的蓝本函数
    config_blueprint(app)
    #错误页面绑定app
    errors(app)
    return app

def errors(app):
    @app.errorhandler(404)
    def page_not_found(e):
        return render_template('errors/error.html',error=e)

    @app.errorhandler(500)
    def page_not_found(e):
        return render_template('errors/error.html', error=e)

view.__init__.py

#view.__init__.py

from .main import main
from .user import user
from .posts import posts

BluePrint = [
    (main,''),
    (user,''),
    (posts,'')
]

#封装注册蓝本的函数
def config_blueprint(app):
    #循环注册蓝本
    for blueprint,prefix in BluePrint:
        app.register_blueprint(blueprint,url_prefix=prefix)

方案二

fisher.py

from app import create_app

app = create_app()

if __name__ == '__main__':
    from werkzeug.contrib.fixers import ProxyFix
    app.wsgi_app = ProxyFix(app.wsgi_app)
    app.run()

app.__init__.py

#app.__init__.py

'''创建flask核心对象'''
from flask import Flask
from app.models.base import db
from flask_login import LoginManager
from flask_mail import Mail
from app.libs.limiter import Limiter


login_manager = LoginManager()
mail = Mail()
limiter = Limiter()


def create_app():
    '''
    系统配置与蓝图需要绑定app
    :return:
    '''
    app = Flask(__name__)
    app.config.from_object('app.secure')
    app.config.from_object('app.setting')
    register_blueprint(app)

    db.init_app(app)
    login_manager.init_app(app)
    login_manager.login_view = 'web.login'
    login_manager.login_message = '请先登录或者注册'

    mail.init_app(app)
    with app.app_context():
        db.create_all()
    return app

#注册蓝本
def register_blueprint(app):
    from app.web.book import web
    app.register_blueprint(web)

view.__init__.py

#view.__init__.py

from flask import Blueprint,render_template

'''蓝图 blueprint '''
web = Blueprint('web',__name__)  #__name__代表蓝图所在模块


@web.app_errorhandler(404)
def not_found(e):
    '''
    AOP: 处理所有的404请求
    '''
    return render_template('404.html'),404

@web.app_errorhandler(500)
def internal_server_error(e):
    '''
    AOP: 处理所有的500请求
    '''
    return render_template('500.html'),500

#在此处导入代表先初始化在导入应用
#防止循环调用的问题
from app.web import book  
from app.web import auth
from app.web import drift
from app.web import gift
from app.web import main
from app.web import wish

5 生产开发环境下的settings配置

settings配置

1 根据不同等级,配置系统设置

2 配置数据库的连接

3 配置不同环境下的生产

方案一

import os
base_path = os.path.abspath(os.path.dirname(__file__))
#配置所有环境的基类
class Config:
    SECRET_KEY = 'secret_key'
    SQLALCHEMY_COMMIT_ON_TEARDOWN = True
    SQLALCHEMY_TRACK_MODIFICATIONS = False

    MAIL_SERVER = os.environ.get('MAIL_SERVER', 'smtp.163.com')
    MAIL_USERNAME = os.environ.get('MAIL_USERNAME', 'xxx@163.com')
    MAIL_PASSWORD = os.environ.get('MAIL_PASSWORD', 'xxx')

    #配置上传文件
    MAX_CONTENT_LENGTH = 1024*1024*64
    UPLOADED_PHOTOS_DEST = os.path.join(base_path,'static/upload')

    PAGE_NUM = 3
    

#测试
class TestingConfig(Config):
    SQLALCHEMY_DATABASE_URI = 'mysql+pymysql://root:xxx@127.0.0.1:3306/blog'
#开发
class DevelopmentConfig(Config):
    SQLALCHEMY_DATABASE_URI = 'mysql+pymysql://root:xxx@127.0.0.1:3306/blog'
    # SQLALCHEMY_DATABASE_URI = 'sqlite:///'+ os.path.join(base_path,'develop.sqlite')
#生产
class ProductionConfig(Config):
    SQLALCHEMY_DATABASE_URI = 'mysql+pymysql://root:xxx@127.0.0.1:3306/blog'

#设置字典
config = {
    'development':DevelopmentConfig,
    'production':ProductionConfig,
    'test':TestingConfig,
    'default':DevelopmentConfig
}

方案二

settings.py

'''
配置文件: 生产环境与开发环境相同的配置,setting可以上传git
'''
PER_PAGE = 15
BEANS_UPLOAD_ONE_BOOK = 0.5
RECENT_BOOK_COUNT = 20

secure.py

'''
配置文件:保存单独的加密信息,secure不要上传git
'''
DEBUG = True

# 单数据库
SQLALCHEMY_DATABASE_URI = 'mysql+cymysql://root:xxx@xxx:3306/fisher'
SECRET_KEY = '\SAAFsdfsdf:sdadzxcsd,./.dasdafasd'

#Email相关配置
MAIL_SERVER = 'smtp.163.com'
MAIL_PORT = 465
MAIL_USE_SSL = True
MAIL_USE_TSL = False
MAIL_USERNAME = 'xxx@163.com'
MAIL_PASSWORD = 'xxx'

6 分页展示

def calulate_start(self,page):
        return (page-1)* current_app.config.get('PER_PAGE')

7 模型基类的设计

模型类的设置

1 创建基类继承db.Model

2 db实例化通过继承添加容错自动提交数据库功能

3 封装了快速设置属性功能

base.py

from flask_sqlalchemy import SQLAlchemy as _SQLAlchemy, BaseQuery
from sqlalchemy import Column, Integer, SmallInteger
from contextlib import contextmanager
from datetime import datetime


class SQLAlchemy(_SQLAlchemy):
    '''
    封装了数据库的自动提交回滚
    '''
    @contextmanager
    def auto_commit(self):
        try:
            yield
            self.session.commit()
        except Exception as e:
            self.session.rollback()
            raise e


class Query(BaseQuery):
    '''
    自定义基类(继承,初始化),重写filter_by方法
    '''
    def filter_by(self, **kwargs):
        if 'status' not in kwargs.keys():
            kwargs['status'] = 1
        return super(Query, self).filter_by(**kwargs)

db = SQLAlchemy(query_class=Query)


class Base(db.Model):
    '''
    该模型表不想在数据库创建,添加__abstract__ = True不会创建该表
    '''
    __abstract__ = True
    '''类变量在类开始的时候就已经确定了'''
    create_time = Column('create_time', Integer)
    status = Column(SmallInteger, default=1)

    def __init__(self):
        '''实例变量保证创建时间的准确性'''
        self.create_time = int(datetime.now().timestamp())

    def delete(self):
        self.status = 0

    def set_attrs(self, attrs_dict):
        for key, value in attrs_dict.items():
            if hasattr(self, key) and key != 'id':
                setattr(self, key, value)
                
    @property
    def create_datetime(self):
        '''时间格式统一,将方法转换成属性调用'''
        if self.create_time:
            return datetime.fromtimestamp(self.create_time)
        else:
            return None