python manage.py startapp threewall
配置 settings.py
# Static files configuration (settings.py).
# URL prefix used when referring to static assets.
STATIC_URL = '/static/'

# Additional directories to look in for static assets: the project-level
# ``static`` folder located directly under BASE_DIR.
STATICFILES_DIRS = (
    os.path.join(BASE_DIR, 'static'),
)
pip install -U channels==2.0.2 channels_redis==2.1.1
pip install pyCryptodome
pip install django-simpleui
# Applications enabled for this project (settings.py).
# The order is kept exactly as in the original list: 'simpleui' stays ahead
# of 'django.contrib.admin'.
INSTALLED_APPS = [
    # Admin theme and import/export helpers.
    'simpleui',
    'import_export',
    # Django built-in applications.
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    # Project applications and Channels.
    'app',
    'threewall',
    'channels',
]
from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter

import threewall.routing

# Project-level protocol router (routing.py).
# Plain HTTP requests do not need an entry here: the "http" -> Django views
# mapping is added by default.
application = ProtocolTypeRouter({
    # WebSocket connections are wrapped in the auth middleware stack and
    # then dispatched by URL to the consumers declared in threewall.routing.
    'websocket': AuthMiddlewareStack(
        URLRouter(
            threewall.routing.websocket_urlpatterns
        )
    ),
})
紧接着,我们需要在 Django 的配置文件(settings.py)中继续配置 Channels 的 ASGI 应用和通道层(channel layer)的信息:
# The WSGI entry point is disabled in favour of the ASGI application below.
# WSGI_APPLICATION = 'wechatDemo.wsgi.application'

# Dotted path to the ASGI application object created in routing.py.
ASGI_APPLICATION = "wechatDemo.routing.application"

# Channel layer configuration; the backend is provided by channels_redis.
CHANNEL_LAYERS = {
    'default': {
        'BACKEND': 'channels_redis.core.RedisChannelLayer',
        'CONFIG': {
            # Replace with the host/port of your own Redis server.
            'hosts': [('127.0.0.1', 6379)],
        },
    }
}
Windows 下安装 Redis 的教程地址:https://www.jianshu.com/p/e16d23e358c0
from django.contrib.auth.models import User
from django.db import models
# NOTE(review): `random` is not used in this module and is only importable
# while django.utils.crypto itself exposes it -- confirm against the Django
# version in use.
from django.utils.crypto import random
from django.utils.html import format_html


# Create your models here.
def rename(newname):
    """Return a decorator that relabels the decorated function as *newname*.

    The decorator overwrites the function's ``__name__`` and also sets
    ``short_description`` to the same value, so a method used as an admin
    list column is titled *newname*.

    Args:
        newname: Label to assign to the decorated function.

    Returns:
        A decorator that mutates the function in place and returns it.
    """
    def decorator(fn):
        fn.__name__ = newname
        # Documented Django hook for the column title; same text as __name__.
        fn.short_description = newname
        return fn
    return decorator


# Check-in table
class checkin(models.Model):
    """One check-in record, holding the profile fields stored per user."""

    headimgurl = models.URLField(max_length=256, default="", null=True, blank=True)
    openid = models.CharField(max_length=225, verbose_name="openid", blank=True, default="")
    nickname = models.CharField(max_length=225, verbose_name="昵称", blank=True, default="")
    sex = models.CharField(max_length=225, verbose_name="性别", blank=True, default="")
    language = models.CharField(max_length=225, verbose_name="语言", blank=True, default="")
    city = models.CharField(max_length=225, verbose_name="城市", blank=True, default="")
    # Set once, when the row is created.
    createTime = models.DateTimeField(auto_now_add=True, verbose_name="签到时间")
    # Refreshed on every save().
    lastTime = models.DateTimeField(auto_now=True, verbose_name="修改时间")

    class Meta:
        verbose_name_plural = "签到表"

    @rename("模板头像")
    def showheadimgurl(self):
        """Return an HTML ``<img>`` tag (50px wide) showing the avatar URL."""
        return format_html("<img src='{}' style='width:50px'/>", self.headimgurl)

    def __str__(self):
        """Return the nickname as the record's display text."""
        return self.nickname
