最近楼主加班 喽, 很久没有更新个人博客了,哎,一言难尽,废话我就不说了,来开始上精华。javascript
背景:目前市面上有不少bug管理工具,可是各有各的特色,最著名,最流行的就是禅道,一个偶然的机会接触到了python ,学到tornado后,就想着去怎么去用到实处,后来发现本身公司的除了禅道就记录bug没有什么能够用的工具了。前端
语言:python3 第三库 :tornado,qiniu(用于云存储文件),数据库用sqlitejava
why use tornado?不少人其实会这么问我,我感受tornado能够实现异步,虽然如今代码尚未用到异步,我感受仍是很不错的框架,值得学习,如今不少公司都在用,我的感受这是一个不错的,值得咱们你们去学习的框架。python
来看看个人需求文档web
大题是这么的轮廓,那么拿到这个的时候,我会进行需求分析,sql
须要什么样的数据库, 数据模型之间的关系,虽然如今仍是有不少地方是写死的尚未进行搜索功能的设置,可是我相信,有了如今这个demo数据库
那么我开始来设计下我主要会结构flask
handlsers 存放相似flask的views后端
models存数据库相关的,api
static存放静态
template存放模板
untils存放公共库
setting 配置文件
urls url地址映射
run 运行文件。
那么我来开始设计个人数据,其实个人数据模型也是一波三折的。
选择用了sqlalchemy,以前用过flask的sqlalchemy感受不错
from models.dataconfig import db_session,Base,create_all from sqlalchemy import Column,Integer,DateTime,Boolean,String,ForeignKey,desc,asc,Text from sqlalchemy.orm import relationship,backref from untils.common import encrypt import datetime class User(Base): __tablename__='users' id=Column(Integer(),primary_key=True) username=Column(String(64),unique=True,index=True) email=Column(String(64)) password=Column(String(64)) last_logtime=Column(DateTime()) status=Column(Integer()) leves=Column(Integer()) iphone=Column(Integer()) Projects=relationship('Project',backref='users') shebei=relationship('Shebei',backref='users') file=relationship('FilePan',backref='users') banben=relationship('BanbenWrite',backref='users') testresult=relationship('TestResult',backref='users') testcase=relationship('TestCase',backref='users') buglog=relationship('BugLog',backref='users') def __repr__(self): return self.username @classmethod def get_by_id(cls, id): item = db_session.query(User).filter(User.id==id).first() return item @classmethod def get_by_username(cls, username): item = db_session.query(User).filter(User.username== username).first() return item @classmethod def get_count(cls): return db_session.query(Shebei).count() @classmethod def add_new(cls,username,password,iphone,email,leves): new=User(username=username,iphone=iphone,email=email,leves=leves) new.password=encrypt(password) new.status=0 db_session.add(new) try: db_session.commit() except: db_session.rollback() class Shebei(Base): __tablename__='shebeis' id=Column(Integer(),primary_key=True) shebei_id=Column(String(32),unique=True) shebei_name=Column(String(64)) shebei_xitong=Column(String(64)) shebei_xinghao=Column(String(255)) shebei_jiage=Column(Integer()) shebei_fapiaobianhao=Column(String(64)) shebei_quanxian=Column(Boolean()) shebei_jie=Column(Boolean()) shebei_shuyu=Column(String()) shebei_date=Column(DateTime()) shebei_user=Column(String()) gou_date=Column(DateTime()) shebei_status=Column(String(16)) she_sta=Column(Integer(),default=0) ruku_user=Column(Integer(),ForeignKey('users.id')) def __repr__(self): return self.shebei_name @classmethod def get_by_name(cls,name): item=db_session.query(Shebei).filter(Shebei.shebei_name==name).first() return item @classmethod def get_by_id(cls,id): item=db_session.query(Shebei).filter(Shebei.id==id).first() return item @classmethod def get_count(cls): return db_session.query(Shebei).count() class TestResult(Base): __tablename__='testresults' id=Column(Integer(),primary_key=True) porject_id=Column(Integer(),ForeignKey('projects.id')) creat_time=Column(DateTime()) bug_first=Column(Integer()) ceshirenyuan=Column(String(255)) is_send=Column(Boolean(),default=True) filepath=Column(String(64)) status=Column(Integer(),default=0) user_id=Column(Integer(),ForeignKey('users.id')) def __repr__(self): return self.porject_name @classmethod def get_by_name(cls,name): item=db_session.query(TestResult).filter(TestResult.porject_name==name).first() return item @classmethod def get_by_id(cls,id): item=db_session.query(TestResult).filter(TestResult.id==id).first() return item @classmethod def get_by_user_id(cls,user_id): item=db_session.query(TestResult).filter(TestResult.user_id==user_id).first() return item @classmethod def get_count(cls): return db_session.query(TestResult).count() class BanbenWrite(Base): __tablename__='banbens' id=Column(Integer(),primary_key=True) porject_id=Column(Integer(),ForeignKey('projects.id')) creat_time=Column(DateTime(),default=datetime.datetime.now()) banbenhao=Column(String(32)) is_xian=Column(Boolean(),default=False) is_test=Column(Boolean(),default=False) status=Column(Integer()) user_id=Column(Integer(),ForeignKey('users.id')) bugadmin=relationship('BugAdmin',backref='banbens') def __repr__(self): return self.banbenhao @classmethod def get_by_name(cls,name): item=db_session.query(BanbenWrite).filter(BanbenWrite.porject_name==name).first() return item @classmethod def get_by_id(cls,id): item=db_session.query(BanbenWrite).filter(BanbenWrite.id==id).first() return item @classmethod def get_by_user_id(cls,user_id): item=db_session.query(BanbenWrite).filter(BanbenWrite.user_id==user_id).first() return item @classmethod def get_count(cls): return db_session.query(BanbenWrite).count() class FilePan(Base): __tablename__='files' id=Column(Integer(),primary_key=True) file_fenlei=Column(String(64)) file_name=Column(String(64)) down_count=Column(Integer(),default=0) creat_time=Column(DateTime(),default=datetime.datetime.now()) status=Column(Integer(),default=0) down_url=Column(String(64)) is_tui=Column(Boolean(),default=False) user_id=Column(Integer(),ForeignKey('users.id')) def __repr__(self): return self.file_name @classmethod def get_by_file_name(cls,name): item=db_session.query(FilePan).filter(FilePan.file_name==name).first() return item @classmethod def get_by_id(cls,id): item=db_session.query(FilePan).filter(FilePan.id==id).first() return item @classmethod def get_by_user_id(cls,user_id): item=db_session.query(FilePan).filter(FilePan.user_id==user_id).first() return item @classmethod def get_count(cls): return db_session.query(FilePan).count() class BugAdmin(Base): __tablename__='bugadmins' id=Column(Integer(),primary_key=True) porject_id=Column(Integer(),ForeignKey('projects.id')) bugname=Column(String(64)) bugdengji=Column(String(64)) bugtime=Column(DateTime(),default=datetime.datetime.now()) bug_miaoshu=Column(String(255)) ban_id=Column(Integer(),ForeignKey('banbens.id')) fujian=Column(String(64)) is_que=Column(Boolean()) bug_status=Column(String(64)) bug_jiejuefangan=Column(String(64)) bug_send=Column(String(64)) status=Column(Integer(),default=0) bug_log=relationship('BugLog',backref='bugadmins') user_id=Column(Integer(),ForeignKey('users.id')) def __repr__(self): return self.bugname @classmethod def get_by_bugname(cls,bugname): item=db_session.query(BugAdmin).filter(BugAdmin.bugname==bugname).first() return item @classmethod def get_by_id(cls,id): item=db_session.query(BugAdmin).filter(BugAdmin.id==id).first() return item @classmethod def get_by_porject_name(cls,porject_name): item=db_session.query(BugAdmin).filter(BugAdmin.porject_name==porject_name).first() return item @classmethod def get_count(cls): return db_session.query(BugAdmin).count() class TestCase(Base): __tablename__='testcases' id=Column(Integer(),primary_key=True) porject_id=Column(Integer(),ForeignKey('projects.id')) casename=Column(String(64)) case_qianzhi=Column(String()) case_buzhou=Column(String()) case_yuqi=Column(String()) status=Column(Integer(),default=0) case_crea_time=Column(DateTime(),default=datetime.datetime.now()) user_id=Column(Integer(),ForeignKey('users.id')) def __repr__(self): return self.casename @classmethod def get_by_project_name(Cls,project_name): item=db_session.query(TestCase).filter(TestCase.project_name==project_name).first() return item @classmethod def get_by_casename(Cls,casename): item=db_session.query(TestCase).filter(TestCase.casename==casename).first() return item @classmethod def get_by_id(cls,id): item=db_session.query(TestCase).filter(TestCase.id==id).first() return item @classmethod def get_count(cls): return db_session.query(TestCase).count() class BugLog(Base): __tablename__='buglogs' id=Column(Integer(),primary_key=True) bug_id=Column(Integer(),ForeignKey('bugadmins.id')) caozuo=Column(String()) caozuo_time=Column(DateTime()) user_id=Column(Integer(),ForeignKey('users.id')) def __repr__(self): return self.caozuo @classmethod def get_by_id(Cls,id): item=db_session.query(BugLog).filter(BugLog.id==id).first() return item @classmethod def get_by_user_id(Cls,user_id): item=db_session.query(BugLog).filter(BugLog.user_id==user_id).first() return item @classmethod def get_by_bug_id(Cls,bug_id): item=db_session.query(BugLog).filter(BugLog.bug_id==bug_id).first() return item class Project(Base): __tablename__='projects' id=Column(Integer(),primary_key=True) name=Column(String(64)) user_id=Column(Integer(),ForeignKey('users.id')) bug_log=relationship('BugAdmin',backref='projects') banben=relationship('BanbenWrite',backref='projects') testresult=relationship('TestResult',backref='projects') testcase=relationship('TestCase',backref='projects') def __repr__(self): return self.name @classmethod def get_by_id(cls,id): item=db_session.query(Project).filter(Project.id==id).first() return item @classmethod def get_by_name(cls,name): item=db_session.query(Project).filter(Project.name==name).first() return item
这是数据库相关的,
1 数据库配置相关的 2 3 from sqlalchemy import create_engine 4 from sqlalchemy.orm import scoped_session,sessionmaker 5 from sqlalchemy.ext.declarative import declarative_base 6 engine=create_engine('sqlite:///shebei.db',convert_unicode=True) 7 Base=declarative_base() 8 db_session=scoped_session(sessionmaker(bind=engine)) 9 def create_all(): 10 Base.metadata.create_all(engine) 11 def drop_all(): 12 Base.metadata.drop_all(engine)
其实在开发的过程当中,也遇到了不少阻力,好比下载一直实现很差,好比分页,也是参照别人的实现的,
分页公共模块
class Pagination: def __init__(self, current_page, all_item): try: page = int(current_page) except: page = 1 if page < 1: page = 1 all_pager, c = divmod(all_item, 10) if int(c) > 0: all_pager += 1 self.current_page = page self.all_pager = all_pager @property def start(self): return (self.current_page - 1) * 10 @property def end(self): return self.current_page * 10 def string_pager(self, base_url="/index/"): if self.current_page == 1: prev = '<li><a href="javascript:void(0);">上一页</a></li>' else: prev = '<li><a href="%s%s">上一页</a></li>' % (base_url, self.current_page - 1,) if self.current_page == self.all_pager: nex = '<li><a href="javascript:void(0);">下一页</a></li>' else: nex = '<li><a href="%s%s">下一页</a></li>' % (base_url, self.current_page + 1,) last = '<li><a href="%s%s">尾页</a></li>' % (base_url, self.all_pager,) str_page = "".join((prev,nex,last)) return str_page
在上传文件的时候,原来存放在本地,结果呢,下载处理很差,因而乎选择了七牛,须要到七牛的官网去注册本身的帐号
from qiniu import Auth,put_file,etag,urlsafe_base64_encode import qiniu.config access_key='uVxowDUcYx641ivtUb111WBEI4112L3D117JHNM_AOtskRh4' secret_key='PdXU9XrXTLtp1N21bhU1Frm1FDZqE1qhjkEaE9d1xVLZ5C' def sendfile(key,file): q=Auth(access_key,secret_key) bucket_name='leilei22' token = q.upload_token(bucket_name, key) ret, info = put_file(token, key, file) me= ret['hash'] f=etag(file) if me==f: assert_t=True else: assert_t=False return assert_t
解析Excel,主要用于上传测试用例
import xlrd,xlwt from xlutils.copy import copy def datacel(filepath): file=xlrd.open_workbook(filepath) me=file.sheets()[0] nrows=me.nrows porject_id_list=[] casename_list=[] case_qianzhi_list=[] case_buzhou_list=[] case_yuqi_list=[] for i in range(1,nrows): porject_id_list.append(me.cell(i,0).value) casename_list.append(me.cell(i,2).value) case_qianzhi_list.append(me.cell(i,3).value) case_buzhou_list.append(me.cell(i,4).value) case_yuqi_list.append(me.cell(i,1).value) return porject_id_list,casename_list,case_qianzhi_list,case_buzhou_list,case_yuqi_list
其实这么如今公共模块完毕了,其实如今能够着手去开始写咱们的代码了,主要的代码,还有静态界面,由于先后端都是我本身,个人前端其实仍是从网上找来的模板,
学习的道路是痛苦的,可是我相信我是能够成功的,
from tornado.web import RequestHandler from models.model_py import User class BaseHandler(RequestHandler): @property def db(self): return self.application.db def get_current_user(self): user_id = self.get_secure_cookie('user_id') if not user_id: return None return User.get_by_id(int(user_id))
基础的类集成了RequestHandler的类,,进行了一些简单的自定义,而后后续能够用这个,
进过两周的开发,已经造成了成熟的,