RestFramework自定制之认证和权限、限制访问频率

认证和权限

  所谓认证就是检测用户登录与否,一般与权限对应使用。网站中都是经过用户登陆后由该用户相应的角色认证以给予对应的权限。数据库

  权限是对用户对网站进行操做的限制,只有在拥有相应权限时才可对网站中某个功能进行操做。权限老是与认证相辅相成。wdjango

自定制认证规则的重点是继承内置的BaseAuthentication类,重写其authenticate()方法。api

自定制认证方式一:经过url传参进行认证缓存

from django.conf.urls import url, include from app01.views import TestView urlpatterns = [ url(r'^test/', TestView.as_view()), ]
ulrs.py
from rest_framework.views import APIView from rest_framework.response import Response from rest_framework.authentication import BaseAuthentication from rest_framework.request import Request from rest_framework import exceptions ######伪造的数据库中存有的token########
token_list = [ 'sfsfss123kuf3j123', 'asijnfowerkkf9812', ] ######自定制的认证规则的类,必须继承BaseAuthentication#####
class TestAuthentication(BaseAuthentication): def authenticate(self, request): """ 用户认证,若是验证成功后返回元组: (用户,用户Token) :param request: :return: None,表示跳过该验证; 若是跳过了全部认证,默认用户和Token和使用配置文件进行设置 self._authenticator = None if api_settings.UNAUTHENTICATED_USER: self.user = api_settings.UNAUTHENTICATED_USER() else: self.user = None if api_settings.UNAUTHENTICATED_TOKEN: self.auth = api_settings.UNAUTHENTICATED_TOKEN() else: self.auth = None (user,token)表示验证经过并设置用户名和Token; AuthenticationFailed异常 """ val = request.query_params.get('token') if val not in token_list: raise exceptions.AuthenticationFailed("用户认证失败") return ('登陆用户', '用户token') def authenticate_header(self, request): """ Return a string to be used as the value of the `WWW-Authenticate` header in a `401 Unauthenticated` response, or `None` if the authentication scheme should return `403 Permission Denied` responses. """
        # 验证失败时,返回的响应头WWW-Authenticate对应的值
        pass

#####视图函数,必须继承APIView#####
class TestView(APIView): authentication_classes = [TestAuthentication, ]#中括号中写入定义了认证规则的类
    permission_classes = []#这是权限规则,下文会进行详述

#只有经过了上述的规则,才能如下执行视图函数
    def get(self, request, *args, **kwargs): print(request.user) print(request.auth) return Response('GET请求,响应内容') def post(self, request, *args, **kwargs): return Response('POST请求,响应内容') def put(self, request, *args, **kwargs): return Response('PUT请求,响应内容')
views.py

 

自定制认证方式二:经过请求头认证安全

from django.conf.urls import url, include from app01.views import TestView urlpatterns = [ url(r'^test/', TestView.as_view()), ]
ulrs.py
from rest_framework.views import APIView from rest_framework.response import Response from rest_framework.authentication import BaseAuthentication from rest_framework.request import Request from rest_framework import exceptions ######自定制的认证规则的类,必须继承BaseAuthentication#####
class TestAuthentication(BaseAuthentication): def authenticate(self, request): """ 用户认证,若是验证成功后返回元组: (用户,用户Token) :param request: :return: None,表示跳过该验证; 若是跳过了全部认证,默认用户和Token和使用配置文件进行设置 self._authenticator = None if api_settings.UNAUTHENTICATED_USER: self.user = api_settings.UNAUTHENTICATED_USER() else: self.user = None if api_settings.UNAUTHENTICATED_TOKEN: self.auth = api_settings.UNAUTHENTICATED_TOKEN() else: self.auth = None (user,token)表示验证经过并设置用户名和Token; AuthenticationFailed异常 """
        import base64 auth = request.META.get('HTTP_AUTHORIZATION', b'')#获取请求头
        if auth: auth = auth.encode('utf-8')#将bytes类型编码成utf-8
        auth = auth.split() if not auth or auth[0].lower() != b'basic': raise exceptions.AuthenticationFailed('验证失败') if len(auth) != 2: raise exceptions.AuthenticationFailed('验证失败') username, part, password = base64.b64decode(auth[1]).decode('utf-8').partition(':') if username == 'Damon' and password == '123': return ('登陆用户', '用户token') else: raise exceptions.AuthenticationFailed('用户名或密码错误') def authenticate_header(self, request): """ Return a string to be used as the value of the `WWW-Authenticate` header in a `401 Unauthenticated` response, or `None` if the authentication scheme should return `403 Permission Denied` responses. """
        return 'Basic realm=api'