from django.contrib import admin

from threewall import models


# Register your models here.
@admin.register(models.checkin)
class orderAdmin(admin.ModelAdmin):
    """Admin change-list configuration for the ``checkin`` model."""

    # Columns of the change list; "showheadimgurl" is a model method.
    list_display = (
        "showheadimgurl",
        "openid",
        "nickname",
        "sex",
        "language",
        "city",
        "createTime",
        "lastTime",
    )
    # Columns that link to the change form.
    list_display_links = ("openid", "nickname")
    # Fields covered by the admin search box.
    search_fields = ('nickname', "openid")
    # Rows per page.
    list_per_page = 50
import base64
import datetime
import functools
import json
import os
import time
import uuid

from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer
from Crypto.Cipher import AES
from django.conf import settings
from django.http import HttpResponse, HttpResponseRedirect, JsonResponse
from django.shortcuts import redirect, render
from django.views.decorators.csrf import csrf_exempt
from wechatpy import WeChatClient, WeChatPay
from wechatpy.oauth import WeChatOAuth
from wechatpy.pay import dict_to_xml

from threewall import models

# Create your views here.
# Official-account AppID.
AppID = "xxx"
# Official-account AppSecret.
AppSecret = "xxx"
# Secret used to encrypt / decrypt the QR-code signature.
key = "xxxx"
# Service-account API client.
client = WeChatClient(AppID, AppSecret)
# Channel layer used to push messages to the wall page.
channel_layer = get_channel_layer()

# Channel-layer group that receives check-in and signature broadcasts.
_WALL_GROUP = "chat_roomName"
# Number of avatar slots the wall template is padded up to.
_WALL_SLOTS = 199
# Avatar shown in slots that have no check-in yet.
_PLACEHOLDER_AVATAR = "/static/3dwall/img/a.png"
# A QR-code signature older than this many seconds is rejected.
_SIGNATURE_TTL_SECONDS = 10


# pip install pyCryptodome
def add_to_16(value):
    """Encode *value* and pad it with NUL bytes to a multiple of 16 bytes.

    The padding is computed on the encoded bytes rather than on the character
    count, so non-ASCII text also ends up block-aligned.

    Args:
        value: Text to encode.

    Returns:
        The encoded bytes, whose length is a multiple of 16.
    """
    data = str.encode(value)
    remainder = len(data) % 16
    if remainder:
        data += b'\0' * (16 - remainder)
    return data


def _new_signature():
    """Return the current timestamp, AES-encrypted and base64-encoded as str."""
    cipher = AES.new(add_to_16(key), AES.MODE_ECB)
    ticks = str(time.time())
    encrypted = cipher.encrypt(add_to_16(ticks))
    return str(base64.encodebytes(encrypted), encoding='utf-8')


def dwall(request):
    """Render the 3D wall page.

    The template context holds the avatars of up to 200 check-ins (padded
    with a placeholder image up to 199 entries), the number of rows fetched,
    and a freshly generated signature.

    Args:
        request: The incoming HTTP request.

    Returns:
        The rendered ``3dwall.html`` response.
    """
    checkins = models.checkin.objects.values("headimgurl")[0:200]
    print(checkins.query)
    CurPersonNum = checkins.count()
    checkUserInfo = [{"headimgurl": item["headimgurl"]} for item in checkins]
    # Compute the shortfall once.  The previous loop compared a growing index
    # against a shrinking bound and therefore stopped about halfway.
    missing = _WALL_SLOTS - len(checkUserInfo)
    if missing > 0:
        checkUserInfo.extend(
            {"headimgurl": _PLACEHOLDER_AVATAR} for _ in range(missing)
        )
    return render(request, "3dwall.html", {
        "checkUserInfo": checkUserInfo,
        "CurPersonNum": CurPersonNum,
        "encrypted_text": _new_signature(),
    })


