目录python
管理员 1 注册 2 登陆 3 上传视频 4 删除视频 5 发布公告 用户 1 注册 2 登陆 3 冲会员 4 查看视频 5 下载免费视频 6 下载收费视频 7 查看观影记录 8 查看公告
层级结构:客户端 服务端 数据库 客户端: 基于tcp链接的套接字程序 管理员视图 注册、登陆、上传视频、删除视频、发布公告 用户视图 注册、登陆、购买vip、查看视频、下载免费视频、下载收费视频、查看下载记录、查看公告 服务端: tcpserver:基于多线程实现并发的套接字通讯 解决粘包问题 interface:admin_interface、user_interface、common_interface models类和ORM框架:models类中的四张表继承ORM框架中的基类model 数据库: 建立四张表:user、movie、notice、download_record
# 优势:让一个不懂数据库操做的小白也可以简单快速操做数据库实现相应功能 # 缺点:sql封装固定,不利于sql查询优化 # 对象关系映射 # 类 >>> 数据库的表 # 对象 >>> 表的一条条的记录 # 对象获取属性或方法 >>> 记录的字段对应的值 # 一张表有字段,字段又有字段名,字段类型,字段是不是主键,字段的默认值 class Field(object): pass # 为了在定义的时候更加方便 经过继承Field定义具体的字段类型 class StringField(Field): pass class IntegerField(Field): pass class Models(dict): pass def __getattr__(self,item): return self.get(item) def __setattr__(self,key,value) self[key] = value # 查询 def select(self,**kwargs): # select * from userinfo # select * from userinfo where id = 1 # 新增 def save(self): # insert into userinfo(name,password) values('jason','123') # 修改:是基于已经存在了的数据进行修改操做 def update(self): # update userinfo set name='jason',password='234' where id = 1 """ (******) hasattr getattr setattr """ # 元类拦截类的建立过程 使它具有表的特性 class ModelsMetaClass(type): def __new__(cls,class_name,class_bases,class_attrs): # 只拦截模型表的建立表 if class_name == 'Models': return type.__new__(cls,class_name,calss_bases,class_attrs) table_name = class_attrs.get('table_name',class_name) primary_key = None mappings = {} for k,v in class_attrs.items(): if isinstance(v,Field): mappings[k] = v if v.primary: if primary_key: raise TypeError('主键重复') primary_key = v.name for k in mappings.keys(): class_attrs.pop(k) if not primary_key: raise TypeError('必需要有一个主键') class_attrs['table_name'] = table_name class_attrs['primary_key'] = primary_key class_attrs['mappings'] = mappings return type.__new__(cls,class_name,calss_bases,class_attrs)
表结构定义好之后,数据怎么操做,逻辑代码怎么实现就清晰了 用户表:包含管理员和普通用户信息 user--->id ,name ,password ,is_locked ,is_vip , user_type,register_time 电影表:movie---->id,name,, path,is_free, is_delete,user_id,create_time,file_md5 公告表:notice--->id,title,content,create_time,user_id 下载记录表:download_record---->id,user_id,movie_id,create_time create table user( id int auto_increment primary key, name varchar(255), password varchar(255), is_locked int not null default 0, is_vip int not null default 0, user_type varchar(255), register_time varchar(255) )engine=Innodb charset='utf8'; create table movie( id int auto_increment primary key, name varchar(64), path varchar(255), is_free int not null default 1, is_delete int not null default 0, create_time varchar(255), file_md5 varchar(255), user_id int )engine=Innodb charset='utf8'; create table notice( id int auto_increment primary key, title varchar(255), content varchar(255), user_id int, create_time varchar(255) )engine=Innodb charset='utf8'; create table download_record( id int auto_increment primary key, user_id int, movie_id int, create_time varchar(255) )engine=Innodb charset='utf8';
管理员: 一、注册功能 客户端 1-一、选择每一个功能以前都须要都须要须要链接上服务器,即须要一个socket对象,每一个函数传一个client 1-二、密码在传递过程当中不能是明文吧,须要加密,可选择hashlib中md5加密,定义一个公共方法咯 1-三、定义一个发送和接收的公共方法、这里要注意的是在这个方法有一个关键字形参、用于传输文件,默认为None 1-四、考虑一个问题,发送的字典中要包含哪些数据、对于注册这个问题,包含服务器端用于标识的type的功能类型、 用户名、密码(要加密)、还有用户类型"user_type"(admin或者user)这里是admin类型 1-五、接收获得的字典back_dic又包含那些数据,常见的就flag和msg,后续的功能中有返回列表类型的 服务端 1-六、首先就是基于多线程实现并发的套接字程序,子线程working函数中会先接收到客户端发来的字典(用到json、struct模块) 1-七、有个问题是有什么便利的方法将接收到的字典recv_dic 和与客户端创建链接的socket对象conn 交给接口层中相应的功能进 行操做数据库,那就定义一个分发函数dispatch(recv_dic,conn),而后判断recv_dic["type]类型和全局func_dic字典中进行 比对,去执行与之对应的函数,若是传过来的类型不存在func_dic字典中,那就自定义一个字典back_dic(包含flag和msg数据) 调用服务端公共发送数据方法返回给客户端 1-八、我们不知不觉就来到了服务端注册接口了,意味着能够操做数据库啦,就须要用到ORM框架和db目录中models模块中与表一一对应 的类、这四个类都是根据事先在数据库中定义好的字段进行建立的,不要写错了,字段和类型。这四个类都继承了ORM框架的基类 modle,因此但是直接点就能够调用ORM框架中基类中方法,select方法是类方法,获得的是一个列表套对象,还有save方法,用于保存 ,还有一个update方法用于更新,那我们回过头来 1-九、注册功能拿到的recv_dic中能够拿到注册的用户名,获得用户名后使用user_data = models.User.select(name=name )进行判断要注册的 用户是否存在,若果存在老规矩back_dic(flag为False,msg为注册失败)返回去,不存在那咋整,还能咋整保存到数据库user表中呗,那 怎么保存呀,name,password,user_type,is_locked和is_vip都有默认值,register_time注册时间的话写个方法 time.strftime("%Y-%m-%d %X") 这样不就全搞定了,什么数据都拿到了,那就用models.User()把这些数据搞进去建立获得一个对象,对象调用save方法进行方法就ojbk了,不急还有 要记得通知客户端,老规矩back_dic字典,调用公共发送方法,注册大功告成 登陆 客户端 2-一、在注册功能该项目的整体框架都已经打通了任督二脉,个人乖乖,那登陆功能须要考虑一个问题,客户端若是登录成功,是否是须要标记一下登录状态 ,老规矩在全局定义一个字典,把返回的字典中一个session存到全局字典cookie中,解决了ojbk, 2-二、发送字典send_dic中type类型修改成login,密码的话照样发送密文,而后over了 服务器 2-三、还记得tcpserver模块中的全局func_dic字典吗?强大的地方来了,刚刚只是写了一个注册的映射接口,如今来了一个login类型,那咋整,就往里加一个 login的映射方法,还能够直接拿到recv_dic和conn,任督二脉打通了就是强,哦还有注册和登陆都是管理员和普通用户的公共方法,因此放到common_interface 中,其实放哪都同样只要能找到就行啦 哈哈 2-四、你要登录,逻辑点在哪里,首先我要判断你这货存不存在呀,不存在登录个屁呀,淡定淡定,哈哈,上面说过select方法获得的是列表,别给老子忘了,列表里面 放的是一个个对象,models中User类调用select方法根据name=recv_dic["name"]获得user_list,若是user_list存在,那就取零号位就拿到user_obj用户对象 2-五、拿到user_obj对象点表中的字段属性判断其类型和接收的recv_dic字典中类型和密码是否一致,一致的话即可以获得一个back_dic字典了,老规矩包含flag和msg 2-六、重点来了,这里可能有带你绕,请无关人员速速离开,要返回的back_dic字典中须要添加一个session添加到字典中,这个session是用户登录成功以后生成的一个 随机字符串,咱这里也是用hashlib,这里要保证生成的字符串是惟一的,这里须要加盐,加一个当前cpu执行代码的时间 time.clock() 2-七、,服务端怎么校验用户的登录问题,考虑两个问题,第一个问题服务端须要保存session,第二个问题当用户退出以后将该用户对应的键值删除? 那咱们如何判断用户走了,运行到哪一段代码就标记用户走了呢,咱们可不能够经过addr就能够定位哪个用户断开了,找到当前用对应的数据删除,数据保存形式 {‘addr’:[session,user_id]} 将这个东西存在哪里呢,能够放在全局,但咱们这里把他存到Tcpsever目录下user_data模块中live_user['addr’']=[session,user_id] 那问题又来,怎么拿到add,第一种思路给每个函数都添加addr参数,可是这个addr参数只是login函数用到,其余函数都没用到,这样第一种思路很不合理,第二种思路 能够经过working中接收到的recv_dic字典添加recv_dic["addr"] = str(addr) 再传给每个函数,在login函数中user_data.live_user[recv_dic["addr"]] = [session,user_obj.id] 有考虑一个问题,由于多线程要操做公共数据user_data中的live_user字典,就会出现数据错乱,因此要加锁,那这个锁在那里产生呢?咱们要在tcpsever全局中产生mutex = Lock() 在这里产生,可是不能在这里用,由于会出现循环导入问题,tcpserver导入common_interface,在common_interface中又用到tcpserver中的锁,相互导入就出现循环导入,解决办法, 将锁保存到user_data中 user_data.mutex = mutex,在login中给user_data.live_user[recv_dic["addr"]] = [session,user_obj.id]加锁,直接导入user_data就可使用到锁啦 还没完在tcpserver中 用户退出(try...except.(下面的执行的代码就表示其中一个线程断开)..)就要删除user_data.live_user.pop(str(addr)) ,这里也是公共方法须要 加锁user_data.mutex.acquire()和user_data.mutex.release() 2-八、下面的功能都须要先登陆才能操做,这里来个装饰器功能:校验客户端发过来的随机字符串,若是有这个随机字符串那就正常执行函数,若是没有返回请先登陆的提示,意味着客户端 发送的字典要带着session过来,装饰器inner(*args,**kwargs)中args=(recv_dic,conn) kwargs={} 拿到客户端发过来的随机字符串与服务器的数据进行比对 vlues=[session,user_id] for vlues in user_data.live_user.vlues(): if args[0].get("session") == v[0]:将对应的user_id放入recv_dic中,以便后续使用args[0]["user_id"]=vlues[1] break 以上for循环不必定能找到,for循环只是单单的判断session,而后将user_id放到接收字典recv_dic中,那被装饰的函数到底执不执行,if args[0].get("user_id"): func(*args,**kwargs) else: back_dic ={"flag"False,"msg":"请先登陆"} 而后调用返回函数send_back(back_dic,args[1]) 三、上传视频 客户端 3-一、查看有哪些影片须要上传的,即获取全部视频 3-二、判断影片是否存在才能上传,那应该怎么判断是个问题,咱们能不能对上传的视频文件进行hashlib,自定义被hash的数据能够在文件开头,1/3,2/3,末尾-10而后获得md5值 发送字典类型"check_movie",包含"session","file_md5",获得字典back_dic,若是视频不存在那要输入is_free,是否免费,而后在发字典send_dic,该字典类型为"upload_movie",还包含 "session"、"file_name"、 "file_size"、"file_md5",这里调用公共收发方法是要给文件file传参了,把上传文件路径传过去 服务端 3-三、还记得tcpserver模块中的全局func_dic字典吗?加上"check_movie"和"upload_movie"映射,映射函数全都加上装饰器 3-四、"check_movie"比较简单,只是查看要上传视频的file_md5是否在数据库,注意数据库中存的只是文件地址而已,不是真实的视频文件 3-五、这里为了不上传的视频名字是同样的可是内容不同,因此文件名应该尽可能取的惟一,因此给传来的file_name加上一个随机字符串,就直接调用以前定义的 get_session方法便可 3-六、这里要拼接文件的存放路径了,根据file_size循环写入文件 3-七、生成一个 movie_obj 电影对象,调用save方法保存,而后返回back_dic说明上传成功 四、删除视频 客户端 4-一、先查询出全部没有被删除的电影列表,即send_dic字典中"type"为'get_movie_list' 和'movie_type'为"all",返回的电影列表能够所有是收费,所有是免费,收费免费都有,这里须要注意的是获取全部视频列表考 虑的不周全,若是单从管理员角度要得到全部视频不考虑用户获取收费或者免费的视频,会出现一些代码冗余,因此在获取全部视频这个功能要判断传过来的的movie_type是all、free、charge 4-二、拿到全部视频列表movie_list,该列表的格式[电影名称,是否免费收费,电影id]发送字典send_dic中"type"为"delete_movie"和delete_movie_id'为movie_list[choice-1][2] 服务端 4-三、还记得tcpserver模块中的全局func_dic字典吗?加上'get_movie_list'和"delete_movie"映射,映射函数全都加上装饰器 4-四、删除电影不是真的删除,只是找到每个电影对象,而后点is_delete属性改成1便可,因此get_movie_list方法会先得到全部对象列表,遍历列表获得每个对象,对每个对象的is_delete属性进行判断,注意还要判断 ecv_dic['movie_type'],这里是“all”类型,知足的所有添加到一个返回的列表中back_movie_list,而后返回给客户端 4-五、delete_movie方法的话 movie_list = models.Movie.select(id=recv_dic.get("delete_movie_id"))而后对列表去索引获得一个电影对象,而后修改movie_obj.is_delete,而后调用update()方法更新,而后返回back_cic 五、发布公告 客户端 5-1 公告包含title和content 发送的字典send_dic包含"type"为"release_notice"、"session"、"title"、"content" 服务端、 5-二、这里须要知道接受的字典recv_dic是包含user_id字段的,要写入表notice时用到 5-三、也是建立表notice对象,而后调用save方法保存 普通用户 一、注册 直接调用公共注册方法 二、登陆 直接调用公共登陆,在全局添加user_dic中保存session和is_vip 三、购买会员 客户端 3-一、判断全局user_dic['is_vip']可知道是不是会员 3-二、若是不是的话,让用户选择是否购买会员,购买的话最后要修改全局 服务端 3-三、根据recv_dic["user_id"]判断是哪个用户要购买会员,获得的对象点is_vip属性修改成1,调用update(0方法保存 四、查看全部视频 客户端 4-一、发送字典send_dic里面的type为'get_movie_list','movie_type为'all' 服务器 4-二、直接调用以前写好的get_movie_list方法便可 这和管理员中删除视频就先获取全部视频 五、下载免费电影 客户端 5-一、先列出全部免费电影,和上个功能差很少,只是'movie_type'改成'free' 5-二、再发送字典send_dic中'type'为'download_movie' 'movie_id'为movie_list[choice-1][2] 5-三、接受获得的字典back_dic中有一个wait_time 打印多是0或者30秒 拼接下载的路径,循环写入文件 服务端 5-四、id=recv_dic.get('movie_id')来获得电影列表movie_list,而后索引取值获得电影对象 5-5 id=recv_dic['user_id']来获得用户列表索引取得用户对象user_obj 5-六、下载电影的话先判断使用是不是vip,vip的话不须要等待30秒 不是的话须要等待30秒 5-七、更新下载记录到down_record表中 5-八、循环发送文件 5-九、发送字典back_dic 六、下载收费电影 客户端 6-一、针对普通用户和vip用户下载收费视频收费标准不同(5元 10元) 6-二、发送字典send_dic 中仍是'get_movie_list'可是电影类型为收费'movie_type':'charge' 6-三、剩下功能和下载免费电影差很少 服务器 同上 七、查看下载记录 客户端 7-一、发送字典send_dic 中的类型'check_download_record' 7-二、接受字典back_dic进行判断便可 服务端 7-三、还记得tcpserver模块中的全局func_dic字典吗?加上'check_download_record'的映射方法 7-四、要查看下载记录 先根据用户id获得一个记录列表,循环该列表获得的是每个记录对象 7-五、根据每个对象点movie_id 和电影id判断获得电影列表,索引取值获得各个对象 7-六、把每个对象的名字添加到一个自定义的列表中,用于返回给客户端 八、查看公告 客户端 8-一、发送字典send_dic 中的类型'check_notice' 8-二、接受字典back_dic进行判断便可 服务端 8-三、还记得tcpserver模块中的全局func_dic字典吗?加上'check_notice'的映射方法 8-四、Notice类调用select方法获得公告列表 8-五、列表存在的话 遍历该列表获得每个对象,返回字典中保存对象点title,点content进行返回
import os BASE_DIR = os.path.dirname(os.path.dirname(__file__)) UPDATE_MOVIE = os.path.join(BASE_DIR, 'update_movie') DOWNLOAD_MOVIE_DIR = os.path.join(BASE_DIR, 'download_movie')
from core import admin, user func_dic = {"1": admin.admin_view, "2": user.user_view} def run(): while True: print(""" 一、管理员视图 二、普通用户视图 q、退出 """) choice = input("please choice your number>>:").strip() if choice == 'q': break if choice not in func_dic: print("choice err not in range") continue func_dic.get(choice)()
import os from Tcpclient import tcpclient from lib import common from conf import setting user_info = { "session": None } def register(client): while True: name = input("please input your name>>:").strip() password = input("please input your password>>:").strip() re_password = input("please agan input your password>>:").strip() if password != re_password: print("两次密码不一致") continue send_dic = {"type": "register", "name": name, "password": common.get_md5(password), "user_type": "admin"} back_dic = common.send_back(send_dic, client) if back_dic["flag"]: print(back_dic["msg"]) break else: print(back_dic["msg"]) def login(client): while True: name = input("please input your name>>:").strip() password = input("please input your password>>:").strip() send_dic = {"type": "login", "name": name, "password": common.get_md5(password), "user_type": "admin"} back_dic = common.send_back(send_dic, client) if back_dic["flag"]: print(back_dic["msg"]) user_info["session"] = back_dic["session"] break else: print(back_dic["msg"]) def update_movie(client): """ 思路: 一、是否是要先获取有哪些能够上传的视频 二、选择好要上传的视频后是否是还要判断服务器存不存在,存在了就不须要上传了 三、那就须要校验视频文件,可自定义校验规则 四、循环上传 :param client: :return: """ while True: movie_list = common.get_movie() if not movie_list: print("暂无影片上传") return for i, m in enumerate(movie_list, start=1): print("%s:%s" % (i, m)) choice = input("please input your choice>>: ").strip() if choice == 'q': break if choice.isdigit(): choice = int(choice) if choice in range(1, len(movie_list) + 1): file_path = os.path.join(setting.UPDATE_MOVIE, movie_list[choice - 1]) file_md5 = common.get_file_md5(file_path) send_dic = {"type": "check_movie", "session": user_info["session"], "file_md5": file_md5} back_dic = common.send_back(send_dic, client) if back_dic["flag"]: # 若是能够上传,那标识上传免费仍是收费 is_free = input("上传的影片是否免费(y/n)>>:").strip() is_free = 1 if is_free == 'y' else 0 file_name = movie_list[choice - 1] send_dic = {"type": "update_movie", "session": user_info["session"], "is_free": is_free, "file_name": file_name, "file_md5": file_md5, "file_size": os.path.getsize(file_path)} back_dic = common.send_back(send_dic, client, file_path) if back_dic["flag"]: print(back_dic["msg"]) return else: print(back_dic["msg"]) else: print(back_dic["msg"]) else: print("choice not in range") else: print("input choice must be a number !") def delete_movie(client): """ 思路: 一、先从服务器获取全部视频 二、要删除的发给服务器 :param client: :return: """ send_dic = {"type": "get_movie_list", "session": user_info["session"], "movie_type": "all"} back_dic = common.send_back(send_dic, client) if back_dic["flag"]: """ 服务器的get_movie_list会返回一个电影列表,列表里面为[电影名,收费或免费,电影id] """ movie_list = back_dic["movie_list"] for i, m in enumerate(movie_list, start=1): print("%s:%s-%s" % (i, m[0], m[1])) choice = input("input your delete movie>>:").strip() if choice.isdigit(): choice = int(choice) if choice in range(1, len(movie_list) + 1): send_dic = {"type": "delete_movie", "session": user_info["session"], "movie_id": movie_list[choice - 1][2]} back_dic = common.send_back(send_dic, client) if back_dic['flag']: print(back_dic['msg']) return else: print(back_dic['msg']) else: print('choice noe in range') else: print(back_dic['msg']) def release_notice(client): while True: title = input("please input title>>:").strip() content = input("please input content>>:").strip() send_dic = {"type": "release_notice", "session": user_info["session"], "title": title, "content": content} back_dic = common.send_back(send_dic, client) if back_dic["flag"]: print(back_dic["msg"]) break else: print(back_dic['msg']) break func_dic = { "1": register, "2": login, "3": update_movie, "4": delete_movie, "5": release_notice } def admin_view(): client = tcpclient.get_client() while True: print(""" 一、注册 二、登陆 三、上传电影 四、删除电影 五、发布公告 """) choice = input("please choice your number>>:").strip() if choice == 'q': break if choice not in func_dic: print("choice err not in range") continue func_dic.get(choice)(client)
import os import time from Tcpclient import tcpclient from conf import setting from lib import common user_info = { "session": None, "is_vip": None } def register(client): while True: name = input("please input your name>>:").strip() password = input("please input your password>>:").strip() re_password = input("please agan input your password>>:").strip() if password != re_password: print("两次密码不一致") continue send_dic = {"type": "register", "name": name, "password": common.get_md5(password), "user_type": "user"} back_dic = common.send_back(send_dic, client) if back_dic["flag"]: print(back_dic["msg"]) break else: print(back_dic["msg"]) def login(client): while True: name = input("please input your name>>:").strip() password = input("please input your password>>:").strip() send_dic = {"type": "login", "name": name, "password": common.get_md5(password), "user_type": "user"} back_dic = common.send_back(send_dic, client) if back_dic["flag"]: print(back_dic["msg"]) user_info["session"] = back_dic["session"] user_info["is_vip"] = back_dic["is_vip"] break else: print(back_dic["msg"]) def buy_vip(client): while True: buy_vip = input("是否购买会员(y/n)>>:").strip() if buy_vip == 'q': break if buy_vip not in ['y', 'n']: print("输入有误") continue if buy_vip == 'y': send_dic = {"type": "buy_vip", "session": user_info["session"]} back_dic = common.send_back(send_dic, client) if back_dic["flag"]: print(back_dic["msg"]) break else: print(back_dic["msg"]) else: print("欢迎下次购买") break def check_movie(client): send_dic = {"type": "get_movie_list", "session": user_info["session"], "movie_type": "all"} back_dic = common.send_back(send_dic, client) if back_dic["flag"]: movie_list = back_dic["movie_list"] for i, m in enumerate(movie_list, start=1): print("%s:%s-%s" % (i, m[0], m[1])) else: print(back_dic["msg"]) def download_free_movie(client): send_dic = {"type": "get_movie_list", "session": user_info["session"], "movie_type": "free"} back_dic = common.send_back(send_dic, client) if back_dic["flag"]: movie_list = back_dic["movie_list"] for i, m in enumerate(movie_list, start=1): print("%s:%s-%s" % (i, m[0], m[1])) while True: choice = input("请选择要下载的电影编号>>:").strip() if choice == 'q': break if choice.isdigit(): choice = int(choice) if choice in range(1, len(movie_list) + 1): send_dic = {"type": "download_movie", "session": user_info["session"], "movie_id": movie_list[choice - 1][2], "movie_type": "free"} back_dic = common.send_back(send_dic, client) if back_dic["flag"]: print("请等待》》》") time.sleep(back_dic["wait_time"]) file_path = os.path.join(setting.DOWNLOAD_MOVIE_DIR, back_dic["file_name"]) recv_size = 0 with open(file_path, 'wb') as f: while recv_size < back_dic["file_size"]: data = client.recv(1024) f.write(data) recv_size += len(data) print("下载成功") return else: print(back_dic["msg"]) else: print("choice not in range") else: print("choice must be number") else: print(back_dic["msg"]) def download_charge_movie(client): if user_info["is_vip"]: charge = input("请支付10元(y/n)>>:").strip() else: charge = input('请支付20元(y/n)>>:').strip() if charge != "y": print("慢走 不送") return send_dic = {"type": "get_movie_list", "session": user_info["session"], "movie_type": "charge"} back_dic = common.send_back(send_dic, client) if back_dic["flag"]: movie_list = back_dic["movie_list"] for i, m in enumerate(movie_list, start=1): print("%s:%s-%s" % (i, m[0], m[1])) while True: choice = input("请选择要下载的电影编号>>:").strip() if choice == 'q': break if choice.isdigit(): choice = int(choice) if choice in range(1, len(movie_list) + 1): send_dic = {"type": "download_movie", "session": user_info["session"], "movie_id": movie_list[choice - 1][2], "movie_type": "free"} back_dic = common.send_back(send_dic, client) if back_dic["flag"]: print("请等待》》》") time.sleep(back_dic["wait_time"]) file_path = os.path.join(setting.DOWNLOAD_MOVIE_DIR, back_dic["file_name"]) recv_size = 0 with open(file_path, 'wb') as f: while recv_size < back_dic["file_size"]: data = client.recv(1024) f.write(data) recv_size += len(data) print("下载成功") return else: print(back_dic["msg"]) else: print("choice not in range") else: print("choice must be number") else: print(back_dic["msg"]) def download_movie_record(client): """ 思路:当前登陆的用户须要查看本身的观影记录,须要获得电影名 :param client: :return: """ send_dic = {"type": "download_movie_record", "session": user_info["session"]} back_dic = common.send_back(send_dic, client) if back_dic["flag"]: back_record_list = back_dic['back_record_list'] for m in back_record_list: print(m) else: print(back_dic['msg']) def check_notice(client): """ 查看公告思路: :param client: :return: """ send_dic = {"type": "check_notice", "session": user_info["session"]} back_dic = common.send_back(send_dic, client) if back_dic["flag"]: back_record_list = back_dic['back_notice_list'] for m in back_record_list: print(m) else: print(back_dic['msg']) func_dic = { "1": register, "2": login, "3": buy_vip, "4": check_movie, "5": download_free_movie, "6": download_charge_movie, "7": download_movie_record, "8": check_notice } def user_view(): client = tcpclient.get_client() while True: print(""" 一、注册 二、登陆 三、购买会员 四、查看全部电影 五、下载免费电影 六、下载收费电影 七、查看观影记录 八、查看公告 """) choice = input("please choice your number>>:").strip() if choice == 'q': break if choice not in func_dic: print("choice err not in range") continue func_dic.get(choice)(client)
import hashlib import json import os import struct from conf import setting def send_back(send_dic, client, file=None): json_bytes = json.dumps(send_dic).encode("utf-8") client.send(struct.pack('i', len(json_bytes))) client.send(json_bytes) if file: with open(file, 'rb') as f: for line in f: client.send(line) recv_len = struct.unpack('i', client.recv(4))[0] recv_dic = json.loads(client.recv(recv_len).decode("utf-8")) return recv_dic def get_md5(password): md = hashlib.md5() md.update(password.encode("utf-8")) return md.hexdigest() def get_movie(): movie_list = os.listdir(setting.UPDATE_MOVIE) return movie_list def get_file_md5(path): md = hashlib.md5() file_size = os.path.getsize(path) file_list = [0, file_size // 3, (file_size // 3) * 2, file_size - 10] with open(path, "rb") as f: for line in file_list: f.seek(line) md.update(f.read(10)) return md.hexdigest()
import socket def get_client(): client = socket.socket() client.connect(("127.0.0.1", 1688)) return client
import os, sys from core import src sys.path.append(os.path.dirname(__file__)) if __name__ == '__main__': src.run()
import os BASE_DIR = os.path.dirname(os.path.dirname(__file__)) MOVIE_DIR = os.path.join(BASE_DIR, 'movie_dir')
from orm_pool.orm import Models, StringField, IntegerField class User(Models): table_name = 'user' id = IntegerField("id", primary_key=True) name = StringField("name") password = StringField("password") is_locked = IntegerField("is_locked", default=0) is_vip = IntegerField("is_vip", default=0) user_type = StringField("user_type") register_time = StringField("register_time") class Movie(Models): table_name = "movie" id = IntegerField("id", primary_key=True) name = StringField("name", column_type="varchar(64)") path = StringField("path") is_free = IntegerField("is_free") is_delete = IntegerField("is_delete", default=0) create_time = StringField("create_time") user_id = IntegerField("user_id") file_md5 = StringField("file_md5") class Notice(Models): table_name = "notice" id = IntegerField("id", primary_key=True) title = StringField("title") content = StringField("content") user_id = IntegerField("user_id") create_time = StringField("create_time") class DownloadRecord(Models): table_name = "download_record" id = IntegerField("id", primary_key=True) user_id = IntegerField("user_id") movie_id = IntegerField("movie_id") create_time = StringField("create_time")
import os from db import models from lib import common from conf import setting @common.login_auth def check_movie(recv_dic, conn): movie_data = models.Movie.select(file_md5=recv_dic["file_md5"]) if movie_data: back_dic = {"flag": False, "msg": "该电影已存在"} else: back_dic = {"flag": True, "msg": "能够上传"} common.send_back(back_dic, conn) @common.login_auth def update_movie(recv_dic, conn): file_name = common.get_session(recv_dic["file_name"]) + recv_dic["file_name"] file_path = os.path.join(setting.MOVIE_DIR, file_name) print(recv_dic) recv_size = 0 with open(file_path, 'wb') as f: while recv_size < recv_dic["file_size"]: data = conn.recv(1024) f.write(data) recv_size += len(data) movie_obj = models.Movie(name=file_name, path=file_path, is_free=recv_dic.get("is_free"), is_delete=0, create_time=common.get_time(), user_id=recv_dic.get("user_id"), file_md5=recv_dic.get("file_md5")) movie_obj.save() back_dic = {"flag": True, "msg": "上传成功"} common.send_back(back_dic, conn) @common.login_auth def delete_movie(recv_dic, conn): movie_obj = models.Movie.select(id=recv_dic.get('movie_id'))[0] movie_obj.is_delete = 1 movie_obj.update() back_dic = {"flag": True, "msg": "删除成功"} common.send_back(back_dic, conn) @common.login_auth def release_notice(recv_dic, conn): title = recv_dic["title"] content = recv_dic["content"] user_id = recv_dic["user_id"] create_time = common.get_time() notice_obj = models.Notice(title=title, content=content, user_id=user_id, create_time=create_time) notice_obj.save() back_dic = {"flag": True, "msg": "发布成功"} common.send_back(back_dic, conn)
from db import models from lib import common from Tcpserver import user_data def register(recv_dic, conn): user_list = models.User.select(name=recv_dic["name"]) if user_list: back_dic = {"flag": False, "msg": "用户已存在"} common.send_back(back_dic, conn) return user_obj = models.User(name=recv_dic["name"], password=recv_dic["password"], is_locked=0, is_vip=0, user_type=recv_dic["user_type"], register_time=common.get_time()) user_obj.save() back_dic = {"flag": True, "msg": "注册成功"} common.send_back(back_dic, conn) def login(recv_dic, conn): user_list = models.User.select(name=recv_dic["name"]) if user_list: user_obj = user_list[0] if user_obj.user_type == recv_dic["user_type"]: if user_obj.password == recv_dic["password"]: back_dic = {"flag": True, "msg": "登录成功", "is_vip": user_obj.is_vip} # 获取每一个用户的惟一随机字符串,用于标识每一个用户 session = common.get_session(user_obj.name) back_dic["session"] = session # 服务端要记录正在登陆的客户端,将数据user_data文件live_user字典中,放在为了更好的标识 # 每个客户,字典的key为recv_dic["addr"] -----他是一个元组包含ip和端口,值的话是一个列表 # 保存每个session和用户id # 由于时公共数据,且并发会形成数据错乱,我们给他来个锁 user_data.mutex.acquire() user_data.live_user[recv_dic["addr"]] = [session, user_obj.id] user_data.mutex.release() else: back_dic = {"flag": False, "msg": "密码不正确"} else: back_dic = {"flag": False, "msg": "用户类型不对"} else: back_dic = {"flag": False, "msg": "用户不存在"} common.send_back(back_dic, conn) @common.login_auth def get_movie_list(recv_dic, conn): """ 要给调用者返回相应的电影列表:all、free、charge :param recv_dic: :param conn: :return: """ movie_list = models.Movie.select() if movie_list: back_movie_list = [] for movie_obj in movie_list: if not movie_obj.is_delete: if recv_dic["movie_type"] == "all": back_movie_list.append([movie_obj.name, '免费' if movie_obj.is_free else '收费', movie_obj.id]) elif recv_dic["movie_type"] == "free": if movie_obj.is_free: back_movie_list.append([movie_obj.name, '免费', movie_obj.id]) else: if not movie_obj.is_free: back_movie_list.append([movie_obj.name, '收费', movie_obj.id]) if back_movie_list: back_dic = {"flag": True, "movie_list": back_movie_list} else: back_dic = {"flag": False, "msg": "暂无影片"} else: back_dic = {"flag": False, "msg": "暂无影片"} common.send_back(back_dic, conn)
import os from conf import setting from db import models from lib import common @common.login_auth def buy_vip(recv_dic, conn): user_obj = models.User.select(id=recv_dic['user_id'])[0] if user_obj.is_vip: back_dic = {"flag": False, "msg": "您已是会员啦"} else: user_obj.is_vip = 1 user_obj.save() back_dic = {"flag": True, "msg": "购买成功"} common.send_back(back_dic, conn) @common.login_auth def download_movie(recv_dic, conn): """ 下载电影功能:普通用户下载须要等待30秒 VIP用下载不须要等待 :param recv_dic: :param conn: :return: """ movie_list = models.Movie.select(id=recv_dic["movie_id"]) if movie_list: movie_obj = movie_list[0] file_path = movie_obj.path user_obj = models.User.select(id=recv_dic["user_id"])[0] wait_time = 0 if recv_dic["movie_type"] == "free": if user_obj.is_vip: wait_time = 0 else: wait_time = 30 back_dic = {"flag": True, "file_name": movie_obj.name, "file_size": os.path.getsize(file_path), "wait_time": wait_time} download_record = models.DownloadRecord(user_id=user_obj.id, movie_id=movie_obj.id, create_time=common.get_time()) download_record.save() common.send_back(back_dic, conn) with open(file_path, 'rb') as f: for line in f: conn.send(line) else: back_dic = {"flag": False, "msg": "暂无影片"} common.send_back(back_dic, conn) @common.login_auth def download_movie_record(recv_dic, conn): record_list = models.DownloadRecord.select(user_id=recv_dic["user_id"]) back_record_list = [] if record_list: for m in record_list: movie_obj = models.Movie.select(id=m.movie_id)[0] back_record_list.append(movie_obj.name) back_dic = {"flag": True, "back_record_list": back_record_list} else: back_dic = {"flag": False, "msg": "暂无下载记录"} common.send_back(back_dic, conn) @common.login_auth def check_notice(recv_dic, conn): notice_list = models.Notice.select() back_notice_list = [] if notice_list: for notice_obj in notice_list: back_notice_list.append([notice_obj.title, notice_obj.content]) back_dic = {"flag": True, "back_notice_list": back_notice_list} else: back_dic = {"flag": False, "msg": "暂无下载记录"} common.send_back(back_dic, conn)
import hashlib import json import struct import time from functools import wraps from Tcpserver import user_data def send_back(back_dic, conn): json_bytes = json.dumps(back_dic).encode("utf-8") conn.send(struct.pack('i', len(json_bytes))) conn.send(json_bytes) def get_time(): return time.strftime("%Y-%m-%d %X") def get_session(name): # 为了保证每一个用户的随机字符串是惟一的,不只要对每个用户名加密还要加上cpu执行时机----加盐 md = hashlib.md5() md.update(str(time.clock()).encode("utf-8")) md.update(name.encode("utf-8")) return md.hexdigest() def login_auth(func): @wraps(func) def inner(*args, **kwargs): # args=(recv_dic,conn) # 登陆了之后服务端user_data文件中live_user就有存在用户登录的数据,若是没有登录就没有数据,能够为此 # 来做为判断是否登陆的依据 for values in user_data.live_user.values(): # values = [session,user_id] if args[0]["session"] == values[0]: args[0]["user_id"] = values[1] break if args[0].get("user_id"): func(*args, **kwargs) else: back_dic = {"flag": False, "msg": "请先登陆"} send_back(back_dic, args[1]) return inner
from DBUtils.PooledDB import PooledDB import pymysql POOL = PooledDB( creator=pymysql, # 使用连接数据库的模块 maxconnections=6, # 链接池容许的最大链接数,0和None表示不限制链接数 mincached=2, # 初始化时,连接池中至少建立的空闲的连接,0表示不建立 maxcached=5, # 连接池中最多闲置的连接,0和None不限制 maxshared=3, # 连接池中最多共享的连接数量,0和None表示所有共享。PS: 无用,由于pymysql和MySQLdb等模块的 threadsafety都为1,全部值不管设置为多少,_maxcached永远为0,因此永远是全部连接都共享。 blocking=True, # 链接池中若是没有可用链接后,是否阻塞等待。True,等待;False,不等待而后报错 maxusage=None, # 一个连接最多被重复使用的次数,None表示无限制 setsession= [], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."] ping=0, # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always host='127.0.0.1', port=3306, user='root', password='123', database='youku', charset='utf8', autocommit='True')
import pymysql from orm_pool import db_pool class Mysql(object): def __init__(self): self.conn = db_pool.POOL.connection() self.cursor = self.conn.cursor(pymysql.cursors.DictCursor) def close(self): self.cursor.close() self.conn.close() def select(self, sql, args=None): self.cursor.execute(sql, args) res = self.cursor.fetchall() # 列表套字典 return res def execute(self, sql, args): try: self.cursor.execute(sql, args) except BaseException as e: print(e)
from orm_pool.mysql_singleton import Mysql # 定义字段类 class Field(object): def __init__(self, name, column_type, primary_key, default): self.name = name self.column_type = column_type self.primary_key = primary_key self.default = default # 定义具体的字段 class StringField(Field): def __init__(self, name, column_type='varchar(255)', primary_key=False, default=None): super().__init__(name, column_type, primary_key, default) class IntegerField(Field): def __init__(self, name, column_type='int', primary_key=False, default=None): super().__init__(name, column_type, primary_key, default) class ModelMetaClass(type): def __new__(cls, class_name, class_bases, class_attrs): # 我仅仅只想拦截模型表的类的建立过程 if class_name == 'Models': return type.__new__(cls, class_name, class_bases, class_attrs) # 给类放表名,主键字段,全部字段 table_name = class_attrs.get('table_name', class_name) # 定义一个存储主键的变量 primary_key = None # 定义一个字典用来存储用户自定义的表示表的全部字段信息 mappings = {} # for循环当前类的名称空间 for k, v in class_attrs.items(): if isinstance(v, Field): mappings[k] = v if v.primary_key: if primary_key: raise TypeError("主键只能有一个") primary_key = v.name # 将重复的键值对删除 for k in mappings.keys(): class_attrs.pop(k) if not primary_key: raise TypeError('必需要有一个主键') # 将处理好的数据放入class_attrs中 class_attrs['table_name'] = table_name class_attrs['primary_key'] = primary_key class_attrs['mappings'] = mappings return type.__new__(cls, class_name, class_bases, class_attrs) class Models(dict, metaclass=ModelMetaClass): def __init__(self, **kwargs): super().__init__(**kwargs) def __getattr__(self, item): return self.get(item, '没有该键值对') def __setattr__(self, key, value): self[key] = value # 查询方法 @classmethod def select(cls, **kwargs): ms = Mysql() # select * from userinfo if not kwargs: sql = 'select * from %s' % cls.table_name res = ms.select(sql) else: # select * from userinfo where id = 1 k = list(kwargs.keys())[0] v = kwargs.get(k) sql = 'select * from %s where %s=?' % (cls.table_name, k) # select * from userinfo where id = ? sql = sql.replace('?', '%s') # select * from userinfo where id = %s res = ms.select(sql, v) if res: return [cls(**r) for r in res] # 将数据库的一条数据映射成类的对象 # 新增方法 def save(self): ms = Mysql() # insert into userinfo(name,password) values('jason','123') # insert into %s(%s) values(?) fields = [] # [name,password] values = [] args = [] for k, v in self.mappings.items(): if not v.primary_key: # 将id字段去除 由于新增一条数据 id是自动递增的不须要你传 fields.append(v.name) args.append('?') values.append(getattr(self, v.name)) # insert into userinfo(name,password) values(?,?) sql = "insert into %s(%s) values(%s)" % ( self.table_name, ','.join(fields), ','.join(args)) # insert into userinfo(name,password) values(?,?) sql = sql.replace('?', '%s') ms.execute(sql, values) # 修改方法:基于已经存在了的数据进行一个修改操做 def update(self): ms = Mysql() # update userinfo set name='jason',password='123' where id = 1 fields = [] # [name,password] values = [] pr = None for k, v in self.mappings.items(): if v.primary_key: pr = getattr(self, v.name, v.default) else: fields.append(v.name + '=?') values.append(getattr(self, v.name, v.default)) sql = 'update %s set %s where %s = %s' % ( self.table_name, ','.join(fields), self.primary_key, pr) # update userinfo set name='?',password='?' where id = 1 sql = sql.replace('?', '%s') ms.execute(sql, values) # if __name__ == '__main__': # class Teacher(Models): # table_name = 'teacher' # tid = IntegerField(name='tid',primary_key=True) # tname = StringField(name='tname') # obj = Teacher(tname='jason老师') # obj.save() # res = Teacher.select() # for r in res: # print(r.tname) # print(res) # res = Teacher.select(tid=1) # teacher_obj = res[0] # teacher_obj.tname = 'jason老师' # teacher_obj.update() # res1 = Teacher.select() # print(res1) # class User(Models): # table_name = 'User' # id = IntegerField(name='id', primary_key=True) # name = StringField(name='name') # password = StringField(name='password') # print(User.primary_key) # print(User.mappings) # obj = User(name='jason') # print(obj.table_name) # print(obj.primary_key) # print(obj.mappings)
import json import socket import struct import traceback from concurrent.futures import ThreadPoolExecutor from threading import Lock from Tcpserver import user_data from interface import common_interface, admin_interface, user_interface from lib import common pool = ThreadPoolExecutor(20) #全局产生锁,为了不循环导入问题,将产生的锁放到user_data中 mutex = Lock() user_data.mutex = mutex func_dic = { "register": common_interface.register, "login": common_interface.login, "check_movie": admin_interface.check_movie, "update_movie": admin_interface.update_movie, "get_movie_list": common_interface.get_movie_list, "delete_movie": admin_interface.delete_movie, "release_notice": admin_interface.release_notice, "buy_vip": user_interface.buy_vip, "download_movie": user_interface.download_movie, "download_movie_record": user_interface.download_movie_record, "check_notice": user_interface.check_notice } def get_server(): server = socket.socket() server.bind(("127.0.0.1", 1688)) server.listen(5) while True: conn, addr = server.accept() pool.submit(working, conn, addr) def working(conn, addr): while True: try: recv_header = conn.recv(4) recv_bytes = conn.recv(struct.unpack('i', recv_header)[0]) recv_dic = json.loads(recv_bytes.decode("utf-8")) recv_dic["addr"] = str(addr) dispatch(recv_dic, conn) except Exception as e: traceback.print_exc() conn.close() # 当用户断开之后,服务器就无需保存表示他的数据 user_data.mutex.acquire() user_data.live_user.pop(str(addr)) user_data.mutex.release() break def dispatch(recv_dic, conn): if recv_dic.get("type") in func_dic: func_dic.get(recv_dic["type"])(recv_dic, conn) else: back_dic = {"flag": False, "msg": "类型不合法"} common.send_back(back_dic, conn)
live_user={}
mutex=Nonemysql
import os, sys from Tcpserver import tcpserver sys.path.append(os.path.dirname(__file__)) if __name__ == '__main__': tcpserver.get_server()