class User(models.Model): name = models.CharField(max_length=32) pwd = models.CharField(max_length=32) class Token(models.Model): user = models.OneToOneField("User", on_delete=models.CASCADE) token = models.CharField(max_length=128) def __str__(self): return self.token
添加后执行数据库迁移,添加app01_user和app01_token表。python
urlpatterns = [ ... re_path(r'^login/$', views.LoginView.as_view(), name="login"), ]
def get_random_str(user): """ 生成随机字符串 """ import hashlib, time ctime = str(time.time()) md5 = hashlib.md5(bytes(user, encoding='utf-8')) md5.update(bytes(ctime, encoding="utf-8")) return md5.hexdigest() class LoginView(APIView): def post(self, request): # 验证逻辑:获取用户名密码与数据库比对 name = request.data.get("name") pwd = request.data.get("pwd") user = User.objects.filter(name=name, pwd=pwd).first() res = {"state_code": 1000, "msg": None} # 成功或失败须要返回的字典标识此次的状态 if user: # 经过校验 拿到随机字符串交给这我的 random_str = get_random_str(user.name) # 获取随机字符串 token = Token.objects.update_or_create(user=user, defaults={"token": random_str}) res["token"] = str(token) # json不能序列化对象,所以转为字符串 else: # 校验失败 res["state_code"]=1001 # 错误状态码 res["msg"] = "用户名或密码错误" return Response(json.dumps(res))
注意:数据库
class LoginView(APIView): def post(self, request): # 验证逻辑:获取用户名密码与数据库比对 name = request.data.get("name") pwd = request.data.get("pwd") user = User.objects.filter(name=name, pwd=pwd).first() if user: # 经过校验 拿到随机字符串交给这我的 pass else: # 校验失败 pass return Response("login....")
def get_random_str(user): """ 生成随机字符串 """ import hashlib, time ctime = str(time.time()) md5 = hashlib.md5(bytes(user, encoding='utf-8')) md5.update(bytes(ctime, encoding="utf-8")) return md5.hexdigest()
注意:ctime=str(time.time()) ,世界上一直在变化的就是时间变量,所以ctime这个变量每一个都是不一样的。django
hashlib.md5()构建md5对象。实例化md5时传递参数叫作加盐:json
md5 = hashlib.md5(bytes(user, encoding='utf-8'))
md5.update()写入要加密的字节:(这里的md5是实例化出来的对象)api
md5.update(bytes(ctime, encoding="utf-8"))
md5_obj.hexdigest():获取密文数组
return md5.hexdigest()
加盐以后,即便要加密的数据彻底同样,可是用户名确定不同,所以产生的密文必定惟一。浏览器
Token.objects.update_or_create(user=user,defaults={"token":random_str})
用给定的**kwargs值查找对象(这里是user=user),若是defaults不为None则用defaults的值{"token":random_str}更新对象;若是为None则建立一个新对象。服务器
class QuerySet: def update_or_create(self, defaults=None, **kwargs): """ Look up an object with the given kwargs, updating one with defaults if it exists, otherwise create a new one. Return a tuple (object, created), where created is a boolean specifying whether an object was created. """ defaults = defaults or {}
若是是create操做返回值是添加的数据,若是是update操做返回值是更新的条数。app
from rest_framework import exceptions class TokenAuth(object): # 这个类名能够任意取 def authenticate(self, request): # 这个方法名不可变更 token = request.GET.get("token") token_obj = Token.objects.filter(token=token).first() if not token_obj: raise exceptions.AuthenticationFailed("验证失败!") # 若是有值 return两个值中间加逗号,就构成了一个元组 return token_obj.user.name, token_obj.token # 元组:(关联用户对象的名字,当前登陆对象token) def authenticate_header(self, request): # 不加会报错,要求要传入两个参数 pass class BookView(APIView): authentication_classes = [TokenAuth, ] def get(self, request): book_list = Book.objects.all() bs = BookModelSerializers(book_list, many=True, context={"request": request}) # 序列化结果 # return HttpResponse(bs.data) return Response(bs.data) def post(self, request): # POST请求的数据 bs = BookModelSerializers(data=request.data) if bs.is_valid(): # 验证数据是否合格 print(bs.validated_data) bs.save() # create方法 return Response(bs.data) # 当前添加的数据 else: return Response(bs.errors)
每次请求都要执行dispatch.dom
在用户访问时执行APIView的dispatch方法,在dispatch进行分发操做前,须要先执行self.initial(request, *args, **kwargs),执行认证、权限、频率操做。
def dispatch(self, request, *args, **kwargs): self.args = args self.kwargs = kwargs request = self.initialize_request(request, *args, **kwargs) # 构建新request self.request = request self.headers = self.default_response_headers # deprecate? try: self.initial(request, *args, **kwargs) # 认证权限频率 # Get the appropriate handler method if request.method.lower() in self.http_method_names: handler = getattr(self, request.method.lower(), self.http_method_not_allowed) else: handler = self.http_method_not_allowed response = handler(request, *args, **kwargs) except Exception as exc: response = self.handle_exception(exc) self.response = self.finalize_response(request, response, *args, **kwargs) return self.response
def initial(self, request, *args, **kwargs): """代码省略""" # Ensure that the incoming request is permitted # 认证组件 self.perform_authentication(request) # 权限组件 self.check_permissions(request) # 频率组件 self.check_throttles(request)
def perform_authentication(self, request): request.user
因为在dispatch函数中,self.initial(request, *args, **kwargs)晚于request = self.initialize_request(request, *args, **kwargs)。所以这里的request是新构建的request。request.user即须要去Request类中寻找user静态方法。
这个新request经过initialize_request方法返回Request类对象:
def initialize_request(self, request, *args, **kwargs): parser_context = self.get_parser_context(request) return Request( request, parsers=self.get_parsers(), authenticators=self.get_authenticators(), negotiator=self.get_content_negotiator(), parser_context=parser_context )
@property def user(self): """ Returns the user associated with the current request, as authenticated by the authentication classes provided to the request.
当request已经经过认证类提供的认证,返回当前请求关联的用户 """ if not hasattr(self, '_user'): with wrap_attributeerrors(): self._authenticate() return self._user
注意:@property
装饰器就是负责把一个方法变成属性调用。未经过验证的须要用self._authenticate()方法来进行校验。
class Request(object): def __init__(self, request, parsers=None, authenticators=None, negotiator=None, parser_context=None): self._request = request self.parsers = parsers or () self.authenticators = authenticators or () # 若是是None返回空元组,若是有值返回authenticators """ print(3 and 0) # 0 print(0 and 2) # 0 print(0 or 1) # 1 print(4 or 1) # 4 """ def _authenticate(self): """ Attempt to authenticate the request using each authentication instance in turn. """ for authenticator in self.authenticators: try: user_auth_tuple = authenticator.authenticate(self) except exceptions.APIException: self._not_authenticated() raise if user_auth_tuple is not None: self._authenticator = authenticator self.user, self.auth = user_auth_tuple return self._not_authenticated()
确认authenticators的来源是Request实例化时传入的,
找到Request实例化的方法:initialize_request
class APIView(View): def initialize_request(self, request, *args, **kwargs): return Request( authenticators=self.get_authenticators(), )
再由此找到get_authenticators方法:
class APIView(View): def get_authenticators(self): """ Instantiates and returns the list of authenticators that this view can use. """ return [auth() for auth in self.authentication_classes]
authentication_classes就是咱们在视图函数中定义的列表:
class BookView(APIView): authentication_classes = [TokenAuth, ] def get(self, request):... def post(self, request):...
[auth() for auth in self.authentication_classes]这个语句的含义:循环每个认证类并进行实例化,放在数组中。以[TokenAuth, ]为例返回值是[TokenAuth(), ]。
所以回传回去authenticators=[TokenAuth(), ]。
class Request(object): def __init__(self, request, parsers=None, authenticators=None, negotiator=None, parser_context=None): self._request = request self.parsers = parsers or () self.authenticators = authenticators or () # 若是是None返回空元组,若是有值返回authenticators """ print(3 and 0) # 0 print(0 and 2) # 0 print(0 or 1) # 1 print(4 or 1) # 4 """ def _authenticate(self): """ Attempt to authenticate the request using each authentication instance in turn. """ for authenticator in self.authenticators: # [TokenAuth(), ], authenticator:TokenAuth() try: user_auth_tuple = authenticator.authenticate(self) # TokenAuth必须有authenticate方法 # authenticator.authenticate(self):是一个实例对象调用本身的实例方法,本不须要传self,这里必定是传的一个形参。 # 这个方法是在Request类中,追溯调用关系可知,这里的self是一个新request对象 except exceptions.APIException: # 抛出错误 self._not_authenticated() # 没有验证成功 raise if user_auth_tuple is not None: # 若是user_auth_tuple有值 self._authenticator = authenticator self.user, self.auth = user_auth_tuple # 将元组的值赋给self.user和self.auth return self._not_authenticated() # 没有验证成功
authenticator.authenticate(self):是一个实例对象调用本身的实例方法,本不须要传self,这里必定是传的一个形参。这个方法是在Request类中,追溯调用关系可知,这里的self是一个新request对象。所以在构建自定义的TokenAuth时必定要在def authenticate(self, request): 添加request参数。
访问时添加数据库查到的token信息,验证经过:
打印以前在_authenticate将元组的值赋给self.user和self.auth的值:
class BookView(APIView): authentication_classes = [TokenAuth, ] def get(self, request): print(request.user) # egon print(request.auth) # 02aee930be6011068e24f68935d52b02
能够看到正好对应authoerticate函数的返回值:return token_obj.user.name, token_obj.token.
from rest_framework import exceptions from rest_framework.authentication import BaseAuthentication class TokenAuth(BaseAuthentication): def authenticate(self, request): token = request.GET.get("token") token_obj = Token.objects.filter(token=token).first() if not token_obj: raise exceptions.AuthenticationFailed("验证失败!") # 若是有值 return两个值中间加逗号,就构成了一个元组 return token_obj.user.name, token_obj.token # 元组:(关联用户对象的名字,当前登陆对象token) # def authenticate_header(self, request): # 不加会报错,且要求要传入两个参数 # pass
BaseAuthentication包含authenticate和authenticate_header函数。默认用来被覆盖。
若是没有在局部定义authentication_classes=[TokenAuth, ]。回溯查找默认的authentication_classes。
class APIView(View): authentication_classes = api_settings.DEFAULT_AUTHENTICATION_CLASSES
api_settings.DEFAULT_AUTHENTICATION_CLASSES是类的实例对象.属性的模式。当调用不存在的属性时,Python会试图调用__getattr__(self,attr)来获取属性,而且返回attr。
该语句在rest_framework/settings.py中。发现api_settings是一个实例对象。实例化时执行相应的init方法。
api_settings = APISettings(None, DEFAULTS, IMPORT_STRINGS)
DEFAULTS这个值是一个字典,每个键后面都是跟着一个元组,保存了关于rest_framework全部默认配置。也定义在rest_framework/settings.py文件中。
class APISettings(object): def __init__(self, user_settings=None, defaults=None, import_strings=None): if user_settings: self._user_settings = self.__check_user_settings(user_settings) self.defaults = defaults or DEFAULTS self.import_strings = import_strings or IMPORT_STRINGS self._cached_attrs = set()
user_settings默认为None,若是有值拿到user_settings.
self.defaults = defaults or DEFAULTS 拿到DEFAULTS字典值。
class APISettings(object): def __getattr__(self, attr): if attr not in self.defaults: raise AttributeError("Invalid API setting: '%s'" % attr) try: # Check if present in user settings val = self.user_settings[attr] except KeyError: # 当self.user_settings的值是一个空字典,取值报错KeyError # Fall back to defaults val = self.defaults[attr] # 异常处理去取默认的DEFAULT值 # Coerce import strings into classes if attr in self.import_strings: val = perform_import(val, attr) # Cache the result self._cached_attrs.add(attr) setattr(self, attr, val) return val
__getattr__是python里的一个内建函数,能够很方便地动态返回一个属性;当调用不存在的属性时,Python会试图调用__getattr__(self,item)来获取属性,而且返回item;
class Person(object): def __init__(self, name): self.name = name def __getattr__(self, item): print("item", item) def dream(self): print("dreaming。。。。。") alex = Person("alex") alex.yuan # 打印:item yuan
val = self.user_settings[attr]:user_settings执行的返回值后面加上[attr]
class APISettings(object): @property def user_settings(self): # 静态方法 if not hasattr(self, '_user_settings'): self._user_settings = getattr(settings, 'REST_FRAMEWORK', {}) return self._user_settings
这里的settings指的是restDemo项目中的restDemo/settings.py。
所以getattr(settings, 'REST_FRAMEWORK', {}) 表明的意思是:去settings.py中去拿REST_FRAMEWORK变量,若是拿不到则取一个空字典。所以self._user_settings必定是一个字典。
所以self.user_settings[attr]是在字典中取键attr的值.当字典为空时,取不到值会抛出Keyerror错误,进行异常处理去取默认的DEFAULT字典中的值:
DEFAULTS = { """省略代码""" 'DEFAULT_AUTHENTICATION_CLASSES': ( 'rest_framework.authentication.SessionAuthentication', 'rest_framework.authentication.BasicAuthentication' ), """省略代码"""
REST_FRAMEWORK = { # 仿照DEFAULT配置认证类路径 "DEFAULT_AUTHENTICATION_CLASSES": ["app01.utils.TokenAuth"] }
键必须是DEFAULT_AUTHENTICATION_CLASSES
因为要指定认证类路径所以要把以前写的TokenAuth从views.py迁移到一个新文件(文件名自定义)中,这里是:/app01/utils.py
from .models import * from rest_framework import exceptions from rest_framework.authentication import BaseAuthentication class TokenAuth(BaseAuthentication): def authenticate(self, request): token = request.GET.get("token") token_obj = Token.objects.filter(token=token).first() if not token_obj: raise exceptions.AuthenticationFailed("验证失败!") # 若是有值 return两个值中间加逗号,就构成了一个元组 return token_obj.user.name, token_obj.token # 元组:(关联用户对象的名字,当前登陆对象token)
class LoginView(APIView): authentication_classes = [] # 配置为空,优先级高于全局认证,不用进行认证 def post(self, request):....
class APIView(View): def dispatch(self, request, *args, **kwargs): self.args = args self.kwargs = kwargs request = self.initialize_request(request, *args, **kwargs) self.request = request self.headers = self.default_response_headers # deprecate? try: self.initial(request, *args, **kwargs) """代码省略"""
在dispatch中使用的initial()方法,包含了权限组件:
def initial(self, request, *args, **kwargs): """代码省略""" # Ensure that the incoming request is permitted # 认证组件 self.perform_authentication(request) # 权限组件 self.check_permissions(request) # 频率组件 self.check_throttles(request)
class APIView(View): def check_permissions(self, request): """ Check if the request should be permitted. Raises an appropriate exception if the request is not permitted. """ for permission in self.get_permissions(): # 循环的是[SVIPPermission(), ] permission是SVIPPermission()——权限实例 if not permission.has_permission(request, self): # 因而可知权限类必须有has_permission方法
# 经过权限认证什么都不用作,不经过执行如下代码 self.permission_denied( request, message=getattr(permission, 'message', None) )
这里循环的是self.get_permissions():
1)查看get_permissions方法:
class APIView(View): def get_permissions(self): """ Instantiates and returns the list of permissions that this view requires. """ return [permission() for permission in self.permission_classes]
[permission() for permission in self.permission_classes]与认证组件彻底相似:循环每个权限类并进行实例化,放在数组中。
2)查看确认permission_classes的值:
class APIView(View):
# The following policies may be set at either globally, or per-view.
authentication_classes = api_settings.DEFAULT_AUTHENTICATION_CLASSES
permission_classes = api_settings.DEFAULT_PERMISSION_CLASSES
若是在本身的视图类中定义了permission_classes(优先取当前视图的局部配置) ,说明配置了局部视图权限,就取本身定义的。
若是没有定义:
因为权限组件依然是用了api_settings这个APISettings类实例对象,实例化时执行__init__函数,所以也执行了user_settings静态方法。
class APISettings(object): def __init__(self, user_settings=None, defaults=None, import_strings=None): if user_settings: self._user_settings = self.__check_user_settings(user_settings) self.defaults = defaults or DEFAULTS def user_settings(self): if not hasattr(self, '_user_settings'): self._user_settings = getattr(settings, 'REST_FRAMEWORK', {}) # 去settings.py中去拿REST_FRAMEWORK变量,若是拿不到则取一个空字典 return self._user_settings
这里user_settings函数经过反射取settings下是否有配置‘REST_FRAMEWORK’,若是有配置即说明配置了全局视图权限。self._user_settings取配置的值。
getattr(settings, 'REST_FRAMEWORK', {})里的settings是从django引入过来的:
from django.conf import settings
若是没有配置则self._user_settings是一个空字典,须要进一步分析api_settings.DEFAULT_PERMISSION_CLASSES:
APISettings类中包含__getattr__方法,当调用不存在的属性时,Python会试图调用__getattr__(self,item)来获取属性,而且返回item:
class APISettings(object): def __getattr__(self, attr): if attr not in self.defaults: raise AttributeError("Invalid API setting: '%s'" % attr) try: # Check if present in user settings val = self.user_settings[attr] # 全局权限视图未设置时self.user_settings返回值是一个空字典,设置时取到全局配置 except KeyError: # 空字典取值报错抛出异常 # Fall back to defaults val = self.defaults[attr] # 获取DEFAULT中默认值 # Coerce import strings into classes if attr in self.import_strings: val = perform_import(val, attr) # Cache the result self._cached_attrs.add(attr) setattr(self, attr, val) return val
因为全局权限视图未未设置时self.user_settings返回值是一个空字典,所以取值会失败,经过异常处理获取DEFAULT默认配置中的值。
3)总结
self.get_permissions()的返回值是权限实例列表,如下面的示例为例是[SVIPPermission(), ]。所以for循环拿到的permission是一个个权限实例:SVIPPermission()
所以若是permission.has_permission返回值是true,直接完成权限验证;若是返回值是False,则返回没有权限的提示。
修改models.py中user表结构:
class User(models.Model): name = models.CharField(max_length=32) pwd = models.CharField(max_length=32) type_choice = ((1, "普通用户"), (2, "VIP"), (3, "SVIP")) user_type = models.IntegerField(choices=type_choice, default=1)
修改后完成数据库迁移。
from rest_framework import viewsets class SVIPPermission(object): # 超级用户可用 def has_permission(self, request, view): username = request.user # 获取前面认证过的信息:egon user_type = User.objects.filter(name=username).first().user_type if user_type == 3: # 经过权限认证 return True else: # 未经过权限认证 return False class AuthorViewSet(viewsets.ModelViewSet): # authentication_classes = [TokenAuth, ] permission_classes = [SVIPPermission, ] queryset = Author.objects.all() # 配置queryset:告知这个类此次处理的数据 serializer_class = AuthorModelSerializers # 告知处理用到的序列化组件
显示效果:
class SVIPPermission(object): # 超级用户可用 message = "只有超级用户才能访问" def has_permission(self, request, view): username = request.user # 获取前面认证过的信息:egon user_type = User.objects.filter(name=username).first().user_type if user_type == 3: # 经过权限认证 return True else: # 未经过权限认证 return False
显示效果:
from .models import * from rest_framework import exceptions from rest_framework.authentication import BaseAuthentication class TokenAuth(BaseAuthentication): def authenticate(self, request): token = request.GET.get("token") token_obj = Token.objects.filter(token=token).first() if not token_obj: raise exceptions.AuthenticationFailed("验证失败!") # 若是有值 return两个值中间加逗号,就构成了一个元组 return token_obj.user.name, token_obj.token # 元组:(关联用户对象的名字,当前登陆对象token) class SVIPPermission(object): # 超级用户可用 message = "只有超级用户才能访问" def has_permission(self, request, view): username = request.user # 获取前面认证过的信息:egon user_type = User.objects.filter(name=username).first().user_type if user_type == 3: # 经过权限认证 return True else: # 未经过权限认证 return False
REST_FRAMEWORK = { # 认证类路径 "DEFAULT_AUTHENTICATION_CLASSES": ["app01.utils.TokenAuth"], # 权限类路径 "DEFAULT_PERMISSION_CLASSES": ["app01.utils.SVIPPermission"] }
class APIView(View): def dispatch(self, request, *args, **kwargs): try: self.initial(request, *args, **kwargs) def initial(self, request, *args, **kwargs): self.check_throttles(request) def check_throttles(self, request): """ Check if request should be throttled. Raises an appropriate exception if the request is throttled. """ for throttle in self.get_throttles(): if not throttle.allow_request(request, self): self.throttled(request, throttle.wait()) def get_throttles(self): return [throttle() for throttle in self.throttle_classes] # 频率对象列表
直接将频率限制类定义在utils.py中:
from rest_framework.throttling import BaseThrottle VISIT_RECORD={} class VisitThrottle(BaseThrottle): def __init__(self): self.history=None def allow_request(self,request,view): # 要求访问站点的频率不能超过1分钟3次 remote_addr = request.META.get('REMOTE_ADDR') print(remote_addr) import time ctime=time.time() if remote_addr not in VISIT_RECORD: # 未访问过 VISIT_RECORD[remote_addr]=[ctime,] return True history=VISIT_RECORD.get(remote_addr) self.history=history while history and history[-1]<ctime-60: history.pop() if len(history) < 3: # 未达到频率限制 history.insert(0,ctime) return True else: return False def wait(self): import time ctime=time.time() return 60-(ctime-self.history[-1])
在视图类中中配置局部频率限制:
class AuthorViewSet(viewsets.ModelViewSet): # authentication_classes = [TokenAuth, ] # permission_classes = [SVIPPermission, ] throttle_classes = [VisitThrottle, ] queryset = Author.objects.all() # 配置queryset:告知这个类此次处理的数据 serializer_class = AuthorModelSerializers # 告知处理用到的序列化组件
注意:
request.META 是一个Python字典,包含了全部本次HTTP请求的Header信息,好比用户IP地址和用户Agent(一般是浏览器的名称和版本号)。 注意,Header信息的完整列表取决于用户所发送的Header信息和服务器端设置的Header信息。
在restDemo/settings.py中配置:
REST_FRAMEWORK = { # 认证类路径 "DEFAULT_AUTHENTICATION_CLASSES": ["app01.utils.TokenAuth"], # 权限类路径 "DEFAULT_PERMISSION_CLASSES": ["app01.utils.SVIPPermission"], # 频率类路径 "DEFAULT_THROTTLE_CLASSES": ["app01.utils.VisitThrottle"] }
将频率配置类修改成:
class VisitThrottle(SimpleRateThrottle): scope="visit_rate" def get_cache_key(self, request, view): return self.get_ident(request)
settings.py设置:
REST_FRAMEWORK = { 'DEFAULT_AUTHENTICATION_CLASSES': ['app01.utils.TokenAuth'], 'DEFAULT_PERMISSION_CLASSES': ['app01.utils.SVIPPermission'], 'DEFAULT_THROTTLE_CLASSES': ['app01.utils.VisitThrottle'], "DEFAULT_THROTTLE_RATES": { "visit_rate": "1/m", } }