def getWeChatOAuth(redirect_url):
    """Build a ``WeChatOAuth`` client (scope ``snsapi_userinfo``).

    Args:
        redirect_url: URL passed to WeChatOAuth as the redirect target.

    Returns:
        A configured ``WeChatOAuth`` instance.
    """
    return WeChatOAuth(AppID, AppSecret, redirect_url, 'snsapi_userinfo')


# Authorization decorator.
def oauth(method):
    """Require WeChat user info in the session before calling *method*.

    Without ``user_info`` in the session and without a ``code`` query
    parameter, the request is redirected to the WeChat authorize URL.  With a
    ``code``, it is exchanged for the user info, which is stored in the
    session.  If that exchange fails, a 403 response is returned instead of
    calling the view without user info.

    Args:
        method: The view function to protect.

    Returns:
        The wrapped view.
    """
    @functools.wraps(method)
    def wrapper(request, *args, **kwargs):
        if request.session.get('user_info', None) is None:
            code = request.GET.get('code', None)
            wechat_oauth = getWeChatOAuth(request.get_raw_uri())
            url = wechat_oauth.authorize_url
            print(url)
            if not code:
                return redirect(url)
            try:
                wechat_oauth.fetch_access_token(code)
                user_info = wechat_oauth.get_user_info()
                print(user_info)
            except Exception as e:
                # The code in the request is invalid or the exchange failed.
                print(str(e))
                return HttpResponse(status=403)
            # Storing this in a user table is recommended.
            request.session['user_info'] = user_info
        return method(request, *args, **kwargs)
    return wrapper


@oauth
def checkin(request):
    """Handle a QR-code check-in.

    The ``signature`` query parameter is decrypted into the timestamp at
    which the QR code was issued.  A missing signature renders
    ``checkerror.html``; an undecodable or expired one renders
    ``expired.html``; a user who does not follow the account gets
    ``isfollow.html``.  Otherwise the check-in is stored (or its existing row
    re-saved), the wall group is notified, and ``checkin.html`` is rendered.

    Args:
        request: The incoming HTTP request.

    Returns:
        The rendered response for the applicable outcome.
    """
    signature = request.GET.get("signature", None)
    if signature is None:
        return render(request, "checkerror.html")
    try:
        aes = AES.new(add_to_16(key), AES.MODE_ECB)
        # '+' in the query string arrives as a space; restore it before
        # decoding the base64 text back into bytes.
        base64_decrypted = base64.decodebytes(
            signature.replace(' ', '+').encode(encoding='utf-8'))
        # Decrypt, decode to str and drop the NUL padding.
        decrypted_text = str(aes.decrypt(base64_decrypted), encoding='utf-8').replace('\0', '')
        # Parsed inside the try so a malformed signature shows the expired
        # page instead of raising.
        qr_timestamp = float(decrypted_text)
        qrTime = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(qr_timestamp))
    except Exception as e:
        print("signature=" + signature)
        return render(request, "expired.html")  # QR code has expired
    print(decrypted_text)

    # The QR code is stale once it is older than the allowed window.
    if qr_timestamp < time.time() - _SIGNATURE_TTL_SECONDS:
        return render(request, "expired.html")  # QR code has expired
    print(qrTime)

    user_info = request.session.get('user_info')
    user_info = client.user.get(user_info["openid"])
    print(user_info)
    if user_info["subscribe"] == 0:
        return render(request, "isfollow.html")

    headimgurl = user_info["headimgurl"]
    nickname = user_info["nickname"]
    openid = user_info["openid"]
    ischeck = models.checkin.objects.filter(openid=openid).first()
    if ischeck is None:
        models.checkin.objects.create(
            headimgurl=headimgurl,
            openid=openid,
            nickname=nickname,
            sex=user_info["sex"],
            language=user_info["language"],
            city=user_info["country"] + "-" + user_info["province"] + "-" + user_info["city"],
        )
    else:
        # Already checked in: re-save the existing row.
        ischeck.save()

    async_to_sync(channel_layer.group_send)(
        _WALL_GROUP,
        {
            'type': 'chat_message',
            'message': {"type": "message", "headimgurl": headimgurl, "nickname": nickname},
        },
    )
    return render(request, "checkin.html")


