欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

DRF认证-权限-频率

程序员文章站 2022-07-12 11:20:54
...

1、认证介绍和源码分析

1.1–认证的实现

1、写一个类,继承BaseAuthentication,重写authenticate,认证的逻辑写在里面、
	认证通过、返回两个值、一个值最终给了Request对象的user、
    认证失败、抛异常--APIException或者AuthentionFailed

1.2–源码分析

1 只有认证通过的用户才能访问指定的url地址,比如:查询课程信息,需要登录之后才能查看,没有登录,就不能查看,这时候需要用到认证组件  self.initial

1、APIView--->dispatch--->self.initial(reuqest.*args.**kwargs):
    self.perform_authentication(request)    #认证
    self.check_permissions(request)     	#权限
    self.check_throttles(request)			#频率
2'认证'self.perform_authentication(request)-->
	def perform_authentication(self,request)  
    	request.user---->def 的 Request类中查找 方法或属性user的get方法
3、Request:  user被包装成数据属性、
    @property
    def user(self):
        if not hasattr(self, '_user'):
            with wrap_attributeerrors():  #上下文管理__enter__ __exit__
                #没用户、认证用户、
                self._authenticate() **核心
            #有用户,直接返回用户
            return self._user
4、self._authenticate()----:
        def _authenticate(self):
        """
        遍历依次尝试每个身份认证请求
        self._authenticators配置的一堆认证类产生的认证类对象组成的list
        """
        for authenticator in self.authenticators:
            '''self.authenticators:						#类出初始化的时候传的
            def __init__(self, request, parsers=None, authenticators=None,
                 negotiator=None, parser_context=None):'''
            try:
            '''认证器(对象)调用认证方法authenticate(认证类对象self,request请求对象)
          	   返回值: 登录的用户与认证信息组成tuple
          	   该方法被try包裹代表该方法会抛异常、抛异常就代表失败'''
                user_auth_tuple = authenticator.authenticate(self)
            except exceptions.APIException:
                self._not_authenticated()
                raise
			#返回值的处理
            if user_auth_tuple is not None:
                self._authenticator = authenticator
            #如果有返回值,就将 登录用户 与 登录认证分别保存 request.user\request.auth
                self.user, self.auth = user_auth_tuple
                return

        self._not_authenticated()
5、drf的Request对象实例化是再什么时候?
	-再APIVIew的dispatch最上面完成的
    request = self.initialize_request(request, *args, **kwargs)
    def __init__(self, request, parsers=None, authenticators=None,
                 negotiator=None, parser_context=None):

6、APIView的get_authenticators:
    def get_authenticators(self):
        # 取出加()执行 列表里是一堆对象、视图类中配置的
        return [auth() for auth in self.authentication_classes] #列表生成式
    --authentication_classes = api_settings.DEFAULT_AUTHENTICATION_CLASSES
    
-如果我再视图类中写:authentication_classes=[类名,类名1] 会覆盖drf中的配置
-返回[对象,对象1]

2、认证、权限、频率

2.1 --认证类的使用流程

	-写一个类,继承BaseAuthentication
	-在类中写authenticate(self, request):
	-在方法中进行校验,如果校验通过,返回两个值(返回空)
	-使用认证类,在视图类上加
	authentication_classes = [LoginAuth,]
2.1.1–登录Views
class UserView(ViewSetMixin,CreateAPIView):
    queryset = models.User.objects.all()
    serializer_class_classes = []

#1、基于原生的session
@action(method=['POST'],detail=False)
def login(self,request):
    username = request.data.get('username')
    password = request.data.get('passwrod')
    user = models.User.objects.fiflter(username=username,passrod=password).first()
    request.session['username']=user.username
    request.session['id'] = user.id
    request.session.save()
    from django.contrib.sessions.backends.db import SessionStore
    if user:
        return APIResponse(msg='登录成功',token=request.session.session_key)
    else:
        return APIResponse(status=101,msg='用户名或密码错误')

    
#2、基于自己写的UserToken表版本
from rest_framework.decorators import action
@action(methon=['POST'],detail=False)
def login(self,request):
    username = request.data.get('username')
    passrod = request.data.get('password')
    user = models.User.objects.filter(username=username,password=password).first()
    token = uuid.uuid4()
    models.UserToken.objects.update_or_crate(defaults={'token':token},user=user)
    if user:
        return APIResponse(msg='登录成功',token=token)
    else:
        return APIResponse(status=101,msg='用户名或密码错误')   
2.1.2-- urls路由
from rest_framework.routers import SimpleRouter
from django.urls import path,include