#####视图函数,必须继承APIView#####
class TestView(APIView): authentication_classes = [TestAuthentication, ]#中括号中写入定义了认证规则的类,可放入多个
    permission_classes = []#这是权限规则,下文会进行详述 #只有经过了上述的规则,才能如下执行视图函数
    def get(self, request, *args, **kwargs): print(request.user) print(request.auth) return Response('GET请求,响应内容') def post(self, request, *args, **kwargs): return Response('POST请求,响应内容') def put(self, request, *args, **kwargs): return Response('PUT请求,响应内容')
views.py

 

使用自定制认证和自定制权限app

from django.db import models class UserInfo(models.Model): username = models.CharField(max_length=32) password = models.CharField(max_length=64) token = models.CharField(max_length=64,null=True)
models.py
from django.conf.urls import url from django.contrib import admin from app02 import views as app02_view urlpatterns = [ # url(r'^admin/', admin.site.urls),
 url(r'^auth/', app02_view.AuthView.as_view()), url(r'^hosts/', app02_view.HostView.as_view()), url(r'^users/', app02_view.UserView.as_view()), url(r'^salary/', app02_view.SalaryView.as_view()), ]
View Code
from django.views import View from rest_framework.views import APIView from rest_framework.authentication import BaseAuthentication from rest_framework import exceptions from rest_framework.response import Response from app02 import models import hashlib import time #####自定制认证规则的类#####
class MyAuthentication(BaseAuthentication): def authenticate(self, request): token = request.query_params.get('token') obj = models.UserInfo.objects.filter(token=token).first() if obj: return (obj.username,obj) return None def authenticate_header(self, request): """ Return a string to be used as the value of the `WWW-Authenticate` header in a `401 Unauthenticated` response, or `None` if the authentication scheme should return `403 Permission Denied` responses. """
        # return 'Basic realm="api"'
        pass
#####自定制权限的类#####
class MyPermission(object): message = "无权访问"
    def has_permission(self,request,view): if request.user: return True return False #####自定制管理员权限的类#####
class AdminPermission(object): message = "无权访问"
    def has_permission(self,request,view): if request.user == 'Damon':#管理员
            return True return False #####视图函数#####
class HostView(APIView): """ 匿名用户和用户都能访问 """ authentication_classes = [MyAuthentication,] permission_classes = [] def get(self,request,*args,**kwargs): # 原来request对象,django.core.handlers.wsgi.WSGIRequest
        # 如今的request对象,rest_framework.request.Request\
 self.dispatch print(request.user) # print(request.user)
        # print(request.auth)
        return Response('主机列表') class UserView(APIView): """ 普通用户能访问 """ authentication_classes = [MyAuthentication, ] permission_classes = [MyPermission,]#自定制普通用户拥有的权限
    def get(self,request,*args,**kwargs): return Response('用户列表') class SalaryView(APIView): """ 管理员用户才能访问 """ authentication_classes = [MyAuthentication, ] permission_classes = [MyPermission,AdminPermission,]#自定制普通用户和管理员拥有的权限
    def get(self,request,*args,**kwargs): self.dispatch return Response('薪资列表') def permission_denied(self, request, message=None): #若是没有经过认证,而且权限中return False,就会报下面的异常,detail为自定制的错误信息
        if request.authenticators and not request.successful_authenticator: raise exceptions.NotAuthenticated(detail='无权访问') raise exceptions.PermissionDenied(detail=message)
views.py

 

设置全局变量ide

  对于认证和权限,咱们可新建utils.py文件,将自定制的类写入该py文件中,以后在settings.py中进行配置,就可快速使用。在views.py中只需写入相应视图函数,无需关心中括号便可实现认证和权限的配置,使得views.py文件中的代码可读性更高。函数

REST_FRAMEWORK = { 'UNAUTHENTICATED_USER': None, 'UNAUTHENTICATED_TOKEN': None,#将匿名用户名称设置为None,默认为Anonymous
    "DEFAULT_AUTHENTICATION_CLASSES": [ "app02.utils.MyAuthentication",#配置自定制认证类的路径
 ], "DEFAULT_PERMISSION_CLASSES": [ "app02.utils.MyPermission",#配置自定制权限类的路径
        "app02.utils.AdminPermission",#配置自定制权限类的路径
 ], }