@csrf_exempt
def signatureCheck(request):
    """Broadcast a fresh signature to the wall group every five seconds.

    NOTE(review): this loop never ends, so the view never returns a response
    and every request to it starts one more endless broadcaster.  Consider
    moving it to a background worker or management command.

    Args:
        request: The incoming HTTP request (not otherwise used).
    """
    while True:
        time.sleep(5)
        async_to_sync(channel_layer.group_send)(
            _WALL_GROUP,
            {
                'type': 'chat_message',
                'message': {"type": "signatureCheck", "signatureCheck": _new_signature()},
            },
        )
import datetime
import json
import multiprocessing
import time
import uuid

from asgiref.sync import async_to_sync
from channels.generic.websocket import WebsocketConsumer


class ChatConsumer(WebsocketConsumer):
    """Synchronous WebSocket consumer that relays messages within a room group."""

    def connect(self):
        """Join the room group named in the URL and accept the connection.

        Called as soon as a WebSocket connection is opened.
        """
        self.room_name = self.scope['url_route']['kwargs']['room_name']
        self.room_group_name = 'chat_%s' % self.room_name
        print(self.room_group_name)
        # `group_add` is asynchronous only, so it is wrapped with
        # `async_to_sync` to be callable from this synchronous consumer.
        async_to_sync(self.channel_layer.group_add)(
            self.room_group_name,
            self.channel_name
        )
        # Accept the connection.
        self.accept()

    def disconnect(self, close_code):
        """Remove this connection from the room group when it closes."""
        async_to_sync(self.channel_layer.group_discard)(
            self.room_group_name,
            self.channel_name
        )

    def receive(self, text_data=None, bytes_data=None):
        """Relay a message sent by the front end to the whole room group.

        The frame is expected to be JSON text with a ``message`` key.  The
        current date and time is appended to the message before it is
        broadcast.  Binary frames and malformed payloads are ignored instead
        of raising inside the consumer.

        Args:
            text_data: Text frame payload, if any.
            bytes_data: Binary frame payload, if any (ignored).
        """
        if text_data is None:
            return
        try:
            message = json.loads(text_data)['message']
        except (ValueError, KeyError, TypeError):
            # Not JSON, not an object, or no "message" key.
            return
        async_to_sync(self.channel_layer.group_send)(
            self.room_group_name,
            {
                # The "type" must match a method implemented on this class;
                # Channels turns both '_' and '.' into '_', so
                # 'chat.message' would work as well.
                'type': 'chat_message',
                'message': '%s%s' % (message, datetime.datetime.today())
            }
        )

    def chat_message(self, event):
        """Forward a group message to this WebSocket as a JSON text frame."""
        message = event['message']
        # Send message to WebSocket
        self.send(text_data=json.dumps({
            'message': message
        }))
from django.urls import path

from threewall import consumers

# Routing table: maps each WebSocket URL to the consumer that handles it.
# NOTE: the consumer class is passed directly, which matches the Channels 2.x
# release pinned by the install command in this guide; Channels 3+ expects
# `consumers.ChatConsumer.as_asgi()` here instead.
websocket_urlpatterns = [
    path('ws/chat/<str:room_name>/', consumers.ChatConsumer),
]
from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter

import threewall.routing

# Project-level protocol router (routing.py).
# Plain HTTP requests do not need an entry here: the "http" -> Django views
# mapping is added by default.
application = ProtocolTypeRouter({
    # WebSocket connections are wrapped in the auth middleware stack and
    # then dispatched by URL to the consumers declared in threewall.routing.
    'websocket': AuthMiddlewareStack(
        URLRouter(
            threewall.routing.websocket_urlpatterns
        )
    ),
})
注:如需静态资源,请私下联系。