对查询出来的数据进行筛选（可写可不写）
from rest_framework.filters import BaseFilterBackend 源码 ''' def filter_queryset(self, request, queryset, view): #继承了这个类必须写这个方法否则报错 raise NotImplementedError(".filter_queryset() must be overridden.") ''' view是当前视图self queryset是筛选以后的数据 request 就是request def filter_queryset(self,request,queryset,view): pass
对查询出来的数据进行筛选可写可不写 #第一部分 from rest_framework.filters import BaseFilterBackend #继承类 class MyFilterBack(BaseFilterBackend): def filter_queryset(self,request,queryset,view): val = request.query_params.get('cagetory') return queryset.filter(category_id=val) #先实例化一个 class Indexview(APIview): def get(self,request,*arg,**kwargs): #就是查询一个表不理他 queryset=models.News.objects.all() #实例化一个类对象 obj=MyFilterBack() #传值顺序request,queryset,self result=obj.filter_queryset(request,queryset,self) return Response('...')
def get(self, request, *args, **kwargs):
    """Return the parent view's response, then bump the article read counter.

    The row whose primary key equals the ``pk`` URL keyword argument gets
    its ``read_count`` incremented by one after the response is built.
    """
    # Step 1: let the parent view fetch the single record and serialize it.
    response = super().get(request, *args, **kwargs)

    # Step 2: increment the read counter with an F() expression so the
    # addition is expressed inside the UPDATE query itself.
    # (Alternative noted by the author: load the row with self.get_object(),
    # add 1 to instance.read_count and call instance.save().)
    article_pk = kwargs.get('pk')
    matching_articles = models.Article.objects.filter(pk=article_pk)
    matching_articles.update(read_count=F('read_count') + 1)

    return response