settings.py

 

限制访问频率

  建网站的宗旨是为人民服务,供人民访问。但总有刁民想害朕,如利用机器人爬虫肆意爬取数据、侵占流量缓存、洪水攻击等等。因此咱们须要对能够访问网站的用户进行相应的访问频率的限制,以保护咱们网站的安全。源码分析

  访问的用户有两种——登陆用户、匿名用户。对于登陆用户咱们可采用惟一标识进行标记,而匿名用户咱们一般采用IP对其进行标记。post

a、基于用户IP限制访问频率

流程分析:

  • 首先获取用户信息,若是是匿名用户,获取IP。若是不是匿名用户获取其用户名。
  • 在request里面获取匿名用户IP(也有多是代理的IP),如IP= 127.1.1.1。
  • 将获取到的IP添加到到recode字典里面,须要在添加以前先限制一下。
  • 若是时间间隔大于60秒(可自定制时长),说明村吃的该用户最先一次访问的时间久远、已失效,将该次访问的时间pop。在timelist列表里如今留的是有效的访问时间段。
  • 判断该IP访问次数是否超过10次,若是超过return False予以限制。

具体实现:重点是继承BaseThrottle类,重写其allow_request()和wait()方法

from django.shortcuts import render from rest_framework.views import APIView from rest_framework.response import Response from rest_framework.throttling import BaseThrottle,SimpleRateThrottle from rest_framework import exceptions RECORD = { } #####自定制对访问频率限制的类#####
class MyThrottle(BaseThrottle): def allow_request(self,request,view): """ # 返回False,限制 # 返回True,通行 :param request: :param view: :return: """
        """ 对匿名用户进行限制:每一个用户1分钟容许访问10次 - 获取用户IP request 1.1.1.1 """
        import time ctime = time.time() ip = self.get_ident() if ip not in RECORD: RECORD[ip] = [ctime,]#该IP是首次访问,存储当前访问时间
        else: # [4507862389234,3507862389234,2507862389234,1507862389234,],原先存有的访问时间
            time_list = RECORD[ip] while True: val = time_list[-1]#取出最先访问的时间
                if (ctime-60) > val:#检测此次访问时间与最先访问时间间隔是否超过一分钟
                    time_list.pop()#是,则更新列表中最先的访问时间(确保列表内的访问时间与这次访问的时间间隔在一分钟内)
                else: break#否,则表示在一分钟内有屡次访问
            if len(time_list) > 10:#检测一分钟访问次数是否超过十次则限制
                return False#是则限制
            time_list.insert(0,ctime)#不超过十次则将当前时间存入列表
        return True def wait(self): import time ctime = time.time() first_in_time = RECORD[self.get_ident()][-1] wt = 60 - (ctime - first_in_time)#动态显示须要等待的时间60-(当前时间 - 最近一次访问时间)
        return wt #####视图函数#####
class LimitView(APIView): authentication_classes = [] permission_classes = [] throttle_classes=[MyThrottle,] def get(self,request,*args,**kwargs): self.dispatch return Response('控制访问频率示例') def throttled(self, request, wait): """ If request is throttled, determine what kind of exception to raise. """
        #可自定制该方法设置中文的错误提示信息
        class MyThrottled(exceptions.Throttled): default_detail = '请求被限制.' extra_detail_singular = 'Expected available in {wait} second.' extra_detail_plural = '还须要再等待{wait}秒'

        raise MyThrottled(wait)
views.py

 

b. 使用配置文件,基于用户IP限制访问频率(利于Django缓存)

源码分析

