身份验证是将传入请求与一组标识凭据(例如请求来自的用户或其签名的令牌)相关联的机制。而后 权限 和 限制 组件决定是否拒绝这个请求。前端
简单来讲就是:json
认证肯定了你是谁后端
权限肯定你能不能访问某个接口app
限制肯定你访问某个接口的频率dom
REST framework 提供了一些开箱即用的身份验证方案,而且还容许你实现自定义方案。ide
我的 敲码log:函数
1.源码分析
# Create your models here. class UserInfo1(models.Model): id = models.AutoField(primary_key=True) # 建立一个自增的主键字段 # 建立一个varchar(64)的惟一的不为空的字段 name = models.CharField(null=False, max_length=20) pwd = models.CharField(max_length=32,default=123) class Token(models.Model): user = models.OneToOneField("UserInfo1",on_delete=models.CASCADE) token = models.CharField(max_length=128)
2.post
# 我须要一个随机字符串 def get_random_str(user): import hashlib, time ctime = str(time.time()) # 封装bytes类型一个MD5的加密对象 md5 = hashlib.md5(bytes(user, encoding="utf8")) # md5.update 是拼接的效果,随机生成md5值 md5.update(bytes(ctime, encoding="utf8")) return md5.hexdigest() # 认证、权限和限制 class LoginModelView(APIView): def post(self, request): name = request.data.get("name") pwd = request.data.get("pwd") user = models.UserInfo1.objects.filter(name=name, pwd=pwd).first() res = {"state_code": 1000, "msg": None} if user: # 返回了一个usermd5 加密的字符串 random_str = get_random_str(user.name) """ 当存在token时,则更新,不存在则建立,defaults: 是由 (field, value) 对组成的字典,用于更新对象。 返回一个由 (object, created)组成的元组, object: 是一个建立的或者是被更新的对象, created: 是一个标示是否建立了新的对象的布尔值。 """ token = models.Token.objects.update_or_create(user=user, defaults={"token": random_str}) res["token"] = random_str else: res["state_code"] = 1001 # 错误状态码 res["msg"] = "用户名或者密码错误" import json # 这是由于json.dumps 序列化时对中文默认使用的ascii编码.想输出真正的中文须要指定ensure_ascii=False: # 若是你这样的话就能把中文转成json字符串,而不是 \u4e2d\u56fd return Response(json.dumps(res,ensure_ascii=False))
update_or_create(defaults=None, **kwargs)
defaults 的值不一样则建立,相同则更新
例
Member.objects.update_or_create(defaults={'user':1}, others={'field1':1,'field2':1})
当存在user=1时,则更新,不存在则建立编码
update_or_create(defaults=None, **kwargs)
kwargs: 来更新对象或建立一个新的对象。
defaults: 是由 (field, value) 对组成的字典,用于更新对象。
返回一个由 (object, created)组成的元组,
object: 是一个建立的或者是被更新的对象,
created: 是一个标示是否建立了新的对象的布尔值。
update_or_create: 方法经过给出的kwarg
好处:
1.他能判断你是不是当前登陆用户,当你没有带认证码 向我后端发请求的时候我是不会给你数据的。
2.他每一次登陆的认证都会改变。
局部视图认证
示例1:
class TokenAuth(object): def authenticate(self, request): # 取到 request里面的 token值 totken = request.GET.get("token") token_obj = models.Token.objects.filter(token=totken).first() if not token_obj: # 抛认证字段的异常 raise exceptions.AuthenticationFailed("验证失败") else: return token_obj.user.name,token_obj.token def authenticate_header(self,request): pass
# 多条数据 class BookView(APIView): # 定义一个认证类 authentication_classes = [TokenAuth]
而没有定义 token类的依然能够访问。
# 定义全局认证,全部视图都须要认证 REST_FRAMEWORK = { "DEFAULT_AUTHENTICATION_CLASSES" : ["app01.utils.TokenAuth"] }
from app01 import models from rest_framework import exceptions class TokenAuth(object): def authenticate(self, request): # 取到 request里面的 token值 totken = request.GET.get("token") token_obj = models.Token.objects.filter(token=totken).first() if not token_obj: # 抛认证字段的异常 raise exceptions.AuthenticationFailed("验证失败") else: return token_obj.user.name, token_obj.token def authenticate_header(self, request): pass
只有VIP用户才能看的内容。
from rest_framework.permissions import BasePermission class SVIPPermission(BasePermission): message="只有超级用户才能访问" def has_permission(self,request,view): username=request.user user_type=User.objects.filter(name=username).first().user_type if user_type==3: return True # 经过权限认证 else: return False
# 封装了3层 class AuthorDetaiView(viewsets.ModelViewSet): permission_classes = [SVIPpermission,] throttle_classes = [] # 限制某个ip 每分钟访问次数不能超过20次 # authentication_classes = [TokenAuth] # queryset serializer 这两个方法必定要定义成这个否则取不到值 queryset = models.Author.objects.all() serializer_class = AuthorModelSerializers
# 自定义局部限制
import time # 自定义限制 VISIT_RECORD = {} class VisitRateThrottle(object): def __init__(self): self.history = None def allow_request(self, request, view): """ 自定义频率限制60秒内只能访问三次 """ # 获取用户IP ip = request.META.get("REMOTE_ADDR") # 获取当前时间戳 timestamp = time.time() # 若是当前访问ip没有在列表中 我就新建一个IP访问记录 if ip not in VISIT_RECORD: VISIT_RECORD[ip] = [timestamp, ] # 能够经过验证 return True # 若是列表中有值,我就取当当前ip地址 赋值给变量 history = VISIT_RECORD[ip] self.history = history # 在列表头部 插入一个时间戳 history.insert(0, timestamp) # 若是列表有值,最早插入的值小于 当前值-60 ,tiemstamp是当前时间是在增长的 while history and history[-1] < timestamp - 60: # pop 取出 最后一个条件成立的值 history.pop() # 列表中的时间戳 大于3 就返回falsse,不然经过 if len(history) > 3: return False else: return True def wait(self): # 返回给前端还剩多少时间能够访问 timestamp = time.time() # 求出还剩多少时间能够访问 return 60 - (timestamp - self.history[-1])
# 视图使用
from rest_framework.response import Response class AuthorModelView(viewsets.ModelViewSet): authentication_classes = [TokenAuth, ] permission_classes = [SVIPPermission, ] throttle_classes = [VisitRateThrottle, ] # 限制某个IP每分钟访问次数不能超过20次 queryset = Author.objects.all() serializer_class = AuthorModelSerializers # 分页 # pagination_class = MyPageNumberPagination # renderer_classes = []
# 全局使用
# 在settings.py中设置rest framework相关配置项 REST_FRAMEWORK = { "DEFAULT_AUTHENTICATION_CLASSES": ["app01.utils.TokenAuth", ], "DEFAULT_PERMISSION_CLASSES": ["app01.utils.SVIPPermission", ], "DEFAULT_THROTTLE_CLASSES": ["app01.utils.MyThrottle", ], }
定义频率类,必须继承SimpleRateThrottle类:
from rest_framework.throttling import SimpleRateThrottle class VisitThrottle(SimpleRateThrottle):
# 必须配置scope参数 经过scope参数去settings中找频率限定的规则 scope = 'throttle'
# 必须定义 get_cache_key函数 返回用户标识的key 这里借用源码中BaseThrottle类(SimpleRateThrottle的父类)中的get_ident函数返回用户ip地址 def get_cache_key(self, request, view): return self.get_ident(request)
局部使用:
视图函数中加上
throttle_classes = [VisitThrottle,]
全局使用:settings中配置
REST_FRAMEWORK={ 'DEFAULT_THROTTLE_CLASSES':['utils.common.VisitThrottle'],
# 局部使用也要在settings中配置上 DEFAULT_THROTTLE_RATES 经过self.scope取频率规则 (一分钟访问3次) 'DEFAULT_THROTTLE_RATES':{'throttle':'3/m',} }
设置错误信息为中文:
class Course(APIView): authentication_classes = [TokenAuth, ] permission_classes = [UserPermission, ] throttle_classes = [MyThrottles,] def get(self, request): return HttpResponse('get') def post(self, request): return HttpResponse('post') # 函数名为throttled 重写Throttled类中默认的错误信息 def throttled(self, request, wait): from rest_framework.exceptions import Throttled class MyThrottled(Throttled): default_detail = '访问频繁' extra_detail_singular = '等待 {wait} second.' extra_detail_plural = '等待 {wait} seconds.' raise MyThrottled(wait)