提供了公共方法：request.data 是对 request.POST 的封装（含 JSON 序列化数据的解析）；dispatch() 负责分发；self 封装了 request 属性，例如 self.request.method
@classmethod def as_view(cls, **initkwargs): if isinstance(getattr(cls, 'queryset', None), models.query.QuerySet): def force_evaluation(): raise RuntimeError( 'Do not evaluate the `.queryset` attribute directly, ' 'as the result will be cached and reused between requests. ' 'Use `.all()` or call `.get_queryset()` instead.' ) cls.queryset._fetch_all = force_evaluation #执行父类的as_view方法 view = super().as_view(**initkwargs) #执行父类的super().as_view(**initkwargs) view.cls = cls view.initkwargs = initkwargs #闭包 csrf_exempt免除csrftoken的认证 return csrf_exempt(view) -------------------------------------- @classonlymethod def as_view(cls, **initkwargs): """ Main entry point for a request-response process. """ for key in initkwargs: if key in cls.http_method_names: raise TypeError("You tried to pass in the %s method name as a " "keyword argument to %s(). Don't do that." % (key, cls.__name__)) if not hasattr(cls, key): raise TypeError("%s() received an invalid keyword %r. as_view " "only accepts arguments that are already " "attributes of the class." % (cls.__name__, key)) def view(request, *args, **kwargs): self = cls(**initkwargs) if hasattr(self, 'get') and not hasattr(self, 'head'): self.head = self.get self.request = request self.args = args self.kwargs = kwargs #执行本类的self.dispatch return self.dispatch(request, *args,**kwargs) #执行view return view -------------apiview的dispatch-------- def dispatch(self, request, *args, **kwargs): """ `.dispatch()` is pretty much the same as Django's regular dispatch, but with extra hooks for startup, finalize, and exception handling. """ self.args = args self.kwargs = kwargs #封装老值request request = self.initialize_request(request, *args, **kwargs) self.request = request self.headers = self.default_response_headers # deprecate? 
try: self.initial(request, *args, **kwargs) # Get the appropriate handler method if request.method.lower() in self.http_method_names: handler = getattr(self, request.method.lower(), self.http_method_not_allowed) else: handler = self.http_method_not_allowed response = handler(request, *args, **kwargs) except Exception as exc: response = self.handle_exception(exc) self.response = self.finalize_response(request, response, *args, **kwargs) return self.response
#一 初识 from rest_framework.generics import GenericAPIView GenericAPIview继承了APIview class NewView(GenericAPIview): querset=model.News.objects.all() def get(self,request,*arg,**kwargs): #self对应NewView的对象 self.filter_queryset(self.queryset) 1.执行对应的filter_queryset方法 2.本类没有去寻找父类GenericAPIview ''' def filter_queryset(self, queryset): for backend in list(self.filter_backends): queryset = backend().filter_queryset(self.request, queryset, self) return queryset '''#因此默认返回原来的queryset 3.寻找filter_backends ''' filter_backends = api_settings.DEFAULT_FILTER_BACKENDS filter_backends是空的 '''
from rest_framework.generics import GenericAPIView #GenericAPIview继承了APIview class NewFiltrBackend(BaseFilterBackend): def filter_queryset(self,request,queryset,view): val = request.query_params.get('cagetory') return queryset.filter(category_id=val) class NewView(GenericAPIview): querset=model.News.objects.all() filter_backends=[NewFiltrBackend ,] def get(self,request,*arg,**kwargs): #self对应NewView的对象 v=self.get_queryset() queryset=self.filter_queryset(v) 1.执行对应的filter_queryset方法 2.本类没有去寻找父类GenericAPIview ''' 源码 继承类不写self.queryset会报错 assert self.queryset is not None, ( "'%s' should either include a `queryset` attribute, " "or override the `get_queryset()` method." % self.__class__.__name__ ) ''' ''' def filter_queryset(self, queryset): for backend in list(self.filter_backends): queryset = backend().filter_queryset(self.request, queryset, self) return queryset '''4.backend等于对应NewFiltrBackend类名() 实例化对象 ·· 执行NewFiltrBackend里面的filter_queryset方法 3.寻找filter_backends,本类的filter_backends #filter_backends=[NewFiltrBackend ,] 5.queryset=self.filter_queryset(self.queryset)是筛选以后的结果 #v=self.get_queryset() queryset=self.filter_queryset(v) 6.寻找对应get_queryset ''' 源码 queryset = self.queryset if isinstance(queryset, QuerySet): # Ensure queryset is re-evaluated on each request. 
queryset = queryset.all() return queryset '''#返回等于筛选以后的queryset #queryset=self.filter_queryset(queryset) #queryset=get_queryset() ·········self.get_serializer()··············· class Newserializers(serializers.ModelSerializer): class Meta: model=models.News fields="__all__" class NewFiltrBackend(BaseFilterBackend): def filter_queryset(self,request,queryset,view): val = request.query_params.get('cagetory') return queryset.filter(category_id=val) class NewView(GenericAPIview): querset=model.News.objects.all() filter_backends=[NewFiltrBackend ,] def get(self,request,*arg,**kwargs): #self对应NewView的对象 v=self.get_queryset() queryset=self.filter_queryset(v) self.get_serializer()#本类没有去父类找 #代替了 ser=Newserializers(instance=queryset,many=True) #ser.data 1.寻找get_serializer() 2. ''' 源码 def get_serializer(self, *args, **kwargs): serializer_class = self.get_serializer_class() kwargs['context'] = self.get_serializer_context() return serializer_class(*args, **kwargs) ''' 3.进行寻找get_serializer_class()本类没有去父类找 #serializer_class = self.get_serializer_class() 4. ''' def get_serializer_class(self): assert self.serializer_class is not None, ( "'%s' should either include a `serializer_class` attribute, " "or override the `get_serializer_class()` method." 
% self.__class__.__name__ ) return self.serializer_class ''' #return self.serializer_class 返回serializer_class #get_serializer()=serializer_class 在本类定义seializer_class seializer_class=Newserializers 至关于 #ser=Newserializers(instance=queryset,many=True) #ser.data ser=self.get_serializer(instance=queryset,many=True) ser.data `````````````````````````````` 分页 querset=model.News.objects.all() pagination_class =PageNumberPagination self.paginate_queryset(queryset) ''' 源码 self.paginate_queryset() self._paginator = self.pagination_class() 须要定义 def paginator(self): if not hasattr(self, '_paginator'): if self.pagination_class is None: self._paginator = None else: self._paginator = self.pagination_class() return self._paginator '''pagination_class()须要本地定义 pagination_class =PageNumberPagination
源码的剖析 def get_queryset(self): """ Get the list of items for this view. This must be an iterable, and may be a queryset. Defaults to using `self.queryset`. This method should always be used rather than accessing `self.queryset` directly, as `self.queryset` gets evaluated only once, and those results are cached for all subsequent requests. You may want to override this if you need to provide different querysets depending on the incoming request. (Eg. return a list of items that is specific to the user) """ assert self.queryset is not None, ( "'%s' should either include a `queryset` attribute, " "or override the `get_queryset()` method." % self.__class__.__name__ ) queryset = self.queryset if isinstance(queryset, QuerySet): # Ensure queryset is re-evaluated on each request. queryset = queryset.all() return queryset def get_object(self): """ Returns the object the view is displaying. You may want to override this if you need to provide non-standard queryset lookups. Eg if objects are referenced using multiple keyword arguments in the url conf. """ queryset = self.filter_queryset(self.get_queryset()) # Perform the lookup filtering. lookup_url_kwarg = self.lookup_url_kwarg or self.lookup_field assert lookup_url_kwarg in self.kwargs, ( 'Expected view %s to be called with a URL keyword argument ' 'named "%s". Fix your URL conf, or set the `.lookup_field` ' 'attribute on the view correctly.' % (self.__class__.__name__, lookup_url_kwarg) ) filter_kwargs = {self.lookup_field: self.kwargs[lookup_url_kwarg]} obj = get_object_or_404(queryset, **filter_kwargs) # May raise a permission denied self.check_object_permissions(self.request, obj) return obj def get_serializer(self, *args, **kwargs): """ Return the serializer instance that should be used for validating and deserializing input, and for serializing output. 
""" serializer_class = self.get_serializer_class() kwargs['context'] = self.get_serializer_context() return serializer_class(*args, **kwargs) def get_serializer_class(self): """ Return the class to use for the serializer. Defaults to using `self.serializer_class`. You may want to override this if you need to provide different serializations depending on the incoming request. (Eg. admins get full serialization, others get basic serialization) """ assert self.serializer_class is not None, ( "'%s' should either include a `serializer_class` attribute, " "or override the `get_serializer_class()` method." % self.__class__.__name__ ) return self.serializer_class def get_serializer_context(self): """ Extra context provided to the serializer class. """ return { 'request': self.request, 'format': self.format_kwarg, 'view': self } def filter_queryset(self, queryset): """ Given a queryset, filter it with whichever filter backend is in use. You are unlikely to want to override this method, although you may need to call it either from a list view, or from a custom `get_object` method if you want to apply the configured filtering backend to the default queryset. """ for backend in list(self.filter_backends): queryset = backend().filter_queryset(self.request, queryset, self) return queryset
from rest_framework.generics import GenericAPIView from rest_framework.filters import BaseFilterBackend from rest_framework import serializers GenericAPIview继承了APIview class Newserializers(serializers.ModelSerializer): class Meta: model=models.News fields="__all__" class NewFiltrBackend(BaseFilterBackend): def filter_queryset(self,request,queryset,view): val = request.query_params.get('cagetory') return queryset.filter(category_id=val) class NewView(GenericAPIview): querset=model.News.objects.all() filter_backends=[NewFiltrBackend ,] seializer_class=Newserializers def get(self,request,*arg,**kwargs): #self对应NewView的对象 v=self.get_queryset() queryset=self.filter_queryset(v) self.get_serializer(instance==queryset,many=True) retutn Response(ser.data) ''' 1.querset=model.News.objects.all() 2.filter_backends=[NewFiltrBackend ,] 3.seializer_class=Newserializers 4.pagination_class =PageNumberPagination 1.查询 self.get_queryset()#等于querset=model.News.objects.all() 2.序列化 self.get_serializer()等于seializer_class=Newserializers() 3.筛选 self.filter_queryset() 等于filter_backends=[NewFiltrBackend ,] 内部有一个for循环列表 同等与NewFiltrBackend() 4.分页 self.paginate_queryset(queryset) 等于page_obj=PageNumberPagination() '''
基于 GenericAPIView
基于 GenericAPIView
1.queryset 2.分页 #settings的配置 paginator.paginate_queryset(queryset, self.request, view=self) "DEFAULT_PAGINATION_CLASS":"rest_framework.pagination.PageNumberPagination", "PAGE_SIZE":2, 3.序列化 4.返回序列化数据
#源码 def get(self, request, *args, **kwargs): return self.list(request, *args, **kwargs) 1.执行list def list(self, request, *args, **kwargs): queryset = self.filter_queryset(self.get_queryset()) #self.get_queryset()执行自定的 #queryset=model.News.objects.all() #self.filter_queryset()筛选执行 #filter_backends=[NewFiltrBackend ,] #执行对应本类的filter_queryset 2. page = self.paginate_queryset(queryset) #分页 #执行 pagination_class =PageNumberPagination if page is not None: 3.serializer = self.get_serializer(page, many=True) #序列化 执行seializer_class=Newserializers return self.get_paginated_response(serializer.data) serializer = self.get_serializer(queryset, many=True) return Response(serializer.data) #最后返回序列化的数据
模拟 GET 请求
1. #对应得到对象 本身要定义单条数据 queryset 2. #序列化 3. #返回数据
class RetrieveAPIView(mixins.RetrieveModelMixin, GenericAPIView): #从左往右继承 def get(self, request, *args, **kwargs): return self.retrieve(request, *args, **kwargs) #去找retrieve这个方法 class RetrieveModelMixin: """ Retrieve a model instance. """ def retrieve(self, request, *args, **kwargs): #对应得到对象 instance = self.get_object() #序列化 serializer = self.get_serializer(instance) #返回数据 return Response(serializer.data) 执行 def get_object(self): """ Returns the object the view is displaying. You may want to override this if you need to provide non-standard queryset lookups. Eg if objects are referenced using multiple keyword arguments in the url conf. """ queryset = self.filter_queryset(self.get_queryset()) # Perform the lookup filtering. lookup_url_kwarg = self.lookup_url_kwarg or self.lookup_field assert lookup_url_kwarg in self.kwargs, ( 'Expected view %s to be called with a URL keyword argument ' 'named "%s". Fix your URL conf, or set the `.lookup_field` ' 'attribute on the view correctly.' % (self.__class__.__name__, lookup_url_kwarg) ) return obj
基于 GenericAPIView
基于 GenericAPIView
封装了 POST 请求
1.序列化 2.序列化验证 3.保存数据
def post(self, request, *args, **kwargs): return self.create(request, *args, **kwargs) 1.执行create def create(self, request, *args, **kwargs): serializer = self.get_serializer(data=request.data) #序列化 serializer.is_valid(raise_exception=True) #序列化验证 self.perform_create(serializer) ''' 保存数据 def perform_create(self, serializer): serializer.save() ''' headers = self.get_success_headers(serializer.data) return Response(serializer.data, status=status.HTTP_201_CREATED, headers=headers)
#第一种写法重写create书写 多条序列化器 def create(self, request, *args, **kwargs): article = articleseralizer(data=request.data) articleDetail = atricleDetaliser(data=request.data) if article.is_valid() and articleDetail.is_valid(): article_obj = article.save(author=request.user) print(">>>>>>>>>", request.user, type(request.user)) articleDetail.save(article=article_obj) return Response('修改为功') else: return Response(f"{article.errors},{articleDetail.errors}") #第二种方式 重写序列化器分发 def get_serializer_class(self): self.request.method=="POST": return 对应序列化器 self.request.method=="GET": return 对应序列化器 def perform_create(self,serializer): Aritlel=serializer.save(author) serializer_second=AricleSerializer(data=request.data) serializer.save(aritle=Aritlel) 字段对象=对象
封装了局部更新和全局更新
基于 GenericAPIView
1.获取要更新的那条数据的对象 2.序列化 3.序列化校验 4.校验成功后更新
def put(self, request, *args, **kwargs): return self.update(request, *args, **kwargs) ''' def update(self, request, *args, **kwargs): partial = kwargs.pop('partial', False) instance = self.get_object() ''' #执行queryset查询处理的model对象 #至关于 #queryset=model.表名.object.filter(pk=pk).frist() ''' serializer = self.get_serializer(instance, data=request.data, partial=partial)默认不为Ture #执行对应的序列化 serializer.is_valid(raise_exception=True) #序列化验证 self.perform_update(serializer) ''' # def perform_update(self, serializer): #serializer.save() def patch(self, request, *args, **kwargs): return self.partial_update(request, *args, **kwargs) ''' def partial_update(self, request, *args, **kwargs): kwargs['partial'] = True return self.update(request, *args, **kwargs) ''' 执行 ''' def update(self, request, *args, **kwargs): partial = kwargs.pop('partial', False) instance = self.get_object() ''' #执行queryset查询处理的model对象 #至关于 #queryset=model.表名.object.filter(pk=pk).frist() ''' serializer = self.get_serializer(instance, data=request.data, partial=partial) kwargs['partial'] = True 局部更新 #执行对应的序列化 serializer.is_valid(raise_exception=True) #序列化验证 self.perform_update(serializer) ''' # def perform_update(self, serializer): #serializer.save()
1.传入对象 2.执行def perform_destroy(self, instance): 3.删除
def delete(self, request, *args, **kwargs):
    """Handle HTTP DELETE by delegating to ``destroy``."""
    return self.destroy(request, *args, **kwargs)


def destroy(self, request, *args, **kwargs):
    """Look up the target object, delete it and answer with HTTP 204."""
    target = self.get_object()
    # Pass the looked-up object to the hook that performs the deletion.
    self.perform_destroy(target)
    return Response(status=status.HTTP_204_NO_CONTENT)


def perform_destroy(self, instance):
    """Deletion hook: remove ``instance`` by calling its ``delete()``."""
    instance.delete()
是 APIView 视图类的继承对象
CreateAPIView、UpdateAPIView……等等
1.直接继承即可 2.因为没有封装对应 get、post 等请求方法，需要一点点去找父类的方法 3.GenericViewSet(ViewSetMixin)：它继承了 ViewSetMixin 4.重写了 as_view()，需要传参 5.使用两个类进行id和没有id的区分 url(r'^article/$',article.AtricleView.as_view({"get":"list"})) url(r'^article/(?P<pk>\d+)/$',article.AtricleView.as_view({"get":"retrieve"}))
from rest_framework.viewsets import GenericVIewSet class GenericViewSet(ViewSetMixin,generics.GenericAPIView ): 默认继承GenericAPIView def get_serializer_class(self): pk = self.kwargs.get('pk') if pk: return ArticleDetailSerializer return ArticleSerializer
def list(self, request, *args, **kwargs):
    """Return the filtered queryset in serialized form, paginated if possible.

    The queryset from ``get_queryset()`` is run through ``filter_queryset()``
    and then offered to ``paginate_queryset()``. When that returns a page,
    the page is serialized and wrapped by ``get_paginated_response()``;
    otherwise the whole filtered queryset is serialized into a ``Response``.
    """
    queryset = self.filter_queryset(self.get_queryset())

    page = self.paginate_queryset(queryset)
    if page is None:
        # No page was produced: serialize the complete filtered queryset.
        full_serializer = self.get_serializer(queryset, many=True)
        return Response(full_serializer.data)

    page_serializer = self.get_serializer(page, many=True)
    return self.get_paginated_response(page_serializer.data)