class SimpleRateThrottle(BaseThrottle): """ 一个简单的缓存实现,只须要` get_cache_key() `。被覆盖。 速率(请求/秒)是由视图上的“速率”属性设置的。类。该属性是一个字符串的形式number_of_requests /期。 周期应该是:(的),“秒”,“M”,“min”,“h”,“小时”,“D”,“一天”。 之前用于节流的请求信息存储在高速缓存中。 A simple cache implementation, that only requires `.get_cache_key()` to be overridden. The rate (requests / seconds) is set by a `throttle` attribute on the View class. The attribute is a string of the form 'number_of_requests/period'. Period should be one of: ('s', 'sec', 'm', 'min', 'h', 'hour', 'd', 'day') Previous request information used for throttling is stored in the cache. """ cache = default_cache timer = time.time cache_format = 'throttle_%(scope)s_%(ident)s' scope = None THROTTLE_RATES = api_settings.DEFAULT_THROTTLE_RATES def __init__(self): if not getattr(self, 'rate', None): self.rate = self.get_rate()#点进去看到须要些一个scope ,2/m
        self.num_requests, self.duration = self.parse_rate(self.rate) def get_cache_key(self, request, view):#这个至关因而一个半成品,咱们能够来补充它
        """ Should return a unique cache-key which can be used for throttling. Must be overridden. May return `None` if the request should not be throttled. """
        raise NotImplementedError('.get_cache_key() must be overridden') def get_rate(self): """ Determine the string representation of the allowed request rate. """
        if not getattr(self, 'scope', None):#检测必须有scope,没有就报错了
            msg = ("You must set either `.scope` or `.rate` for '%s' throttle" % self.__class__.__name__) raise ImproperlyConfigured(msg) try: return self.THROTTLE_RATES[self.scope] except KeyError: msg = "No default throttle rate set for '%s' scope" % self.scope raise ImproperlyConfigured(msg) def parse_rate(self, rate): """ Given the request rate string, return a two tuple of: <allowed number of requests>, <period of time in seconds> """
        if rate is None: return (None, None) num, period = rate.split('/')#取配置文件并切分 'wdp':'2/minute'
        num_requests = int(num)#2
        duration = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400}[period[0]]#用于配置文件的信息minute取m,即60秒
        return (num_requests, duration) # 二、执行完构造方法后执行,
    def allow_request(self, request, view): """ Implement the check to see if the request should be throttled. On success calls `throttle_success`. On failure calls `throttle_failure`. """
        if self.rate is None: return True self.key = self.get_cache_key(request, view)#三、执行get_cache_key
        if self.key is None: return True#不限制
 self.history = self.cache.get(self.key, [])#四、获得的key,默认是一个列表,赋值给了self.history,
                                                       # self.history能够理解为每个ip对应的访问记录
        self.now = self.timer() # Drop any requests from the history which have now passed the
        # throttle duration
        while self.history and self.history[-1] <= self.now - self.duration: self.history.pop() if len(self.history) >= self.num_requests: return self.throttle_failure() return self.throttle_success() def throttle_success(self): """ Inserts the current request's timestamp along with the key into the cache. """ self.history.insert(0, self.now) self.cache.set(self.key, self.history, self.duration) return True def throttle_failure(self): """ Called when a request to the API has failed due to throttling. """
        return False def wait(self): """ Returns the recommended next request time in seconds. """
        if self.history: remaining_duration = self.duration - (self.now - self.history[-1]) else: remaining_duration = self.duration available_requests = self.num_requests - len(self.history) + 1
        if available_requests <= 0: return None return remaining_duration / float(available_requests)
源码SimpleRateThrottle

自定制

settings.py中:

REST_FRAMEWORK = { 'DEFAULT_THROTTLE_RATES': { 'test_scope': '10/m',#自定制限制的时间 一分钟10次
 }, }
from rest_framework.views import APIView from rest_framework.response import Response from rest_framework.throttling import BaseThrottle,SimpleRateThrottle from rest_framework import exceptions class MySimpleRateThrottle(SimpleRateThrottle): scope = "test_scope"#scope不可改,字符串需与配置文件相同

    def get_cache_key(self, request, view): return self.get_ident(request) class LimitView(APIView): authentication_classes = [] permission_classes = [] throttle_classes=[MySimpleRateThrottle,] def get(self,request,*args,**kwargs): # self.dispatch
        return Response('控制访问频率示例') def throttled(self, request, wait): """ If request is throttled, determine what kind of exception to raise. """
        #可自定制该方法设置中文的错误提示信息
        class MyThrottled(exceptions.Throttled): default_detail = '请求被限制.' extra_detail_singular = 'Expected available in {wait} second.' extra_detail_plural = '还须要再等待{wait}秒'
views.py

 

c.对不一样用户进行不一样的限流操做

  对匿名用户每一个用户1分钟容许访问5次,对于登陆的普通用户1分钟访问10次,VIP用户一分钟访问20次。

操做流程:

  • 首页能够匿名访问
  • 先认证,只有认证了才知道是否是匿名的,
  • 权限登陆成功以后才能访问, index页面无需权限便可访问
  • 限流在配置文件中