router = SimpleRouter()
router.register('路由',voews.类名)
urlpatterns = [
    path('admin/',admin.site.urls),
    path('api/',include(router.urls))
]
2.1.3—认证类的编写
#1、基于session的认证类
from django.contrib.sessions.models import Session
from importlib import import_module
form django.conf import settings
class LogiAuth(BaseAuthentication):
    def authenticate(self,request):
        token = request.GET.get('token')
        '通过传入的token(session_key,取到当前key的session对象)'
        engine = import_module(settings.SESSION_ENGINE)
        self.SessionStore = engine.SessionStore
        request.session = self.SessionStore(token)
        
        Session.objects.filter(session_key=token).first()
        if request.session.get('name',None):
            return '',''
        else:
            raise AuthenticationFailed('你没有登录')

#2、基于自己写的UserToken表
class LoginAuth(BaseAuthentication):
    def authrnticate(self,request):
        token=request.GET.get('token')
        user_token = models.UserToken.objects.filter(token=token).first()
        if user_token:
            '登录了、返回两个值、第一个值给新request对象user属性、'
            '通常情况把当前登录用户返回'
            return user_token.user,''
        else:
            raise AuthenticationFailed('没有登录')
            
2.1.4–使用认证类(全局、局部)
#全局用,settings中配置(所有接口都需要认证)
REST_FRAMEWORK={
    "DEFAULT_AUTHENTICATION_CLASSES":["app01.auth.LoginAuth",]
}
#登录功能需要局部禁用,在视图类中加入
	authentication_classes = []
#只在局部使用,只在视图类中加入
authentication_classes = [loginAuth]

2.2权限类编写和使用

2.2.1–编写权限类
class MyPermission(BasePermission):
    message='没有权限'
    def has_permission(self,request,view):
        if request.user.user_type == 1:
            return True
        else:
            self.message='你是%s用户,没有权限'%request.user.get_user_type_display()
           	return False
2.2.2–权限类的使用
#局部使用(在视图类中加)
permission_classes = [MyPermission,]
#全局使用(在文件中配置)
REST_FRAMEWORK={
    "DEFAULT_PERMISSION_CLASSES":["app01.auth.MyPermission",],
}

2.3频率类的使用

2.3.1–定义一个频率类
form rest_framework.throttling import BaseThrottle,SimpleRateThrottle
class MyThrottle(SimpleRateThrottle):
    scope = 'ip_th'
    def get_cache_key(self,request,view):
        return self.get_ident(request)
2.3.2–在配置文件中配置
REST_FRAMEWORK = {
    'DEFAULT_THROTTLE_RATES': {
        'ip_th': '5/m',  #一分钟访问5次
    },
}
2.3.3–局部使用,全局使用
# 局部用,在视图类中配置
throttle_classes = [MyThrottle,]
# 全局用,在配置文件中配置
REST_FRAMEWORK = {
    "DEFAULT_THROTTLE_CLASSES": ["app01.auth.MyThrottle", ],
    'DEFAULT_THROTTLE_RATES': {
        'ip_th': '5/m',  #一分钟访问5次
    },

}

3、自定义频率类(了解-很少使用)

class MyThrottling(BaseThrottle):
    VISIT_RECORD = {}  # 记录访问者的大字典

    def __init__(self):
        self.history = None

    def allow_request(self, request, view):
        # (1)取出访问者ip
        # (2)判断当前ip不在访问字典里,添加进去,并且直接返回True,表示第一次访问,
        	#在字典里,继续往下走
        # (3)循环判断当前ip的列表,有值,并且当前时间减去列表的最后一个时间大于60s,
        	#把这种数据pop掉,这样列表中只有60s以内的访问时间,
        # (4)判断,当列表小于3,说明一分钟以内访问不足三次,
        	#把当前时间插入到列表第一个位置,返回True,顺利通过
        # (5)当大于等于3,说明一分钟内访问超过三次,返回False验证失败
        # (1)取出访问者ip
        # print(request.META)
        ip = request.META.get('REMOTE_ADDR')
        import time
        ctime = time.time()
        # (2)判断当前ip不在访问字典里,添加进去,并且直接返回True,表示第一次访问
        if ip not in self.VISIT_RECORD:
            self.VISIT_RECORD[ip] = [ctime, ]
            return True
        self.history = self.VISIT_RECORD.get(ip,[])
        # (3)循环判断当前ip的列表,有值,并且当前时间减去列表的最后一个时间大于60s,
        	#把这种数据pop掉,这样列表中只有60s以内的访问时间,
        while self.history and ctime - self.history[-1] > 60:
            self.history.pop()
        # (4)判断,当列表小于3,说明一分钟以内访问不足三次,
       		#把当前时间插入到列表第一个位置,返回True,顺利通过
        # (5)当大于等于3,说明一分钟内访问超过三次,返回False验证失败
        if len(self.history) < 3:
            self.history.insert(0, ctime)
            return True
        else:
            return False

    def wait(self):
        import time
        ctime = time.time()
        # return 60 - (ctime - self.history[-1])
        return 1