def retrieve(self, request, *args, **kwargs):
    """Return the serialized form of the single object from ``get_object()``."""
    target = self.get_object()
    payload = self.get_serializer(target).data
    return Response(payload)
def update(self, request, *args, **kwargs): partial = kwargs.pop('partial', False) instance = self.get_object() serializer = self.get_serializer(instance, data=request.data, partial=partial) serializer.is_valid(raise_exception=True) self.perform_update(serializer) if getattr(instance, '_prefetched_objects_cache', None): # If 'prefetch_related' has been applied to a queryset, we need to # forcibly invalidate the prefetch cache on the instance. instance._prefetched_objects_cache = {} return Response(serializer.data) def perform_update(self, serializer): serializer.save() # def partial_update(self, request, *args, **kwargs): kwargs['partial'] = True return self.update(request, *args, **kwargs) class
删除
class DestroyModelMixin:
    """Mixin that destroys a model instance."""

    def destroy(self, request, *args, **kwargs):
        """Delete the object from ``get_object()`` and answer with HTTP 204."""
        target = self.get_object()
        self.perform_destroy(target)
        return Response(status=status.HTTP_204_NO_CONTENT)

    def perform_destroy(self, instance):
        """Deletion hook: remove ``instance`` by calling its ``delete()``."""
        instance.delete()
settings 配置
default_version = api_settings.DEFAULT_VERSION #默认版本 allowed_versions = api_settings.ALLOWED_VERSIONS #指定版本 version_param = api_settings.VERSION_PARAM #url版本传输关键字 #指定版本类类 "DEFAULT_VERSIONING_CLASS":"rest_framework.versioning.URLPathVersioning",
dispath 老的request 封装了不少功能 request.version#版本 scheme#版本对象 #执行返回一个版本元组 第一个是版本第二个是版本对象 def determine_version(self, request, *args, **kwargs): """ If versioning is being used, then determine any API version for the incoming request. Returns a two-tuple of (version, versioning_scheme) """ if self.versioning_class is None: return (None, None) scheme = self.versioning_class() return (scheme.determine_version(request, *args, **kwargs), scheme) request.version, request.versioning_scheme = version, scheme
class APIView(View): versioning_class = api_settings.DEFAULT_VERSIONING_CLASS def dispatch(self, request, *args, **kwargs): # ###################### 第一步 ########################### """ request,是django的request,它的内部有:request.GET/request.POST/request.method args,kwargs是在路由中匹配到的参数,如: url(r'^order/(\d+)/(?P<version>\w+)/$', views.OrderView.as_view()), http://www.xxx.com/order/1/v2/ """ self.args = args self.kwargs = kwargs """ request = 生成了一个新的request对象,此对象的内部封装了一些值。 request = Request(request) - 内部封装了 _request = 老的request """ request = self.initialize_request(request, *args, **kwargs) self.request = request self.headers = self.default_response_headers # deprecate? try: # ###################### 第二步 ########################### self.initial(request, *args, **kwargs) 执行视图函数。。 def initial(self, request, *args, **kwargs): # ############### 2.1 处理drf的版本 ############## version, scheme = self.determine_version(request, *args, **kwargs) request.version, request.versioning_scheme = version, scheme ... def determine_version(self, request, *args, **kwargs): if self.versioning_class is None: return (None, None) #是版本类的实例化对象 scheme = self.versioning_class() # obj = XXXXXXXXXXXX() return (scheme.determine_version(request, *args, **kwargs), scheme) class OrderView(APIView): versioning_class = URLPathVersioning def get(self,request,*args,**kwargs): print(request.version) print(request.versioning_scheme) return Response('...') def post(self,request,*args,**kwargs): return Response('post')
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework.request import Request
from rest_framework.versioning import URLPathVersioning


class OrderView(APIView):
    """Demo view that prints the version information attached to the request."""

    # Versioning class used by this view.
    versioning_class = URLPathVersioning

    def get(self, request, *args, **kwargs):
        """Print ``request.version`` and ``request.versioning_scheme``."""
        version = request.version
        print(version)
        scheme = request.versioning_scheme
        print(scheme)
        return Response('...')

    def post(self, request, *args, **kwargs):
        """Answer every POST with a fixed placeholder body."""
        return Response('post')
