欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Django REST 框架详解 08 | 认证组件

程序员文章站 2022-04-25 15:47:52
...

一、认证组件

1. 分析源码

通过分析源码了解认证组件的方法调用过程

APIView 的 dispatch 中使用 initial 方法实现初始化并进行三大认证,第一步就是认证组件

rest_framework/views.py

class APIView(View):
    # ...
    def initial(self, request, *args, **kwargs):
        # ...
        # 认证组件:校验用户
        # 这里调用 perform_authentication 实现认证
        self.perform_authentication(request)
        # 权限组件:校验用户权限
        self.check_permissions(request)
        # 频率组件:限制视图接口被访问次数
        self.check_throttles(request)
    
    def perform_authentication(self, request):
        """
        Perform authentication on the incoming request.

        Note that if you override this and simply 'pass', then authentication
        will instead be performed lazily, the first time either
        `request.user` or `request.auth` is accessed.
        """
        # 去 request 中调用 user 方法属性
        request.user

request.user 去 request 中找 user 方法属性,找到认证方法实现过程

rest_framework/request.py

class Request:
    @property
    def user(self):
        """
        Returns the user associated with the current request, as authenticated
        by the authentication classes provided to the request.
        """
        # 属性来源是 _属性名
        if not hasattr(self, '_user'):
            # 回收错误信息
            with wrap_attributeerrors():
                # 没用户,认证处用户
                self._authenticate()
        # 有用户,直接返回用户
        return self._user
    
    # 认证方法
    def _authenticate(self):
        """
        Attempt to authenticate the request using each authentication instance
        in turn.
        """
        
        # 遍历拿到认证器,进行认证
        # self.authenticators,配置的一堆认证类产生的认证类对象组成的 list
        for authenticator in self.authenticators:
            # 该方法被 try 包裹,代表该方法会抛异常,抛异常代表认证失败
            try:
                # 认证器(对象)调用认证方法 authenticate(认证类对象self,request 请求对象)
                # 返回值:登录的用户与认证的信息组成的 tuple                
                user_auth_tuple = authenticator.authenticate(self)
            except exceptions.APIException:
                # 非法用户
                self._not_authenticated()
                raise
                
			# 处理返回值
            if user_auth_tuple is not None:
                self._authenticator = authenticator
                # 合法用户
                # 如果有返回值,就将登录用户与登录认证分别保存到 request.user request.auth
                self.user, self.auth = user_auth_tuple
                return
            
		# 游客
        # 如果返回值 user_auth_tuple,代表认证通过,但是没有登录用户和登录认证信息,代表游客
        self._not_authenticated()

寻找 authenticators 如何定义

rest_framework/views.py

class APIView(View):

    # The following policies may be set at either globally, or per-view.
    # 配置认证类
    authentication_classes = api_settings.DEFAULT_AUTHENTICATION_CLASSES
    
    def dispatch(self, request, *args, **kwargs):
        """
        `.dispatch()` is pretty much the same as Django's regular dispatch,
        but with extra hooks for startup, finalize, and exception handling.
        """
        self.args = args
        self.kwargs = kwargs
        # 请求模块(解析模块)
        request = self.initialize_request(request, *args, **kwargs)
        self.request = request
        self.headers = self.default_response_headers  # deprecate?

        try:
            # 进入三大认证模块
            self.initial(request, *args, **kwargs)

            # Get the appropriate handler method
            if request.method.lower() in self.http_method_names:
                handler = getattr(self, request.method.lower(),
                                  self.http_method_not_allowed)
            else:
                handler = self.http_method_not_allowed

            response = handler(request, *args, **kwargs)

        except Exception as exc:
            response = self.handle_exception(exc)

        self.response = self.finalize_response(request, response, *args, **kwargs)
        return self.response
    

    def initialize_request(self, request, *args, **kwargs):
        """
        Returns the initial request object.
        """
        parser_context = self.get_parser_context(request)

        return Request(
            request,
            # 获取解析类
            parsers=self.get_parsers(),
            # 获取认证器
            authenticators=self.get_authenticators(),
            negotiator=self.get_content_negotiator(),
            parser_context=parser_context
        )
		
        # 获取认证器
        def get_authenticators(self):
        """
        Instantiates and returns the list of authenticators that this view can use.
        """
        # 实例化一堆认证类对象
        return [auth() for auth in self.authentication_classes]

了解到认证器是通过一系列人证类对象实例化后定义

我们进去 SessionAuthentication 查看默认配置的认证类的实现