settings.py 中进行配置

REST_FRAMEWORK = { 'UNAUTHENTICATED_USER': None, 'UNAUTHENTICATED_TOKEN': None, 'DEFAULT_THROTTLE_RATES': {#自定制键值对
        'obj_anon': '10/m',#匿名用户
        'obj_user': '20/m',#登陆用户
        'obj_VIPuser':'20/minute',#VIP用户
 }, }
from rest_framework.views import APIView from rest_framework.response import Response from rest_framework.throttling import BaseThrottle,SimpleRateThrottle from rest_framework.authentication import BaseAuthentication from rest_framework import exceptions from app02 import models class MyAuthentication(BaseAuthentication): #检测用户是否登陆
    def authenticate(self, request): token = request.query_params.get('token')#登陆用户有tocken字段
        obj = models.UserInfo.objects.filter(token=token).first() if obj: return (obj.username,obj) return None#未登陆用户不处理

    def authenticate_header(self, request): pass

class MyPermission(object): message = "无权访问"
    def has_permission(self,request,view): if request.user: return True  #True表示有权限
        return False  #False表示无权限

class AdminPermission(object): message = "无权访问"
    def has_permission(self,request,view): if request.user == 'DamonVIP':#VIP用户
            return True return False ######对匿名用户进行限流的类#####
class AnonThrottle(SimpleRateThrottle): scope = "obj_anon"

    def get_cache_key(self, request, view): # 返回None,表示我不限制
        # 登陆用户我无论
        if request.user: return None # 匿名用户
        return self.get_ident(request) ######对登陆用户进行限流的类#####
class UserThrottle(SimpleRateThrottle): scope = "obj_user"

    def get_cache_key(self, request, view): # 登陆用户
        if request.user: return request.user # 匿名用户我无论
        return None ######对VIP用户进行限流的类#####
class VIPUserThrottle(SimpleRateThrottle): scope = "obj_VIPuser"

    def get_cache_key(self, request, view): # VIP用户
        if request.user=='DamonVIP':#VIP用户
            return request.user # 匿名用户我无论
        return None #####视图函数##### # 首页无需登陆就能够访问
class IndexView(APIView): authentication_classes = [MyAuthentication,]#认证判断他是否是匿名用户
    permission_classes = []  #主页无需权限验证
    throttle_classes=[AnonThrottle,UserThrottle,VIPUserThrottle]#对匿名用户和普通用户的访问限制
    def get(self,request,*args,**kwargs): # self.dispatch
        return Response('访问首页') def throttled(self, request, wait): '''可定制方法设置中文错误'''
      # raise exceptions.Throttled(wait)
        class MyThrottle(exceptions.Throttled): default_detail = '请求被限制' extra_detail_singular = 'Expected available in {wait} second.' extra_detail_plural = 'Expected available in {wait} seconds.' default_code = '还须要再等{wait}秒'
        raise MyThrottle(wait) # 需登陆就能够访问
class ManageView(APIView): authentication_classes = [MyAuthentication,] permission_classes = [MyPermission,] throttle_classes=[AnonThrottle,UserThrottle,VIPUserThrottle] def get(self,request,*args,**kwargs): # self.dispatch
        return Response('访问首页') # 需登陆就能够访问
class SalaryView(APIView): authentication_classes = [MyAuthentication,] permission_classes = [MyPermission,] throttle_classes=[AnonThrottle,UserThrottle,VIPUserThrottle] def get(self,request,*args,**kwargs): # self.dispatch
        return Response('访问首页')
views.py

 

设置全局变量

  与认证和权限类似,限流操做也可新建utils.py文件,将自定制的类写入该py文件中,以后在settings.py中进行配置,就可快速使用。在views.py中只需写入相应视图函数,无需关心中括号便可实现认证和权限的配置,使得views.py文件中的代码可读性更高。

REST_FRAMEWORK = { 'DEFAULT_THROTTLE_CLASSES': [ 'app04.utils.throttles.throttles.MyAnonRateThrottle', 'app04.utils.throttles.throttles.MyUserRateThrottle', ], 'DEFAULT_THROTTLE_RATES': { 'anon': '10/day', 'user': '10/day', 'My_anon': '10/m', 'My_user': '20/m', 'My_VIPuser': '50/m', }, }
View Code
相关文章
相关标签/搜索