# DRF versioning settings.
REST_FRAMEWORK = {
    # Restricts the range of accepted versions.
    "ALLOWED_VERSIONS": ["v1", "v2"],
    # Keyword of the named URL group that carries the version.
    "VERSION_PARAM": "version",
}
不用手动一个个加，所有视图都统一配置上
# Two alternative ways to capture the version from the URL path; use one.
# Option 1: accept exactly "v1" or "v2". The previous pattern ``[v1|v2]+``
# was a character class, so it also matched strings such as "v", "2|1" or
# "vvv"; an alternation inside the named group matches only whole versions.
url(r'^(?P<version>v1|v2)/users/$', users_list, name='users-list'),
# Option 2: accept any word characters as the version segment.
url(r'^(?P<version>\w+)/users/$', users_list, name='users-list'),
settings配置
# DRF versioning settings.
REST_FRAMEWORK = {
    # Versioning class to use; it must be written as a full dotted path.
    "DEFAULT_VERSIONING_CLASS": "rest_framework.versioning.URLPathVersioning",
    # Restricts the range of accepted versions.
    "ALLOWED_VERSIONS": ["v1", "v2"],
    # Keyword of the named URL group that carries the version.
    "VERSION_PARAM": "version",
}
自定义认证 token：把数据库的 user 拿出来比较。如果想使用 **request.data，就要把它变为字典，所以前端要发送 JSON 数据类型。源码写法 #配置 authentication_classes = api_settings.DEFAULT_AUTHENTICATION_CLASSES
和登录配合使用 import uuid from django.shortcuts import render from django.views import View from django.views.decorators.csrf import csrf_exempt from django.utils.decorators import method_decorator from rest_framework.versioning import URLPathVersioning from rest_framework.views import APIView from rest_framework.response import Response class Loginview(APIView): versioning_class = None #认证写法 authentication_classes = [Myauthentication, ] def post(self,request,*args,**kwargs): user_object = models.UserInfo.objects.filter(**request.data).first() if not user_object: return Response('登陆失败') random_string = str(uuid.uuid4()) user_object.token = random_string user_object.save() return Response(random_string) class MyAuthentication: def authenticate(self, request): """ Authenticate the request and return a two-tuple of (user, token). """ token = request.query_params.get('token') user_object = models.UserInfo.objects.filter(token=token).first() if user_object: return (user_object,token) return (None,None) class OrderView(APIView): authentication_classes = [MyAuthentication, ] def get(self,request,*args,**kwargs): print(request.user) print(request.auth) return Response('order') class UserView(APIView): authentication_classes = [MyAuthentication,] def get(self,request,*args,**kwargs): print(request.user) print(request.auth) return Response('user') ···············手写认证类···························· class Myauthentication: # 认证 # versioning_class = None #手写认证 def authenticate(self, request): token = request.query_params.get('token') print(token) user_object = models.UserInfo.objects.filter(token=token).first() print(user_object) if user_object: print(1) #必须返回一个元组的形式 #返回对应的user_object和token值 return (user_object, token) else: print(2) return (None, None) ---------------------- 为何要返回元组 ---------------------- #为何要返回元组 def _authenticate(self): """ Attempt to authenticate the request using each authentication instance in turn. 
""" for authenticator in self.authenticators: try: user_auth_tuple = #到这返回为何能够执行对应的方法 authenticator.authenticate(self) except exceptions.APIException: self._not_authenticated() raise if user_auth_tuple is not None: #把查询出来的元组 self._authenticator = authenticator #给user和auth进行赋值 self.user, self.auth = user_auth_tuple#必须返回一个元组的形式 return#结束函数 self._not_authenticated() def _not_authenticated(self): """ Set authenticator, user & authtoken representing an unauthenticated request. Defaults are None, AnonymousUser & None. """ self._authenticator = None if api_settings.UNAUTHENTICATED_USER: #user 没有设置返回none self.user = #匿名用户 api_settings.UNAUTHENTICATED_USER() else: self.user = None if api_settings.UNAUTHENTICATED_TOKEN: self.auth = #对应author没有设置返回none api_settings.UNAUTHENTICATED_TOKEN() else: self.auth = None
DEFAULT_AUTHENTICATION_CLASSES=['写一个认证的类手写的'] class Myauthentication: # 认证 # versioning_class = None #手写认证 def authenticate(self, request): token = request.query_params.get('token') print(token) user_object = models.UserInfo.objects.filter(token=token).first() print(user_object) if user_object: print(1) #必须返回一个元组的形式 #返回对应的user_object和token值 return (user_object, token) else: print(2) return (None, None) ''' # raise Exception(), 不在继续往下执行,直接返回给用户。 # return None ,本次认证完成,执行下一个认证 # return ('x',"x"),认证成功,不须要再继续执行其余认证了,继续日后权限、节流、视图函数 '''
class LoginView(APIView): authentication_classes = [] def post(self,request,*args,**kwargs): user_object = models.UserInfo.objects.filter(**request.data).first() if not user_object: return Response('登陆失败') random_string = str(uuid.uuid4()) user_object.token = random_string user_object.save() return Response(random_string) class OrderView(APIView): # authentication_classes = [TokenAuthentication, ] def get(self,request,*args,**kwargs): print(request.user) print(request.auth) if request.user: return Response('order') return Response('滚') class UserView(APIView): 同上
''' 当请求发过来的实话会先执行 apiview里面的dispath方法 1.执行认证的dispatch的方法 2.dispatch方法会执行initialize_request方法封装旧的request对象 3.在封装的过程当中会执行 get_authenticators(self): 并实例化列表中的每个认证类 若是本身写的类有对应的认证类 会把认证类的实例化对象封装到新的request当中 继续执行initial 会执行认证里面的perform_authentication 执行request.user 会执行循环每个对象执行对应的authenticate方法 会有三种返回值 # raise Exception(), 不在继续往下执行,直接返回给用户。 # return None ,本次认证完成,执行下一个认证 # return ('x',"x"),认证成功,不须要再继续执行其余认证了,继续日后权限、节流、视图函数 里面会执行四件事 版本 认证 权限 节流 ''' def dispatch(self, request, *args, **kwargs): """ `.dispatch()` is pretty much the same as Django's regular dispatch, but with extra hooks for startup, finalize, and exception handling. """ self.args = args self.kwargs = kwargs #封装旧的request request = self.initialize_request(request, *args, **kwargs) self.request = request self.headers = self.default_response_headers # deprecate? 循环自定义的认证 #request封装旧的request执行initialize_request def initialize_request(self, request, *args, **kwargs): """ Returns the initial request object. """ parser_context = self.get_parser_context(request) return Request( request, parsers=self.get_parsers(), authenticators=self.get_authenticators(), negotiator=self.get_content_negotiator(), parser_context=parser_context ) def get_authenticators(self): """ Instantiates and returns the list of authenticators that this view can use. """ return [auth() for auth in self.authentication_classes] def initial(self, request, *args, **kwargs): """ Runs anything that needs to occur prior to calling the method handler. """ self.format_kwarg = self.get_format_suffix(**kwargs) # Perform content negotiation and store the accepted info on the request neg = self.perform_content_negotiation(request) request.accepted_renderer, request.accepted_media_type = neg # Determine the API version, if versioning is in use. 
version, scheme = self.determine_version(request, *args, **kwargs) request.version, request.versioning_scheme = version, scheme # Ensure that the incoming request is permitted self.perform_authentication(request) self.check_permissions(request) self.check_throttles(request) #执行完四件事在执行视图
message=消息 自定义错误消息 可以自定义错误信息 ''' for permission in self.get_permissions(): if not permission.has_permission(request, self): self.permission_denied( request, message=getattr(permission, 'message', None) ) ''' 本质上自定义和 return False 单条验证 验证两次 不写url_kwarg默认封装识别pk关键字
message={"code":"恭喜你报错了"}
from rest_framework.permissions import BasePermission from rest_framework import exceptions class MyPermission(BasePermission): #第一种 message = {'code': 10001, 'error': '你没权限'} def has_permission(self, request, view): #request.user执行认证 if request.user: return True #自定义错误信息 raise exceptions.PermissionDenied({'code': 10001, 'error': '你没权限'}) return False def has_object_permission(self, request, view, obj): """ Return `True` if permission is granted, `False` otherwise. """ return False #对应源码 至关于重写了错误信息 #执行顺序 dispatch---》intital--》check_permissions--》permission_denied def permission_denied(self, request, message=None): """ If request is not permitted, determine what kind of exception to raise. """ if request.authenticators and not request.successful_authenticator: raise exceptions.NotAuthenticated() raise #等于message 或者自定义错误函数 #raise exceptions.PermissionDenied({'code': 10001, 'error': '你没权限'}) exceptions.PermissionDenied(detail=message) ················源码·················· def check_permissions(self, request): for permission in self.get_permissions(): if not permission.has_permission(request, self): self.permission_denied( request, message=getattr(permission, 'message', None) )
APIView --- dispatch --- initial --- 执行权限判断。如果自己写了权限类，会先循环权限类并实例化，存放在列表当中；然后循环列表中的对象，如果权限判断返回不为 True，就会主动抛出异常。可以自定义错误信息。
class APIView(View): permission_classes = api_settings.DEFAULT_PERMISSION_CLASSES def dispatch(self, request, *args, **kwargs): 封装request对象 self.initial(request, *args, **kwargs) 经过反射执行视图中的方法 def initial(self, request, *args, **kwargs): 版本的处理 # 认证 self.perform_authentication(request) # 权限判断 self.check_permissions(request) self.check_throttles(request) #执行认证 def perform_authentication(self, request): request.user def check_permissions(self, request): # [对象,对象,] self.执行 ''' def get_permissions(self): return [permission() for permission in self.permission_classes] ''' for permission in self.get_permissions(): if not permission.has_permission(request, self): self.permission_denied(request, message=getattr(permission, 'message', None)) def permission_denied(self, request, message=None): if request.authenticators and not request.successful_authenticator: raise exceptions.NotAuthenticated() raise exceptions.PermissionDenied(detail=message) # class UserView(APIView): permission_classes = [MyPermission, ] def get(self,request,*args**kwargs): return Response('user')
class MyPermission(BasePermission):
    """Permission that lets a request through only when ``request.user`` is truthy."""

    # Error payload reported when permission is refused.
    message = {'code': 10001, 'error': '你没权限'}

    def has_permission(self, request, view):
        """Return ``True`` for a truthy ``request.user``.

        Raises:
            exceptions.PermissionDenied: carrying ``message`` when
                ``request.user`` is falsy.
        """
        # Reading request.user is what runs the authentication step.
        if request.user:
            return True
        # Raise directly with the custom payload; the former `return False`
        # after this statement was unreachable and has been removed.
        raise exceptions.PermissionDenied(self.message)

    def has_object_permission(self, request, view, obj):
        """Return `True` if permission is granted, `False` otherwise.

        This demo always refuses object-level access.
        """
        return False


class UserView(APIView):
    """Demo view protected by ``MyPermission``."""

    # The permission classes are evaluated first ...
    permission_classes = [MyPermission, ]

    # ... and only afterwards is the handler executed.
    def get(self, request, *args, **kwargs):
        """Answer with a fixed body once the permission check has passed."""
        return Response('user')
def dispatch(self, request, *args, **kwargs): """ `.dispatch()` is pretty much the same as Django's regular dispatch, but with extra hooks for startup, finalize, and exception handling. """ self.args = args self.kwargs = kwargs request = self.initialize_request(request, *args, **kwargs) self.request = request self.headers = self.default_response_headers # deprecate? try: #走这 self.initial(request, *args, **kwargs) `````````````````def initial`````````````````````````` def initial(self, request, *args, **kwargs): """ Runs anything that needs to occur prior to calling the method handler. """ self.format_kwarg = self.get_format_suffix(**kwargs) # Perform content negotiation and store the accepted info on the request neg = self.perform_content_negotiation(request) request.accepted_renderer, request.accepted_media_type = neg # Determine the API version, if versioning is in use. version, scheme = ''' 第三步 ''' self.determine_version(request, *args, **kwargs) request.version, request.versioning_scheme = version, scheme # Ensure that the incoming request is permitted self.perform_authentication(request) self.check_permissions(request) self.check_throttles(request) --------------------------------------- # 第二步 def check_permissions(self, request): # [对象,对象,] self.执行 ''' def get_permissions(self): return [permission() for permission in self.permission_classes] ''' for permission in self.get_permissions(): if not permission.has_permission(request, self): self.permission_denied(request, message=getattr(permission, 'message', None)) def permission_denied(self, request, message=None): if request.authenticators and not request.successful_authenticator: raise exceptions.NotAuthenticated() raise exceptions.PermissionDenied(detail=message)
def determine_version(self, request, *args, **kwargs):
    """Resolve the API version of ``request``.

    Returns a ``(version, scheme)`` tuple; both entries are None when the
    view has no ``versioning_class`` configured.
    """
    versioning_cls = self.versioning_class
    if versioning_cls is None:
        return (None, None)

    scheme = versioning_cls()
    version = scheme.determine_version(request, *args, **kwargs)
    return (version, scheme)
单条数据
def has_object_permission(self, request, view, obj): """ Return `True` if permission is granted, `False` otherwise. """ return False #在执行RetrieveAPIView 会执行单挑数据的验证 def get_object(self): queryset = self.filter_queryset(self.get_queryset()) # Perform the lookup filtering. #不写url_kwarg默认封装识别pk关键字 lookup_url_kwarg = self.lookup_url_kwarg or self.lookup_field assert lookup_url_kwarg in self.kwargs, ( 'Expected view %s to be called with a URL keyword argument ' 'named "%s". Fix your URL conf, or set the `.lookup_field` ' 'attribute on the view correctly.' % (self.__class__.__name__, lookup_url_kwarg) ) filter_kwargs = {self.lookup_field: self.kwargs[lookup_url_kwarg]} obj = get_object_or_404(queryset, **filter_kwargs) # May raise a permission denied #检验单挑数据的权限 self.check_object_permissions(self.request, obj) return obj ···················check_object_permissions·········· def check_object_permissions(self, request, obj): """ Check if the request should be permitted for a given object. Raises an appropriate exception if the request is not permitted. """ for permission in self.get_permissions(): if not permission.has_object_permission(request, self, obj): self.permission_denied( request, message=getattr(permission, 'message', None) ) 执行has_object_permission def has_object_permission(self, request, view, obj): return True
1.首先全部方法都执行父类的apiview方法 2.执行dispatch方法 3.进行 def get_authenticators(self): return [auth() for auth in self.authentication_classes] 4. initial request.user
from functools import wraps