class SessionAuthentication(BaseAuthentication):
    """
    Use Django's session framework for authentication.
    """

    def authenticate(self, request):
        """
        Returns a `User` if the request session currently has a logged in user.
        Otherwise returns `None`.
        """

        # 得到用户
        user = getattr(request._request, 'user', None)

        # Unauthenticated, CSRF validation not required
        if not user or not user.is_active:
            # 没有解析出,代表游客
            return None
		
        # 解析出用户后,要重新启用 csrf 认证
        # 如果 csrf 认证失败,就出现异常,认证为非法用户
        self.enforce_csrf(request)

        # CSRF passed with authenticated user
        # 认证为合法用户,没有返回认证信息
        return (user, None)

    class BasicAuthentication(BaseAuthentication):
    """
    HTTP Basic authentication against username/password.
    """
    www_authenticate_realm = 'api'

    def authenticate(self, request):
        """
        Returns a `User` if a correct username and password have been supplied
        using HTTP Basic authentication.  Otherwise returns `None`.
        """
        # 获取认证信息:该认证信息是两段式(basic 认证字符串)
        auth = get_authorization_header(request).split()
		
        # 没有认证信息,认证为游客
        if not auth or auth[0].lower() != b'basic':
            return None
		
        # 有认证信息,格式错误,认证为非法用户
        if len(auth) == 1:
            msg = _('Invalid basic header. No credentials provided.')
            raise exceptions.AuthenticationFailed(msg)
        elif len(auth) > 2:
            msg = _('Invalid basic header. Credentials string should not contain spaces.')
            raise exceptions.AuthenticationFailed(msg)

        try:
            auth_parts = base64.b64decode(auth[1]).decode(HTTP_HEADER_ENCODING).partition(':')
        except (TypeError, UnicodeDecodeError, binascii.Error):
            msg = _('Invalid basic header. Credentials not correctly base64 encoded.')
            raise exceptions.AuthenticationFailed(msg)

        userid, password = auth_parts[0], auth_parts[2]
        # 认证信息处理出用户主键和密码,进一步得到用户对象
        return self.authenticate_credentials(userid, password, request)

    def authenticate_credentials(self, userid, password, request=None):
        """
        Authenticate the userid and password against username and password
        with optional request for context.
        """
        credentials = {
            get_user_model().USERNAME_FIELD: userid,
            'password': password
        }
        user = authenticate(request=request, **credentials)
		
        # 如果没有该用户或非活跃用户,认证为非法用户
        if user is None:
            raise exceptions.AuthenticationFailed(_('Invalid username/password.'))

        if not user.is_active:
            raise exceptions.AuthenticationFailed(_('User inactive or deleted.'))

        return (user, None)

    def authenticate_header(self, request):
        return 'Basic realm="%s"' % self.www_authenticate_realm

我们现在查看默认认证类配置定义位置

rest_framework/setting.py

DEFAULTS = {
    # ...
    # 默认认证类配置
    'DEFAULT_AUTHENTICATION_CLASSES': [
        'rest_framework.authentication.SessionAuthentication',
        'rest_framework.authentication.BasicAuthentication'
    ],
}

根据这个,现在就可以自定义项目配置文件

2. 全局配置认证

编写自己项目的settings.py

# 全局局部配置
REST_FRAMEWORK = {
    # ...
    # 配置默认认证类
    'DEFAULT_AUTHENTICATION_CLASSES': [
        'rest_framework.authentication.SessionAuthentication',
        'rest_framework.authentication.BasicAuthentication'
    ],
}

二、自定义认证类

可以看到以上默认的认证类,所有的规则都是固定的。尤其是做session认证时,会调用csrf,但是对于前后端分离的情况,这种规则并不友好。

1. 代码实现

  • 继承 BasicAuthentication

  • 重写 authenticate 方法

  • 实现根据自定义认证规则,确定是否有权限

  • 认证规则:

    •   游客:无认证信息,返回 None
      
    •   非法用户:有认证信息,认证失败,抛异常
      
    •   合法用户:有认证信息,认证成功,返回元组
      
  • 进行全局或局部配置

    • 全局:配置文件 settings.py
    • 局部:在视图类 import
  • 前台在请求头携带认证信息,且默认规范用 Authorization 字段携带认证信息

authentications.py

from rest_framework.authentication import BasicAuthentication
from rest_framework.exceptions import AuthenticationFailed

from api import models


class MyAuthentication(BasicAuthentication):
    # 重新 authenticate 方法,自定义认证规则
    def authenticate(self, request):
        # 认证规则要基于条件:
        #       游客:无认证信息,返回 None
        #       非法用户:有认证信息,认证失败,抛异常
        #       合法用户:有认证信息,认证成功,返回元组
        # 前台在请求头携带认证信息,且默认规范用 Authorization 字段携带认证信息
        # 后端固定在请求对象的 META字典中 HTTP_AUTHORIZATION 获取
        auth = request.META.get('HTTP_AUTHORIZATION', None
                                )

        # 游客认证
        if auth is None:
            return None

        # 设置认证字段规则(两端式):“auth 认证字符串”
        auth_list = auth.split()

        # 校验合法还是非法用户
        if len(auth_list) != 2 or auth_list[0].lower() != 'auth':
            raise AuthenticationFailed('The authentication information is incorrect! Illegal user')

        # 合法的用户需要进一步从 auth_list[1] 解析
        # 假设一种情况:信息为 abc.123.xyz 就可以解析 admin 用户
        # 实际开发时,该逻辑一定需要校验用户
        if auth_list[1] != 'abc.123.xyz':
            raise AuthenticationFailed('User verification failed! Illegal user')

        user = models.User.objects.filter(username='baimoc').first()

        if not user:
            raise AuthenticationFailed('User data is incorrect! Illegal user')

        return (user, None)

settings.py

# 全局局部配置
REST_FRAMEWORK = {
    'DEFAULT_AUTHENTICATION_CLASSES': [
        'api.authentications.MyAuthentication'
    ],
}

views.py

from rest_framework.views import APIView
from rest_framework.generics import GenericAPIView
from rest_framework.viewsets import GenericViewSet, ViewSet

from utils.response import APIResponse


class LoginView(APIView):
    def get(self, request, *args, **kwargs):
        # 如果认证通过,request.user 就一定有值
        # 游客:AnonymousUser
        # 用户:User
        return APIResponse(0, 'Login successful')

urls.py

from django.conf.urls import url
from api import views

urlpatterns = [
    url(r'^login/$', views.LoginView.as_view()),
]

2. 接口测试

Django REST 框架详解 08 | 认证组件