PyJWT 详解
1.首先,我们需要先了解 JWT 的概念,所以我们先看pyjwt的官网
2.对于官方 JWT 有两篇博文写的不错分别如下:
https://blog.csdn.net/qq_15766181/article/details/80707923
https://blog.csdn.net/u011277123/article/details/78918390
3.然后我们需要了解 python 的jwt ------- PyJWT
(1).官网
https://pyjwt.readthedocs.io/en/latest/
(2).本文完善的pyjwt demo 来源
4.开始学习 3(2) 中的大神的一个很有借鉴意义的demo
(1).项目整体框架如下
(2).auth和users中的 __init__.py 文件都为空,本文不涉及这两个文件
首先,进入项目根目录,执行
pip install pyjwt
(3).根目录的__init__.py:
from flask import Flask, request
def create_app(config_filename):
app = Flask(__name__)
app.config.from_object(config_filename)
# send CORS headers
@app.after_request
def after_request(response):
response.headers.add('Access-Control-Allow-Origin', '*')
if request.method == 'OPTIONS':
response.headers['Access-Control-Allow-Methods'] = 'DELETE, GET, POST, PUT'
headers = request.headers.get('Access-Control-Request-Headers')
if headers:
response.headers['Access-Control-Allow-Headers'] = headers
return response
from app.users.model import db
db.init_app(app)
from app.users.api import init_api
init_api(app)
return app
(4).auths.py:
import jwt, datetime, time
from flask import jsonify
from app.users.model import Users
from .. import config
from .. import common
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy()
class Auth():
@staticmethod
def encode_auth_token(user_id):
# 申请Token,参数为自定义,user_id不必须,此处为以后认证作准备,程序员可以根据情况自定义不同参数
"""
生成认证Token
:param user_id: int
:param login_time: int(timestamp)
:return: string
"""
try:
headers = {
"typ": "JWT",
"alg": "HS256",
"user_id": user_id
}
playload = {
"headers": headers,
"iss": 'ly',
"exp": datetime.datetime.utcnow() + datetime.timedelta(days=0, hours=0, minutes=1, seconds=0),
'iat': datetime.datetime.utcnow()
}
signature = jwt.encode(playload, config.SECRET_KEY, algorithm='HS256')
return signature
except Exception as e:
return e
# encode为加密函数,decode为解密函数(HS256)
# JWT官网的三个加密参数为
# 1.header(type,algorithm)
# {
# "alg": "HS256",
# "typ": "JWT"
# }
# 2.playload(iss,sub,aud,exp,nbf,lat,jti)
# iss: jwt签发者
# sub: jwt所面向的用户
# aud: 接收jwt的一方
# exp: jwt的过期时间,这个过期时间必须要大于签发时间
# nbf: 定义在什么时间之前,该jwt都是不可用的.
# iat: jwt的签发时间
# jti: jwt的唯一身份标识,主要用来作为一次性token,从而回避重放攻击。
# 3.signature
#
# jwt的第三部分是一个签证信息,这个签证信息由三部分组成:
#
# header (base64后的)
# payload (base64后的)
# secret
# PyJwt官网的三个加密参数为
# jwt.encode(playload, key, algorithm='HS256')
# playload 同上,key为app的 SECRET_KEY algorithm 为加密算法
# 二者应该都可以用,但我们用的是python的 pyjwt ,那就入乡随俗吧
@staticmethod
def decode_auth_token(auth_token):
"""
验证Token
:param auth_token:
:return: integer|string
"""
try:
payload = jwt.decode(auth_token, config.SECRET_KEY, options={'verify_exp': False})
if payload:
return payload
else:
raise jwt.InvalidTokenError
except jwt.ExpiredSignatureError:
return 'Token过期'
except jwt.InvalidTokenError:
return '无效Token'
def authenticate(self, username, password):
"""
用户登录,登录成功返回token,写将登录时间写入数据库;登录失败返回失败原因
:param password:
:return: json
"""
info = Users.query.filter(username == Users.username).first()
if info is None:
return jsonify(common.falseReturn('', '找不到用户'))
else:
if info.password == password:
login_time = int(time.time())
info.login_time = login_time
db.session.commit()
token = self.encode_auth_token(info.id)
return jsonify(common.trueReturn(token.decode(), '登录成功'))
# return jsonify(common.trueReturn(jwt.decode(token, config.SECRET_KEY, algorithms='HS256'), '登录成功'))
else:
return jsonify(common.falseReturn('', '密码不正确'))
def identify(self, request):
"""
用户鉴权
:return: list
"""
try:
auth_token = jwt.decode(request.headers.get('Authorization'), config.SECRET_KEY, algorithms='HS256')
if auth_token:
if not auth_token or auth_token['headers']['typ'] != 'JWT':
result = common.falseReturn('', '请传递正确的验证头信息')
else:
user = Users.query.filter(Users.id == auth_token['headers']['user_id']).first()
if user is None:
result = common.falseReturn('', '找不到该用户信息')
else:
result = common.trueReturn(user.id, '请求成功')
return result
except jwt.ExpiredSignatureError:
result = common.falseReturn('Time_Out', 'Token已过期')
return result
except jwt.InvalidTokenError:
result = common.falseReturn('Time_Out', '未提供认证Token')
return result
(5).api.py:
from flask import jsonify, request
from app.users.model import Users
from app.auth.auths import Auth
from .. import common
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy()
def init_api(app):
@app.route('/register', methods=['POST'])
def register():
"""
用户注册
:return: json
"""
email = request.form.get('email')
username = request.form.get('username')
password = request.form.get('password')
user = Users(id=None, username=username, password=password, email=email)
db.session.add(user)
db.session.commit()
info = Users.query.filter(Users.username == username).first()
if info:
data = {
'id': info.id,
'username': info.username,
'email': info.email,
'login_time': info.login_time
}
return jsonify(common.trueReturn(data, "用户注册成功"))
else:
return jsonify(common.falseReturn('you are the error', '用户注册失败'))
@app.route('/login', methods=['POST'])
def login():
"""
用户登录
:return: json
"""
username = request.form.get('username')
password = request.form.get('password')
if not username or not password:
return jsonify(common.falseReturn('', '用户名和密码不能为空'))
else:
return Auth.authenticate(Auth, username, password)
@app.route('/user', methods=['GET'])
def get():
"""
获取用户信息
:return: json
"""
result = Auth.identify(Auth, request)
if result['status'] and result['data']:
user = Users.query.filter(Users.id == result['data']).first()
data = {
'id': user.id,
'username': user.username,
'email': user.email,
'login_time': user.login_time
}
result = common.trueReturn(data, "请求成功")
return jsonify(result)
(6).model.py:
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy()
class Users(db.Model):
id = db.Column(db.Integer, primary_key=True)
email = db.Column(db.String(250), unique=True, nullable=False)
username = db.Column(db.String(250), unique=True, nullable=False)
password = db.Column(db.String(250))
login_time = db.Column(db.Integer)
def __init__(self, id, username, password, email):
self.id = id
self.username = username
self.password = password
self.email = email
def __str__(self):
return "Users(id='%s')" % self.id
def set_password(self, password):
self.password = password
db.session.commit()
def check_password(self, hash, password):
if password == self.password:
return True
else:
return False
def get(self, id):
return self.query.filter_by(id=id).first()
def add(self, user):
db.session.add(user)
return session_commit()
def update(self):
return session_commit()
def session_commit():
db.session.commit()
(7).common.py:
def trueReturn(data, msg):
return {
"status": True,
"data": data,
"msg": msg
}
def falseReturn(data, msg):
return {
"status": False,
"data": data,
"msg": msg
}
(8).config.py:
DB_USER = 'root'
DB_PASSWORD = '005'
DB_HOST = 'localhost'
DB_DB = 'jwt_test'
DEBUG = True
PORT = 3306
HOST = "127.0.0.1"
SECRET_KEY = "lanyue"
SQLALCHEMY_TRACK_MODIFICATIONS = False
SQLALCHEMY_DATABASE_URI = 'mysql+pymysql://' + DB_USER + ':' + DB_PASSWORD + '@' + DB_HOST + '/' + DB_DB
(9).db.py:
from flask_script import Manager
from flask_migrate import Migrate, MigrateCommand
from run import app
app.config.from_object('app.config')
from app.users.model import db
db.init_app(app)
migrate = Migrate(app, db)
manager = Manager(app)
manager.add_command('db', MigrateCommand)
if __name__ == '__main__':
manager.run()
(10).run.py:
from app import create_app
app = create_app('app.config')
if __name__ == '__main__':
app.run(host=app.config['HOST'], port=app.config['PORT'], debug=app.config['DEBUG'])
5.调试详解:
(1).首先,我们下载火狐浏览器的 RESTClient 插件,应为这样做可以省去前台的代码(包括向后台的get post)
(2).然后,运行项目,打开火狐浏览器的 RESTClient 插件
我们先熟悉一下这个插件的使用方法
这个插件中有一个参数挺重要的,推荐给大家
https://blog.csdn.net/wh_xmy/article/details/70873370
(3).打开插件
请求方法指的是我们获取数据的方式,一般只用到 get post
网址指的是我们将数据提交的路由网址
身份认证我也不清楚,本文可以不用
表头就是设置数据类型和发送的各种状态的
正文是提交的数据,如果是表单数据需要用到正文右上角的那个工具(本例就是使用的它)
HTTP响应就是指我们针对网址发送的数据经由服务器根据我们指定的参数执行完后返回的数据和状态
因为我们先要注册用户所以用到我们的register路由(每种语言叫法不一样),点击增加HTTP头字段,key值选Content-Type,value值根据图中选好,填好网址后,点击正文右上角的那个工具出来如下界面,根据图填好内容
点击确定,点击发送出来如下界面,表示我们注册成功:
接下来我们验证登录
还是POST,把register改为login,表中数据不变,如下图:
点击发送按钮,出现如下提示表明我们成功:
把data字段对应的值复制下来,准备测试/user路由用
选择GET方法,点击增加请求头,key值为Authorization,value值为刚才复制的内容,login改为user,点击发送按钮,出来如下内容,表示我们成功,但是为什么提示Token过期啊,因为我们设置了Token的有效期为一分钟,聪明的你们能不能动作快点,来使他访问成功呢
最后,demo下载地址为:
https://download.csdn.net/download/itlanyue/10625344
下载完成后,使用 tar(bzip2) 解压缩即可