from django.views.decorators.csrf import csrf_exempt
from django.utils.decorators import method_decorator


# Source of the decorator imported above, reproduced for study.
def csrf_exempt(view_func):
    """Mark a view function as being exempt from the CSRF view protection.

    Args:
        view_func: the view callable to wrap.

    Returns:
        A new callable that forwards to ``view_func`` and carries
        ``csrf_exempt = True``.
    """
    # We could just do view_func.csrf_exempt = True, but decorators
    # are nicer if they don't have side-effects, so we return a new
    # function.
    def wrapped_view(*args, **kwargs):
        return view_func(*args, **kwargs)
    wrapped_view.csrf_exempt = True
    # functools.wraps copies the standard metadata (__name__, __doc__, ...);
    # the old ``available_attrs`` helper is not defined here.
    return wraps(view_func)(wrapped_view)
- 匿名用户,用IP做为用户惟一标记,但若是用户换代理IP,没法作到真正的限制。 - 登陆用户,用用户名或用户ID作标识。能够作到真正的限制
当前这个字典存放在django的缓存中 { throttle_anon_1.1.1.1:[100121340,], 1.1.1.2:[100121251,100120450,] } 限制:60s能访问3次 来访问时: 1.获取当前时间 100121280 2.100121280-60 = 100121220,小于100121220全部记录删除所有剔除 3.判断1分钟之内已经访问多少次了? 4 判断访问次数 4.没法访问 停一会 来访问时: 1.获取当前时间 100121340 2.100121340-60 = 100121280,小于100121280全部记录删除 3.判断1分钟之内已经访问多少次了? 0 4.能够访问
在视图类中配置 throttle_classes= [] 这是一个列表 实现 allow_request 方法 wait 是一个提示方法 返回 True/False。 True 能够继续访问, False 表示限制 全局配置 DEFAULT_THROTTLE_CLASSE,节流限制类 DEFAULT_THROTTLE_RATES 表示频率限制,好比 10/m 表示 每分钟 10 次 源码在 initial 中实现 check_throttles 方法 在进行节流以前已经作了版本认证权限 #dispatch分发 def initial(self, request, *args, **kwargs): #版本 version, scheme = self.determine_version(request, *args, **kwargs) request.version, request.versioning_scheme = version, scheme # Ensure that the incoming request is permitted #认证 self.perform_authentication(request) #权限 self.check_permissions(request) #频率限制 self.check_throttles(request) 1.先找allow_request本类没有去父类找 同dispatch 2.执行对应的rate获取对应scope 节流的频率 2.key是获取ip值 3.history是获取ip地址 获取对应的访问时间记录 ''' self.history = self.cache.get(self.key, []) ''' 4.对应的访问记录等于ture获取到值, 把当前的时间减去一个本身设定的时间戳 ''' while self.history and self.history[-1] <= self.now - self.duration: #条件知足 把最后一个值删除 #再去循环判断 self.history.pop() ''' 5.而后判断访问次数有没有大于本身设定的值 ''' if len(self.history) >= self.num_requests: 知足条件走这 return self.throttle_failure() 不知足条件走这 return self.throttle_success() ''' 6.成功 def throttle_success(self): """ Inserts the current request's timestamp along with the key into the cache. """ self.history.insert(0, self.now) self.cache.set(self.key, self.history, self.duration) return True 6.失败 def throttle_failure(self): """ Called when a request to the API has failed due to throttling. """ return False#不能够访问 7若是返回false进入等待时间 def wait(self): """ Returns the recommended next request time in seconds. """ if self.history: remaining_duration = self.duration - (self.now - self.history[-1]) else: remaining_duration = self.duration available_requests = self.num_requests - len(self.history) + 1 if available_requests <= 0: return None return remaining_duration / float(available_requests)
#dispatch分发 def initial(self, request, *args, **kwargs): #版本 version, scheme = self.determine_version(request, *args, **kwargs) request.version, request.versioning_scheme = version, scheme # Ensure that the incoming request is permitted #认证 self.perform_authentication(request) #权限 self.check_permissions(request) #频率限制 self.check_throttles(request) def check_throttles(self, request): throttle_durations = [] for throttle in self.get_throttles(): #找到当前类的allow_request if not throttle.allow_request(request, self): throttle_durations.append(throttle.wait()) if throttle_durations: durations = [ duration for duration in throttle_durations if duration is not None ] duration = max(durations, default=None) self.throttled(request, duration) #执行类的allow_request def allow_request(self, request, view): """ Implement the check to see if the request should be throttled. On success calls `throttle_success`. On failure calls `throttle_failure`. """ if self.rate is None: return True 获取 ''' def __init__(self): if not getattr(self, 'rate', None): self.rate = self.get_rate() self.num_requests, self.duration = self.parse_rate(self.rate) 当前这个类有一个获取rate的方法 ''' #执行get_rate 读取settings设置的配置文件 ''' def get_rate(self): #若是不设置会进行报错 try: #设置了就进行键取值 return self.THROTTLE_RATES[self.scope] except KeyError: msg = "No default throttle rate set for '%s' scope" % self.scope raise ImproperlyConfigured(msg) ''' #经过键去取值/进行分割获取值 ''' def parse_rate(self, rate): if rate is None: return (None, None) num, period = rate.split('/') num_requests = int(num) duration = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400}[period[0]] return (num_requests, duration) ''' #这步继续往下执行继承类的get_cache_key self.key = self.get_cache_key(request, view) #匿名 #继承了class AnonRateThrottle(SimpleRateThrottle): ''' scope = 'anon' def get_cache_key(self, request, view): if request.user.is_authenticated: return None # Only throttle unauthenticated requests. 
return self.cache_format % { 'scope': self.scope, 'ident': self.get_ident(request) } } #返回字符串格式化 throttle_(匿名)anon_(ip地址的拼接)1.1.1.1:[100121340,], ''' #根据用户进行判断 重写get_cache_key ''' ''' if self.key is None: return True self.history = self.cache.get(self.key, []) self.now = self.timer() # Drop any requests from the history which have now passed the # throttle duration while self.history and self.history[-1] <= self.now - self.duration: self.history.pop() #元组返回的值num_requests if len(self.history) >= self.num_requests: #不能够访问 return self.throttle_failure() #能够访问 return self.throttle_success() ''' 6.成功 def throttle_success(self): #成功把访问的当前时间插入history self.history.insert(0, self.now) self.cache.set(self.key, self.history, self.duration) return True 6.失败 def throttle_failure(self): """ Called when a request to the API has failed due to throttling. """ return False#不能够访问 '''
REST_FRAMEWORK = { throttle_classes = [AnonRateThrottle,] "DEFAULT_THROTTLE_RATES":{"anon":"3/m"} #这样写的缘由 源码 #经过匿名 scope = 'anon' def get_cache_key(self, request, view): if request.user.is_authenticated: return None # Only throttle unauthenticated requests. return self.cache_format % { 'scope': self.scope, 'ident': self.get_ident(request) } } #获取全局设置的步骤 def __init__(self): if not getattr(self, 'rate', None): #第一步 self.rate = self.get_rate() ''' 以设置的键进行取值获取时间 def get_rate(self): 若是不设置就进行报错 try: return self.THROTTLE_RATES[self.scope] except KeyError: msg = "No default throttle rate set for '%s' scope" % self.scope raise ImproperlyConfigured(msg) ''' self.num_requests, self.duration = self.parse_rate(self.rate) #后面设置值格式 #例 "DEFAULT_THROTTLE_RATES":{"anon":"3/m"} 以/分割 前面是限制的次数 后面的是访问限制的时间 ''' def parse_rate(self, rate): if rate is None: return (None, None) num, period = rate.split('/') num_requests = int(num) duration = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400}[period[0]] return (num_requests, duration) ''' 以后执行对应字典的取值 key=throttle_(匿名)anon_(ip地址的拼接)1.1.1.1: [100121340,],#值 self.key = self.get_cache_key(request, view) if self.key is None: return True self.history = self.cache.get(self.key, []) self.now = self.timer() # Drop any requests from the history which have now passed the # throttle duration while self.history and self.history[-1] <= self.now - self.duration: self.history.pop() if len(self.history) >= self.num_requests: return self.throttle_failure() return self.throttle_success()
REST_FRAMEWORK = { 'DEFAULT_THROTTLE_CLASSES': [ 'api.utils.throttles.throttles.LuffyAnonRateThrottle', 'api.utils.throttles.throttles.LuffyUserRateThrottle', ], 'DEFAULT_THROTTLE_RATES': { #不重写的默认走这 'anon': '10/day', 'user': '10/day', 'luffy_anon': '10/m', 'luffy_user': '20/m', }, } settings
from django.conf.urls import url, include from web.views.s3_throttling import TestView urlpatterns = [ url(r'^test/', TestView.as_view()), ] urls.py
REST_FRAMEWORK = { 'UNAUTHENTICATED_USER': None, 'UNAUTHENTICATED_TOKEN': None, 'DEFAULT_THROTTLE_RATES': { 'luffy_anon': '10/m', 'luffy_user': '20/m', }, } settings.py
根据匿名ip或者user 进行判断
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework.throttling import SimpleRateThrottle


class LuffyAnonRateThrottle(SimpleRateThrottle):
    """
    Anonymous users: throttled by client IP.
    """
    scope = "luffy_anon"

    def get_cache_key(self, request, view):
        """Return the cache key for an anonymous request, or None to skip."""
        # A logged-in user is not subject to the anonymous rate limit.
        # NOTE(review): relies on anonymous requests having a falsy
        # request.user (e.g. 'UNAUTHENTICATED_USER': None) - confirm in settings.
        if request.user:
            return None
        return self.cache_format % {
            'scope': self.scope,
            'ident': self.get_ident(request)
        }


class LuffyUserRateThrottle(SimpleRateThrottle):
    """
    Logged-in users: throttled per user.
    """
    # Override scope so the 'luffy_user' rate is looked up.
    scope = "luffy_user"

    def get_ident(self, request):
        """
        Return a value identifying the current user.

        After successful authentication request.user is the user object and
        request.auth is the token object.

        A constant here would put every logged-in user into one shared
        bucket, so the key is derived from the user: its ``pk`` when it has
        one, otherwise the ``request.user`` value itself.
        :param request:
        :return:
        """
        # Alternative: return request.auth.token
        user = request.user
        return getattr(user, 'pk', user)

    def get_cache_key(self, request, view):
        """
        Build the cache key.
        :param request:
        :param view:
        :return: the key, or None when the request is not logged in
        """
        # Requests without a user skip the per-user limit.
        if not request.user:
            return None
        return self.cache_format % {
            'scope': self.scope,
            'ident': self.get_ident(request)
        }


class TestView(APIView):
    throttle_classes = [LuffyUserRateThrottle, LuffyAnonRateThrottle, ]

    def get(self, request, *args, **kwargs):
        # self.dispatch
        print(request.user)
        print(request.auth)
        return Response('GET请求,响应内容')

    def post(self, request, *args, **kwargs):
        return Response('POST请求,响应内容')

    def put(self, request, *args, **kwargs):
        return Response('PUT请求,响应内容')


# ---------------------------------------------------------------
# DRF source, reproduced for reference
class AnonRateThrottle(SimpleRateThrottle):
    scope = 'anon'

    def get_cache_key(self, request, view):
        if request.user.is_authenticated:
            return None  # Only throttle unauthenticated requests.
        return self.cache_format % {
            'scope': self.scope,
            'ident': self.get_ident(request)
        }


#
def get_cache_key(self, request, view):
    return self.get_ident(request)  # get the corresponding IP value
# settings.py 'DEFAULT_THROTTLE_RATES': { 'Vistor': '3/m', 'User': '10/m' },
from django.views.decorators.csrf import csrf_exempt from django.shortcuts import HttpResponse @csrf_exempt def index(request): return HttpResponse('...') # index = csrf_exempt(index) urlpatterns = [ url(r'^index/$',index), ]
urlpatterns = [ url(r'^login/$',account.LoginView.as_view()), ] class APIView(View): @classmethod def as_view(cls, **initkwargs): view = super().as_view(**initkwargs) view.cls = cls view.initkwargs = initkwargs # Note: session based authentication is explicitly CSRF validated, # all other authentication is CSRF exempt. return csrf_exempt(view)
第一种:原始APIView
url(r'^login/$',account.LoginView.as_view()),
from rest_framework.views import APIView from rest_framework.response import Response from rest_framework_jwt.settings import api_settings from rest_framework.throttling import AnonRateThrottle from api import models class LoginView(APIView): authentication_classes = [] def post(self,request,*args,**kwargs): # 1.根据用户名和密码检测用户是否能够登陆 user = models.UserInfo.objects.filter(username=request.data.get('username'),password=request.data.get('password')).first() if not user: return Response({'code':10001,'error':'用户名或密码错误'}) # 2. 根据user对象生成payload(中间值的数据) jwt_payload_handler = api_settings.JWT_PAYLOAD_HANDLER payload = jwt_payload_handler(user) # 3. 构造前面数据,base64加密;中间数据base64加密;前两段拼接而后作hs256加密(加盐),再作base64加密。生成token jwt_encode_handler = api_settings.JWT_ENCODE_HANDLER token = jwt_encode_handler(payload) return Response({'code': 10000, 'data': token})
第二种:ListApiView等
url(r'^article/$',article.ArticleView.as_view()), url(r'^article/(?P<pk>\d+)/$',article.ArticleDetailView.as_view()),
from rest_framework.throttling import AnonRateThrottle from rest_framework.response import Response from rest_framework.generics import ListAPIView,RetrieveAPIView from api import models from api.serializer.article import ArticleSerializer,ArticleDetailSerializer class ArticleView(ListAPIView): authentication_classes = [] # throttle_classes = [AnonRateThrottle,] queryset = models.Article.objects.all() serializer_class = ArticleSerializer class ArticleDetailView(RetrieveAPIView): authentication_classes = [] queryset = models.Article.objects.all() serializer_class = ArticleDetailSerializer
第三种:
url(r'^article/$',article.ArticleView.as_view({"get":'list','post':'create'})), url(r'^article/(?P<pk>\d+)/$',article.ArticleView.as_view({'get':'retrieve','put':'update','patch':'partial_update','delete':'destroy'}))
from rest_framework.viewsets import GenericViewSet from rest_framework.mixins import ListModelMixin,RetrieveModelMixin,CreateModelMixin,UpdateModelMixin,DestroyModelMixin from api.serializer.article import ArticleSerializer,ArticleDetailSerializer class ArticleView(GenericViewSet,ListModelMixin,RetrieveModelMixin,CreateModelMixin,UpdateModelMixin,DestroyModelMixin): authentication_classes = [] throttle_classes = [AnonRateThrottle,] queryset = models.Article.objects.all() serializer_class = None def get_serializer_class(self): pk = self.kwargs.get('pk') if pk: return ArticleDetailSerializer return ArticleSerializer
装饰器
def outer(func):
    """Decorator that forwards every call to ``func`` unchanged."""
    def inner(*args, **kwargs):
        return func(*args, **kwargs)
    return inner


@outer
def index(a1):
    pass


# ``index`` now refers to ``inner``, which forwards its arguments to the
# original index(a1) - so the argument must still be supplied.
index(1)
def outer(func):
    """Decorator that forwards every call to ``func`` unchanged."""
    def inner(*args, **kwargs):
        return func(*args, **kwargs)
    return inner


def index(a1):
    pass


# Manual equivalent of writing ``@outer`` above the definition.
index = outer(index)
# ``inner`` forwards to the original index(a1), so the argument is required.
index(1)
django中能够免除csrftoken认证
from django.views.decorators.csrf import csrf_exempt from django.shortcuts import HttpResponse @csrf_exempt def index(request): return HttpResponse('...') # index = csrf_exempt(index) urlpatterns = [ url(r'^index/$',index), ]
urlpatterns = [ url(r'^login/$',account.LoginView.as_view()), ] class APIView(View): @classmethod def as_view(cls, **initkwargs): view = super().as_view(**initkwargs) view.cls = cls view.initkwargs = initkwargs # Note: session based authentication is explicitly CSRF validated, # all other authentication is CSRF exempt. return csrf_exempt(view)
面向对象中基于继承+异常处理来作的约束
class BaseVersioning:
    """Base class: subclasses are forced to provide determine_version()."""

    def determine_version(self, request, *args, **kwargs):
        raise NotImplementedError("must be implemented")


class URLPathVersioning(BaseVersioning):
    """Reads the version from the URL keyword arguments."""

    def determine_version(self, request, *args, **kwargs):
        """Return the requested version, falling back to ``default_version``.

        Raises ``exceptions.NotFound`` when the version is not allowed.
        """
        requested = kwargs.get(self.version_param, self.default_version)
        version = self.default_version if requested is None else requested
        if self.is_allowed_version(version):
            return version
        raise exceptions.NotFound(self.invalid_version_message)
面向对象封装
class Foo(object):
    """Encapsulation demo: bundles a name and an age on one object."""

    def __init__(self, name, age):
        self.name, self.age = name, age


obj = Foo(name='汪洋', age=18)
class APIView(View): def dispatch(self, request, *args, **kwargs): self.args = args self.kwargs = kwargs request = self.initialize_request(request, *args, **kwargs) self.request = request ... def initialize_request(self, request, *args, **kwargs): """ Returns the initial request object. """ parser_context = self.get_parser_context(request) return Request( request, parsers=self.get_parsers(), authenticators=self.get_authenticators(), # [MyAuthentication(),] negotiator=self.get_content_negotiator(), parser_context=parser_context )
面向对象继承
class View(object):
    pass


class APIView(View):
    def dispatch(self):
        # Looked up on the instance, so the MRO decides which get() runs.
        getattr(self, 'get')()


class GenericAPIView(APIView):
    serilizer_class = None

    def get_seriliser_class(self):
        return self.serilizer_class


class ListModelMixin(object):
    def get(self):
        print(self.get_seriliser_class())


class ListAPIView(ListModelMixin, GenericAPIView):
    pass


class UserInfoView(ListAPIView):
    # Nothing overridden: the class attribute from GenericAPIView is used.
    pass


view = UserInfoView()
view.dispatch()
class View(object):
    pass


class APIView(View):
    def dispatch(self):
        # Looked up on the instance, so the MRO decides which get() runs.
        getattr(self, 'get')()


class GenericAPIView(APIView):
    serilizer_class = None

    def get_seriliser_class(self):
        return self.serilizer_class


class ListModelMixin(object):
    def get(self):
        print(self.get_seriliser_class())


class ListAPIView(ListModelMixin, GenericAPIView):
    pass


class UserInfoView(ListAPIView):
    # Overriding the class attribute changes what the inherited getter returns.
    serilizer_class = "汪洋"


view = UserInfoView()
view.dispatch()
class View(object):
    pass


class APIView(View):
    def dispatch(self):
        # Looked up on the instance, so the MRO decides which get() runs.
        getattr(self, 'get')()


class GenericAPIView(APIView):
    serilizer_class = None

    def get_seriliser_class(self):
        return self.serilizer_class


class ListModelMixin(object):
    def get(self):
        print(self.get_seriliser_class())


class ListAPIView(ListModelMixin, GenericAPIView):
    pass


class UserInfoView(ListAPIView):
    # Overriding the getter itself takes precedence over the base class one.
    def get_seriliser_class(self):
        return "咩咩"


view = UserInfoView()
view.dispatch()
反射
class View(object):
    def dispatch(self, request, *args, **kwargs):
        """Route the request to the handler named after its HTTP method.

        Falls back to ``http_method_not_allowed`` when the method is not in
        ``http_method_names`` or no handler with that name exists.
        """
        verb = request.method.lower()
        # Methods outside the approved list go straight to the error handler.
        if verb not in self.http_method_names:
            return self.http_method_not_allowed(request, *args, **kwargs)
        # Reflection: a missing handler also defers to the error handler.
        handler = getattr(self, verb, self.http_method_not_allowed)
        return handler(request, *args, **kwargs)
发送ajax请求
$.ajax({ url:'地址', type:'GET', data:{...}, success:function(arg){ console.log(arg); } })
浏览器具备 "同源策略的限制",致使 发送ajax请求
+ 跨域
存在没法获取数据。
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Title</title> </head> <body> <h1>常鑫的网站</h1> <p> <input type="button" value="点我" onclick="sendMsg()"> </p> <p> <input type="button" value="点他" onclick="sendRemoteMsg()"> </p> <script src="https://cdn.bootcss.com/jquery/3.4.1/jquery.min.js"></script> <script> function sendMsg() { $.ajax({ url:'/msg/', type:'GET', success:function (arg) { console.log(arg); } }) } function sendRemoteMsg() { $.ajax({ url:'http://127.0.0.1:8002/json/', type:'GET', success:function (arg) { console.log(arg); } }) } </script> </body> </html>
如何解决ajax+跨域?
CORS,跨域资源共享,本质:设置响应头。
常见的Http请求方法
get post put patch delete options
http请求中Content-type请求头
状况一: content-type:application/x-www-form-urlencoded name=alex&age=19&xx=10 request.POST和request.body中均有值。 状况二: content-type:application/json {"name":"ALex","Age":19} request.POST没值 request.body有值。
django中F查询
django中获取空Queryset
# The default manager is ``objects`` (not ``object``); none() gives an empty QuerySet.
models.User.objects.all().none()
基于django的fbv和cbv都能实现遵循restful规范的接口
# FBV: one function branches on the HTTP method.
def user(request):
    if request.method == 'GET':
        pass


# CBV: one method per HTTP verb.
class UserView(View):
    def get(self, request, *args, **kwargs):
        pass

    def post(self, request, *args, **kwargs):
        pass
基于django rest framework框架实现restful api的开发。
- 免除csrf认证 - 视图(APIView、ListAPIView、ListModelMixin) - 版本 - 认证 - 权限 - 节流 - 解析器 - 筛选器 - 分页 - 序列化 - 渲染器
简述drf中认证流程?
1.用户发来请求优先执行dispatch方法 2.内部会封装request
简述drf中节流的实现原理以及过程?匿名用户/非匿名用户 如何实现频率限制?
GenericAPIView视图类的做用?
他提供了一些规则,例如: class GenericAPIView(APIView): serializer_class = None queryset = None lookup_field = 'pk' filter_backends = api_settings.DEFAULT_FILTER_BACKENDS pagination_class = api_settings.DEFAULT_PAGINATION_CLASS def get_queryset(self): return self.queryset def get_serializer_class(self): return self.serializer_class def filter_queryset(self, queryset): for backend in list(self.filter_backends): queryset = backend().filter_queryset(self.request, queryset, self) return queryset @property def paginator(self): if not hasattr(self, '_paginator'): if self.pagination_class is None: self._paginator = None else: self._paginator = self.pagination_class() return self._paginator 他至关于提供了一些规则,建议子类中使用固定的方式获取数据,例如: class ArticleView(GenericAPIView): queryset = models.User.objects.all() def get(self,request,*args,**kwargs): query = self.get_queryset() 咱们能够本身继承GenericAPIView来实现具体操做,可是通常不会,由于更加麻烦。 而GenericAPIView主要是提供给drf内部的 ListAPIView、Create.... class ListModelMixin: def list(self, request, *args, **kwargs): queryset = self.filter_queryset(self.get_queryset()) page = self.paginate_queryset(queryset) if page is not None: serializer = self.get_serializer(page, many=True) return self.get_paginated_response(serializer.data) serializer = self.get_serializer(queryset, many=True) return Response(serializer.data) class ListAPIView(mixins.ListModelMixin,GenericAPIView): def get(self, request, *args, **kwargs): return self.list(request, *args, **kwargs) class MyView(ListAPIView): queryset = xxxx ser...
总结:GenericAPIView主要为drf内部帮助咱们提供增删改查的类ListAPIView、CreateAPIView、UpdateAPIView等提供了执行流程和功能,咱们在使用drf内置类作CRUD时,就能够经过自定义 静态字段(类变量)或重写方法(get_queryset、get_serializer_class)来进行更高级的定制。
jwt以及其优点。
jwt前后端分离 用于用户认证 jwt的实现原理: -用户登录成功,会给前端返回一个token值。 此token值只在前端保存 token值分为三段 第一段类型和算法信息 第二段用户信息和超时时间 第三段前两段数据拼接以后进行HS256再次加密+base64url
序列化时many=True和many=False的区别?
应用DRF中的功能进行项目开发
***** 解析器:request.query_params/request.data 视图 序列化 渲染器:Response **** request对象封装 版本处理 分页处理 *** 认证 权限 节流
因为浏览器具备同源策略的限制 对ajax请求的限制 javascript的src对应src不进行限制 同源同端口 不一样源就是跨域 写了/api 不跨域 api访问django api.xx.com跨域了访问django
发一次请求
设置响应头就能够解决 from django.shortcuts import render,HttpResponse def json(request): response = HttpResponse("JSONasdfasdf") response['Access-Control-Allow-Origin'] = "*" return response
预检option
请求
写法
from django.shortcuts import HttpResponse
from django.views.decorators.csrf import csrf_exempt
from django.utils.decorators import method_decorator


@csrf_exempt
def put_json(request):
    """Answer a cross-origin PUT, including its preflight OPTIONS request.

    Every response carries ``Access-Control-Allow-Origin``: the browser
    checks that header on the actual PUT response as well as on the
    preflight, so setting it only for OPTIONS leaves the PUT blocked.
    Any other method gets a 405 instead of an implicit ``None``.
    """
    if request.method in ('OPTIONS', 'PUT'):
        response = HttpResponse("JSON复杂请求")
    else:
        response = HttpResponse(status=405)
        response['Allow'] = "PUT, OPTIONS"

    response['Access-Control-Allow-Origin'] = "*"
    if request.method == 'OPTIONS':
        # Preflight handling: announce which method is permitted.
        response['Access-Control-Allow-Methods'] = "PUT"
    return response
本质在数据返回值设置响应头 from django.shortcuts import render,HttpResponse def json(request): response = HttpResponse("JSONasdfasdf") response['Access-Control-Allow-Origin'] = "*" return response
<!DOCTYPE html> <html> <head lang="en"> <meta charset="UTF-8"> <title></title> </head> <body> <p> <input type="button" onclick="Jsonp1();" value='提交'/> </p> <p> <input type="button" onclick="Jsonp2();" value='提交'/> </p> <script type="text/javascript" src="jquery-1.12.4.js"></script> <script> function Jsonp1(){ var tag = document.createElement('script'); tag.src = "http://c2.com:8000/test/"; document.head.appendChild(tag); document.head.removeChild(tag); } function Jsonp2(){ $.ajax({ url: "http://c2.com:8000/test/", type: 'GET', dataType: 'JSONP', success: function(data, statusText, xmlHttpRequest){ console.log(data); } }) } </script> </body> </html>
条件: 一、请求方式:HEAD、GET、POST 二、请求头信息: Accept Accept-Language Content-Language Last-Event-ID Content-Type 对应的值是如下三个中的任意一个 application/x-www-form-urlencoded multipart/form-data text/plain 注意:同时知足以上两个条件时,则是简单请求,不然为复杂请求
部署 collectstatic 收集静态文件
用于在先后端分离时,实现用户登陆相关。
用户登陆成功以后,生成一个随机字符串,给前端。 - 生成随机字符串 加密信息 #{typ:"jwt","alg":'HS256'} # 加密手段segments.append(base64url_encode(json_header)) 98qow39df0lj980945lkdjflo. #第二部分的信息 {id:1,username:'alx','exp':10} #加密手段segments.append(base64url_encode(payload)) saueoja8979284sdfsdf. #两个密文拼接加盐 asiuokjd978928375 - 类型信息经过base64加密 - 数据经过base64加密 - 两个密文拼接再hs256加密+加盐 - 给前端返回token值只在前端 token是由.分割的三段组成 - 第一段:类型和算法信息 - 第二段:用户的信息和超时时间 - 第三段:hs256(前两段拼接)加密 + base64url - 之后前端再次发来信息时 - 超时验证 - token合法性校验 前端获取随机字符串以后,保留起来。 之后再来发送请求时,携带98qow39df0lj980945lkdjflo.saueoja8979284sdfsdf.asiuokjd978928375。 后端接收到以后 1.取出第二部分进行时间的判断 2. 把前面两个进行加密对第三个值进行加密 - token只在前端保存,后端只负责校验。 - 内部集成了超时时间,后端能够根据时间进行校验是否超时。 - 因为内部存在HS256加密,因此用户不能够修改token,只要一修改就认证失败。 通常在前后端分离时,用于作用户认证(登陆)使用的技术。 jwt的实现原理: - 用户登陆成功以后,会给前端返回一段token。 - token是由.分割的三段组成。 - 第一段:类型和算法信息 - 第二段:用户信息+超时时间 - 第三段:hs256(前两段拼接)加密 + base64url - 之后前端再次发来信息时 - 超时验证 - token合法性校验 优点: - token只在前端保存,后端只负责校验。 - 内部集成了超时时间,后端能够根据时间进行校验是否超时。 - 因为内部存在HS256加密,因此用户不能够修改token,只要一修改就认证失败。
用户登录成功以后,生成一个随机字符串,本身保留一份,给前端返回一份。 之后前端再来发请求时,须要携带字符串 后端对字符串进行校验
pip3 install djangorestframework-jwt
INSTALLED_APPS = [ 'django.contrib.admin', 'django.contrib.auth', 'django.contrib.contenttypes', 'django.contrib.sessions', 'django.contrib.messages', 'django.contrib.staticfiles', 'api.apps.ApiConfig', 'rest_framework', 'rest_framework_jwt']
from rest_framework_jwt.views import obtain_jwt_token
用户信息加密 jwt_payload_hander = api_settings.JWT_PAYLOAD_HANDLER 第三段信息的加密 jwt_encode_handler = api_settings.JWT_ENCODE_HANDLER #解密 jwt_decode_handler = api_settings.JWT_DECODE_HANDLER def jwt_payload_handler(user): username_field = get_username_field() username = get_username(user) warnings.warn( 'The following fields will be removed in the future: ' '`email` and `user_id`. ', DeprecationWarning ) payload = { 'user_id': user.pk, 'username': username, 'exp': datetime.utcnow() + #默认5分钟 api_settings.JWT_EXPIRATION_DELTA #若是有email会把email配置上 if hasattr(user, 'email'): payload['email'] = user.email if isinstance(user.pk, uuid.UUID): #若是有user_id会把pk值配置上 payload['user_id'] = str(user.pk) payload[username_field] = username #源码 settings= 'JWT_EXPIRATION_DELTA':datetime.timedelta(seconds=300) , }
api_settings.JWT_ENCODE_HANDLER --> 加密的具体实现 def jwt_encode_handler(payload): #payload传入 key = api_settings.JWT_PRIVATE_KEY or jwt_get_secret_key(payload) #加盐 #SECRET_KEY = '+gr4bbq8e$yqbd%n_h)2(osz=bmk1x2+o6+w5g@a4r1#3%q1n*' return jwt.encode( payload,#类型信息头信息 key,#加盐 #默认封装传入了hs256 api_settings.JWT_ALGORITHM # 'JWT_ALGORITHM': 'HS256', ).decode('utf-8') encode默认继承父类的 父类的encode方法 # Header header = {'typ': self.header_typ, 'alg': algorithm} #self.header_typ #-- 》 header_typ = 'JWT' signing_input = b'.'.join(segments)#把类型信息和数据用.的形式拼接到了一块儿 try: alg_obj = self._algorithms[algorithm]#执行算法 key = alg_obj.prepare_key(key) signature = alg_obj.sign(signing_input, key)#把拼接起来的值进行二次加密 成为第三个信息 ''' except KeyError: if not has_crypto and algorithm in requires_cryptography: raise NotImplementedError( "Algorithm '%s' could not be found. Do you have cryptography " "installed?" % algorithm ) else: raise NotImplementedError('Algorithm not supported') ''' segments.append(base64url_encode(signature))#再把第三个信息放入列表 对第三个信息进行base64进行加密 return b'.'.join(segments)#用.的形式再把第三个数据拼接起来 进行返回 98qow39df0lj980945lkdjflo.saueoja8979284sdfsdf.asiuokjd978928375
将token分割成 header_segment、payload_segment、crypto_segment 三部分 对第一部分header_segment进行base64url解密,获得header 对第二部分payload_segment进行base64url解密,获得payload 对第三部分crypto_segment进行base64url解密,获得signature 对第三部分signature部分数据进行合法性校验 拼接前两段密文,即:signing_input 从第一段明文中获取加密算法,默认:HS256 使用 算法+盐 对signing_input 进行加密,将获得的结果和signature密文进行比较。
通常在前后端分离时,用于作用户认证(登陆)使用的技术。 jwt的实现原理: - 用户登陆成功以后,会给前端返回一段token。 - token是由.分割的三段组成。 - 第一段:类型和算法信息 - 第二段:用户信息+超时时间 - 第三段:hs256(前两段拼接)加密 + base64url - 之后前端再次发来信息时 - 超时验证 - token合法性校验 #优点: - token只在前端保存,后端只负责校验。 - 内部集成了超时时间,后端能够根据时间进行校验是否超时。 - 因为内部存在HS256加密,因此用户不能够修改token,只要一修改就认证失败。
import uuid

from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework.versioning import URLPathVersioning
from rest_framework import status
# Imported at module level so both LoginView variants below can use it.
from rest_framework_jwt.settings import api_settings

from api import models


class LoginView(APIView):
    """
    Login endpoint (JWT based).
    """

    def post(self, request, *args, **kwargs):
        """Check the submitted credentials and return a JWT on success."""
        # Builds the payload (middle segment) from the user object.
        jwt_payload_handler = api_settings.JWT_PAYLOAD_HANDLER
        # Encodes and signs the payload into the final token.
        jwt_encode_handler = api_settings.JWT_ENCODE_HANDLER

        # 1. Look the user up in the database. Only the two expected fields
        #    are used: expanding request.data into filter() would let the
        #    client pick the ORM lookups (e.g. send a username alone).
        # NOTE(review): the password is compared as stored - confirm whether
        # UserInfo keeps hashed passwords.
        user = models.UserInfo.objects.filter(
            username=request.data.get('username'),
            password=request.data.get('password'),
        ).first()
        if not user:
            return Response({'code':1000,'error':'用户名或密码错误'})

        # Payload fields: 'user_id' (user.pk), 'username',
        # 'exp' (datetime.utcnow() + api_settings.JWT_EXPIRATION_DELTA)
        payload = jwt_payload_handler(user)
        # Encode the three segments into the token.
        token = jwt_encode_handler(payload)
        return Response({'code':1001,'data':token})


# Second variant
class LoginView(APIView):
    authentication_classes = []

    def post(self, request, *args, **kwargs):
        # 1. Check by username and password whether the user may log in.
        user = models.UserInfo.objects.filter(username=request.data.get('username'),password=request.data.get('password')).first()
        if not user:
            return Response({'code':10001,'error':'用户名或密码错误'})

        # 2. Build the payload (the middle segment) from the user object.
        jwt_payload_handler = api_settings.JWT_PAYLOAD_HANDLER
        payload = jwt_payload_handler(user)

        # 3. base64-encode header and payload, join them, sign with HS256
        #    (salted with the secret key), base64-encode the signature and
        #    join everything into the token.
        jwt_encode_handler = api_settings.JWT_ENCODE_HANDLER
        token = jwt_encode_handler(payload)
        return Response({'code': 10000, 'data': token})
开始解码
from rest_framework.views import APIView from rest_framework.response import Response # from rest_framework.throttling import AnonRateThrottle,BaseThrottle class ArticleView(APIView): # throttle_classes = [AnonRateThrottle,] def get(self,request,*args,**kwargs): # 获取用户提交的token,进行一步一步校验 import jwt from rest_framework import exceptions from rest_framework_jwt.settings import api_settings #进行解码 jwt_decode_handler = api_settings.JWT_DECODE_HANDLER #获取到加密以后的字符串 jwt_value = request.query_params.get('token') try: #对token进行解密 payload = jwt_decode_handler(jwt_value) #判断签名是否过时 except jwt.ExpiredSignature: msg = '签名已过时' raise exceptions.AuthenticationFailed(msg) #判断是否被篡改 except jwt.DecodeError: msg = '认证失败' raise exceptions.AuthenticationFailed(msg) except jwt.InvalidTokenError: raise exceptions.AuthenticationFailed() print(payload) return Response('文章列表')
'JWT_EXPIRATION_DELTA':datetime.timedelta(seconds=300) #默认五分钟有效 #自定义 import datetime JWT_AUTH = { "JWT_EXPIRATION_DELTA":datetime.timedelta(minutes=10) }
ListModelMixin
执行list方法