目录html
Windows Python3环境搭建前端
python setup.py install
if x > 0: print("正数") elif x = 0: print("0") else: print("负数") def add(x,y): return x+y
x=1; y=2; print(x+y)
print("this line is too long, \ so I break it to two lines")
# 单行注释 a = 1 '''这是一段 多行注释''' def add(x, y): """加法函数:这是docstring(函数说明)""" pass
x=y=z=1
x,y = y,x
x+=1
x-=1
(不支持x++
, x--
)Python3中没有常量java
str(12)
int("12")
float("1.23")
len("abcdefg")
"abcdefg".find("b"); "abcedfgg".index("g")
"AbcdeF".lower();"abcedF".upper()
isdigit("123")
,结果为 True"aabcabc".count("a")
"".join(["a","b","c"])
会按空字符链接列表元素,获得"abc""hello,java".replace("java", "python")
,将java 替换为 python"a,b,c,d".split(",")
获得["a", "b", "c", "d"]" this has blanks \n".strip()
获得"this has balnks""Name: %s, Age: %d" % ("Lily", 12)
或"Name: %(name)s, Age: %(age)d" % {"name": "Lily", "age": 12}
"Name: {}, Age: {}".format("Lily", 12)
或"Name: {name}, Age: {age}".format(name="Lily",age=12)
"Name: ${name}, Age: ${age}".safe_substitude(name="Lily",age=12)
tpl='''<html> <head><title>{title}</title></head> <body> <h1>{title}</h1> <table border=1px> <tr> <th>序号</th> <th>用例</th> <th>结果</th> </tr> {trs} </table> </body> </html> ''' tr='''<tr><td>{sn}</td> <td>{case_name}</td> <td>{result}</td> ''' title="自动化测试报告" case_results = [("1", "test_add_normal", "PASS"),("2", "test_add_negative", "PASS"), ("3", "test_add_float", "FAIL")] trs='' for case_result in case_results: tr_format = tr.format(sn=case_result[0], case_name=case_result[1], result=case_result[2]) trs += tr_format html = tpl.format(title=title, trs=trs) f = open("report.html", "w") f.write(html) f.close()
结果预览node
序号 | 用例 | 结果 |
---|---|---|
1 | test_add_normal | PASS |
2 | test_add_negative | PASS |
3 | test_add_float | FAIL |
列表元素支持各类对象的混合,支持嵌套各类对象,如
["a", 1, {"b": 3}, [1,2,3]]
python
l = [1, "hello", ("a", "b")]
a = l[0] # 经过索引获取
l.append("c");l.extend(["d","e"]);l+["f"]
l.pop() # 按索引删除,无参数默认删除最后一个;l.remove("c") # 按元素删除
l[1]="HELLO" # 经过索引修改
for i in l: print(i)
s="abcdefg"; r=''.join(reversed(a))
a=("hello",)
- 由于Python中()还有分组的含义,不加","会识别为字符串字符串/列表/元组统称为序列, 有类似的结构和操做方法mysql
l[3];l[-1]
s="abcdefg";r=s[::-1]
for item in l: print(item)
for index in range(len(l)): print(l[index])
for i,v in enumerate(l): print((i,v))
"abc"+"123";[1,2,3]+[4,5];[1,2,3].extend([4,5,6,7])
类型互转: str()/list()/tuple()git
list转str通常用join(), str转list通常用split()正则表达式
l_new=sorted(l);l_new2=reversed(l)
a = set([1,2,3])
l=[1,2,3,1,4,3,2,5,6,2];l=list(set(l))
(因为集合无序,没法保持原有顺序)d = {"a":1, "b":2}
a = d['a']
或a = d.get("a") # d中不存在"a"元素时不会报错
d["c"] = 3; d.update({"d":5, "e": 6}
d.pop("d");d.clear() # 清空
d.has_key("c")
for key in d:
或for key in d.keys():
for value in d.values():
for item in d.items():
案例: 更新接口参数 api = {"url": "/api/user/login": data: {"username": "张三", "password": "123456"}},将username修改成"李四"
api['data']['username'] = "李四"
或 api['data'].update({"username": "李四"})
redis
哈希与可哈希元素算法
可哈希元素: 为了保证每次计算出的地址相同, 要求元素长度是固定的, 如数字/字符串/只包含数字,字符串的元组, 这些都是可哈希元素
6种类型简单的特色总结
if x>0: print("正数") elif x=0: print("0") else: print("负数")
max = a if a > b else b
案例: 判断一个字符串是不ip地址
ip_str = '192.168.100.3'
ip_list = ip_str.split(".") # 将字符串按点分割成列表
is_ip = True # 先假设ip合法
if len(ip_list) != 4:
is_ip= False
else:
for num in ip_list:
if not isdigit(num) or not 0 <= int(num) <= 255:
is_ip = False
if is_ip:
print("是ip")
else:
print("不是ip")
使用map函数的实现方法(参考):
def check_ipv4(str):
ip = str.strip().split(".")
return False if len(ip) != 4 or False in map(lambda x:True if x.isdigit() and 0<= int(x) <= 255 else False, ip) else True
while 循环
html/xml/config/csv也能够按文本文件处理
f =open("test.txt")
或f =open("test.txt","r", encoding="utf-8")
或with open("test.txt) as f: # 上下文模式,出结构体自动关闭文件
r+/rb+, w+/wb+, a+/ab+: 读写,区别在于, r+/w+会清空文件再写内容, r+文件不存在会报错, a+不清空原文件,进行追加, w+/a+文件不存在时会新建文件
文件是可迭代的,能够直接用 for line in f: 遍历
def add(x, y): # 定义函数 return x+y print(add(1,3)) # 调用函数
案例: 用户注册/登陆函数
users = {"张三": "123456"} def reg(username, password): if users.get(username): # 若是用户中存在username这个key print("用户已存在") else: users[username] = password # 向users字典中添加元素 print("添加成功") def login(username, password) if not users.get(username): print("用户不存在") elif users['username'] == password: print("登陆成功") else: print("密码错误")
def func(*args, **kwargs):
能够接受任意长度和格式的参数def(x:int, y:int) -> int: # x,y为int型,函数返回为int型,只是注释,参数格式非法不会报错 return x+y
def a(): print("I'm a") def deco(func): print("call from deco") func() deco(a) # 输出"call from deco"并调用a(),输出"I'm a"
def a(): a_var = 1 def b:() # 嵌套函数 a_var += 1
案例: 求N!
def factorial(n): return 1 if n == 0 or n == 1 else n * factorial(n-1)
支持一次导入多个
一个文件夹为一个包(Python3,文件夹中不须要创建__init__.py文件)
for root,dirs,files in os.walk("./case/"): for file in files: if file.startswith("test") and file.endswith(".py"): print(os.path.join(root, file)
def buddle_sort(under_sort_list): l = under_sort_list for j in range(len(l)): for i in range(len(l)-j-1): if l[i] > l[i+1]: l[i], l[i+1] = l[i+1], l[i]
def quick_sort(l): if len(l) < 2: return l # 若是列表只有一个元素, 返回列表(用于结束迭代) else: pivot = l[0] # 取列表第一个元素为基准数 low = [i for i in l[1:] if i < pivot] # 遍历l, 将小于基准数pivot的全放入low这个列表 high = [i for i in l[1:] if i >= pivot ] return quick_sort(low) + [pivot] + quick_sort(high) # 对左右两部分分别进行迭代
def bin_search(l, n): # l为有序列表 low, high = 0, len(l) - 1 # low,high分别初始化为第一个/最后一个元素索引(最小/最大数索引) while low < high: mid = (high-low) // 2 # 地板除,保证索引为整数 if l[mid] == n: return mid elif l[mid] > n: # 中间数大于n则查找前半部分, 重置查找的最大数 high = mid -1 else: # 查找后半部分, 重置查找的最小数 low = mid + 1 return None # 循环结束没有return mid 则说明没找到
做业
首先要安装flask包:
pip install flask
from flask import Flask, request
app=Flask(__name__)
a=request.values.get("a");b=request.values.get("b")
sum = int(a) + int(b)
return str(sum) # http通常使用字符串传输数据
@app.route("/add/", methods=["GET"]) # 写到函数上面(装饰器)
if __name__ == "__main__": app.run()
保存为add.py
, 打开命令行,进入add.py所在目录,运行python add.py
完整代码
# 1. 导入包 from flask import Flask, request # 2. 实例化一个 app = Flask(__name__) # 3. 编写一个接口处理方法 @app.route("/add/", methods=["GET","POST"]) # 4. 挂载路由(指定接口的url路径), 声明接口接受的方法 def add(): # 3.1 从请求中获取参数 # request.values {"a": "1", "b": "2"} a = request.values.get("a") b = request.values.get("b") # 3.2 业务操做 sum = int(a) + int(b) # 3.3 组装响应并返回 return str(sum) # 5. 运行接口 if __name__ == '__main__': app.run() # 默认5000端口,能够指定端口app.run(port=50001)
from flask import Flask, request, jsonify app = Flask(__name__) @app.route("/api/sub/", methods=["POST"]) def sub(): if not request.json: # 若是请求数据类型非json return jsonify({"code": "100001", "msg": "请求类型错误", "data": None}) if not "a" in request.json or not "b" in request.json: # 若是参数中没有a或者没有b return jsonify({"code": "100002", "msg": "参数缺失", "data": None}) a = request.json.get("a") b = request.json.get("b") result = str(float(a) - float(b)) # 使用float支持浮点数相减 return jsonify({"code": "100000", "msg": "成功", "data": result}) # 使用jsonify将字典数据转换为json类型的相应数据 if __name__ == '__main__': app.run()
接口测试是测试系统组件间接口的一种测试。
接口测试主要用于检测外部系统与系统之 间以及内部各个子系统之间的交互点。测试的重点是要检查数据的交换,传递和控制管理过 程,以及系统间的相互逻辑依赖关系等。
Excel/TestLink/禅道
TestCase | Url | Method | DataType | a | b | Excepted | Actual | Status |
---|---|---|---|---|---|---|---|---|
test_add_normal | /api/add/ | GET | Url | 3 | 5 | 8 | ||
test_add_zero | /api/add/ | POST | FORM | 0 | 0 | 0 | ||
test_add_negetive | /api/add/ | POST | FORM | -3 | 5 | 2 | ||
test_add_float | /api/add/ | POST | FORM | 3.2 | 5.2 | 8.4 | ||
test_add_null | /api/add/ | POST | FORM | 0 |
须要安装三方包:requests pytest pytest-html
pip install requests pytest pytest-html
url = 'http://127.0.0.1:5000/add/'
params = {"a":3, "b":5} # get请求url参数, 字典格式
data = {"a":3, "b":5} # post请求请求数据, 字典格式
resp = requests.get(url=url, params=params)
resp = requests.post(url=url,data=data)
# 1. 导入包 import requests base_url = "http://127.0.0.1:5005" # 2. 组装请求 def test_add_normal(): # url 字符串格式 url = base_url + "/add/" # data {} 字典格式 data = {"a": "1", "b": "2"} # 3. 发送请求,获取响应对象 response = requests.post(url=url, data=data) # 4. 解析响应 # 5. 断言结果 assert response.text == '3'
请求格式为json
三处不一样:
data=json.dumps(data)
(使用json.dumps须要import json)完整代码:
# 1. 导入包 import requests import json base_url = "http://127.0.0.1:5005" def test_sub_normal(): url = base_url + "/api/sub/" headers = {"Content-Type": "application/json"} # 1. 必须经过headers指定请求内容类型为json data = {"a": "4", "b": "2"} data = json.dumps(data) # 2. 序列化成字符串 response = requests.post(url=url, headers=headers, data=data) # 3. 响应解析 # 响应格式为: {"code":"100000", "msg": "成功", "data": "2"} resp_code = response.json().get("code") resp_msg = response.json().get("msg") resp_data = response.json().get("data") # 断言 assert response.status_code == 200 assert resp_code == "100000" assert resp_msg == "成功" assert resp_data == "2"
补充1: 感觉Python黑科技之exec()动态生成用例:
数据文件: test_add_data.xls
TestCase | Url | Method | DataType | a | b | Excepted | Actual | Status |
---|---|---|---|---|---|---|---|---|
test_add_normal | /api/add/ | GET | Url | 3 | 5 | 8 | ||
test_add_zero | /api/add/ | POST | FORM | 0 | 0 | 0 | ||
test_add_negetive | /api/add/ | POST | FORM | -3 | 5 | 2 | ||
test_add_float | /api/add/ | POST | FORM | 3.2 | 5.2 | 8.4 | ||
test_add_null | /api/add/ | POST | FORM | 0 |
import requests import xlrd base_url = 'http://127.0.0.1:5005' # 1.打开excel wb = xlrd.open_workbook("test_add_data.xls") # 2. 获取sheet sh = wb.sheet_by_index(0) # wb.sheet_by_name("Sheet1") # 行数 sh.nrows 列数 sh.ncols # 获取单元格数据 # print(sh.cell(1,0).value) # print(sh.nrows) tpl = '''def {case_name}(): url = base_url + '/add/' data = {{"a": "{a}", "b":"{b}"}} response = requests.get(url=url, data=data) assert response.text == '{expected}' ''' for row in range(1, sh.nrows): case_name = sh.cell(row,0).value a = sh.cell(row, 4).value b = sh.cell(row, 5).value expected = sh.cell(row, 6).value case = tpl.format(case_name=case_name, a=a, b=b, expected=expected) exec(case)
动态生成的用例支持pytest发现和执行
pytest
(运行全部test开头的.py用例)或pytest test_user.py
(只运行该脚本中用例)pytest -q test_user.py
pytest test_user.py --html=test_user_report.html
pytest test_user.py --resultlog=run.log
requests.session() # 用来保持session会话,如登陆状态
url="http://127.0.0.1:5000/add/"
headers={"Content-Type": "application/json"}
params={"a":"1":"b":"2"}
data={"a":"1":"b":"2"}
files={"file": open("1.jpg")}
timeout: 超时时间,单位s, str, 超过期间会报超时错误```requests.get(url=url,params=params,timeout=10)
resp.content # 响应内容, 二进制类型
requests.get(url=url, params=params)
requests.post(url=url, data=data)
requests.post(url=url, headers={"Content-Type": "application/json"}, data=json.dumps(data)
requests.post(url=url, files={"file": open("1.jpg", "rb")})
session=requests.session(); session.post(...); session.post()
resp=requests.get(...);token=resp.split("=")[1];resp2=requests.post(....token...)
import hashlib def md5(str): m = hashlib.md5() m.update(str.encode('utf8')) return m.hexdigest() #返回摘要,做为十六进制数据字符串值 def makeSign(params): if not isinstance(params, dict): print("参数格式不正确,必须为字典格式") return None if 'sign' in params: params.pop('sign') sign = '' for key in sorted(params.keys()): sign = sign + key + '=' + str(params[key]) + '&' sign = md5(sign + 'appsecret=' + appsecret) params['sign'] = sign return params
data = makeSign(data);resp = requests.post(url=url, headers=headers, data=json.dumps(data))
Mock和单元测试的桩(Stub)相似, 是经过创建一个模拟对象来解决依赖问题的一种方法.
应用场景:
1. 依赖第三方系统接口, 没法调试
2. 所依赖接口还没有具有(先后端分离项目, 前端开发时,后端接口还没有开发完毕)
3. 所依赖接口须要修改或不稳定
4. 依赖接口较多或场景复杂, 所依赖接口不是主要验证目标的
解决方法:
1. 经过Mock.js/RAP/RAP2来动态生成, 模拟接口返回数据
2. 本身使用Flask你们简单的Mock接口
3. 使用Python自带的mock库
...
pip install suds
from suds.client import Client ip = '127.0.0.1' port = '5001' client = Client("http://%s:%s/?wsdl" % (ip, port)) result = client.service.addUser("张790", "123456") print(result)
import xmlrpc.client user = xmlrpc.client.ServerProxy('http://127.0.0.1:5002') print(user.getAll())
参数化是用来解决动态参数问题的
import csv csv_data = csv.reader(open('data/reg.csv'))
import configparser cf=configparser.ConfigParser() cf.read('data/reg.config', encoding='utf-8') cf.sections() cf.options(section) cf.get(section,option)
import json with open('data/reg.json', encoding='utf-8') as f: json_data = json.loads(f) #json_data为列表或字典
json的序列化和反序列化 需求:python的字典/列表等数据格式为内存对象,须要作存储(持久化)和进行数据交换 序列化: 从内存对象到可存储数据, 方便存储和交换数据 json.dumps: 列表/字典/json->字符串 ```str_data = json.dumps(dict_data)``` json.dump: 列表/字典/json->数据文件 ```json.dump(dict_data, open(data_file, "w"))``` 反序列化: 从存储数据到内存对象 json.loads: 字符串->字典/列表```json.loads('{"a":1,"b":2}') #获得字典{"a":1,"b":2}``` json.load: json数据文档->列表/字典```dict_data = json.load(open('data.json'))```
pip install xlrd
import xlrd wb=xlrd.open_workbook("data/reg.xlsx") sh=wb.sheet_by_index(0) sh=wb.sheet_by_name('Sheet1") sh.nrows sh.ncols sh.cell(x,y).value
from xml.dom.minidom import parse dom=parse('data/reg.xml') root=dom.documentElement user_nodes=root.getElementsByTagName("user") user_node.getAttribute('title') user_node.hasAttribute('title') name_node=user_node.getElementsByTagName('name')[0] name=name_node.childNodes[0].data
案例1: 自动执行excel用例并将结果回写excel
数据文件: test_user.xlsx
TestCase | Url | Method | DataType | Data | Excepted | Resp.text | Status |
---|---|---|---|---|---|---|---|
test_user_reg_normal | /api/user/reg/ | POST | JSON | {"name":"九小1","passwd": "123456"} | resp.json()["code"]=="100000" | ||
test_user_login_normal | /api/user/login/ | POST | FORM | {"name":"九小1","passwd": "123456"} | "成功" in resp.text |
import xlrd from xlutils.copy import copy import json import requests import sys base_url = "http://127.0.0.1:5000" def run_excel(file_name, save_file="result.xls"): wb=xlrd.open_workbook(file_name) sh=wb.sheet_by_index(0) wb2 = copy(wb) sh2 = wb2.get_sheet(0) for i in range(1,sh.nrows): url = base_url + sh.cell(i,1).value data = json.loads(sh.cell(i,4).value) headers = {} method = sh.cell(i,2).value data_type = sh.cell(i,3).value excepted = sh.cell(i,5).value if data_type.lower() == 'json': data = json.dumps(data) headers = {"Content-Type":"application/json"} if method.lower() == "get": resp = requests.get(url=url,headers=headers) else: resp = requests.post(url=url,headers=headers,data=data) if eval(excepted): status = "PASS" else: status = "FAIL" sh2.write(i,6, resp.text) sh2.write(i,7, status) wb2.save(save_file) print("保存成功") if __name__ == "__main__": if len(sys.argv)==2: run_excel(sys.argv[1]) elif len(sys.argv)>2: run_excel(sys.argv[1],sys.argv[2]) else: print("调用格式: python run_excel.py 用例文件 输出文件")
保存脚本为run_excel.py, (最好和数据文件test_user.xlsx放在同一目录下), 在脚本所在目录打开命令行,运行
python run_excel.py test_user.xlsx
生成的result.xls预览
TestCase | Url | Method | DataType | Data | Excepted | Resp.text | Status |
---|---|---|---|---|---|---|---|
test_user_reg_normal | /api/user/reg/ | POST | JSON | {"name":"九小1","passwd": "123456"} | resp.json()["code"]=="100000" | {"code":"100001","data":{"name":"\u4e5d\u5c0f1","passwod":"e10adc3949ba59abbe56e057f20f883e"},"msg":"\u5931\u8d25\uff0c\u7528\u6237\u5df2\u5b58\u5728"} | FAIL |
test_user_login_normal | /api/user/login/ | POST | FORM | {"name":"九小1","passwd": "123456"} | "成功" in resp.text | <h1>登陆成功</h1> |
PASS |
import random
随机姓名的实现: #随机汉字: chr(random.randint(0x4e00, 0x9fbf)) name=random.choice(['赵','钱','孙','李','周'])+chr(random.randint(0x4e00, 0x9fbf)
随机2个字符拼接: ''.join(random.sample('abcdefg',2)
断言/检查点是用来自动验证接口返回数据/业务操做的结果是否知足预期
re.complie()
从数据库读取数据,验证数据库数据是否符合预期
MySQL断言
pip install pymysql
import pymysql
conn = pymysql.connect(host='',port=3306,db='',user='',passwd='',charset='utf8')
cur=conn.cursor()
cur.execute("select * from user")
cur.close();conn.close()
PostgreSQL
pip install pyscopg2
import pyscopg2 conn=pyscopg2.connect(host='',port='',dbname='',user='',password='') # 注意是dbname, password cur=conn.curser() cur.execute("...") cur.fetchall()
Oracle
pip install cx_Oracle
...
Mongodb
pip install pymongo
from pymongo import MongoClient conn = MongoClient('', 27017) db = conn.mydb my_set = db.test_set for i in my_set.find({"name": "张三"}): print(i) print(my_set.findone({"name": "张三"}))
Redis断言
pip install redis
import redis r = redis.Redis(host='192.168.100.198', port=6379, password='!QE%^E2sdf23RGF@ml239', db=0) print(r.dbsize()) print(r.get('package')) print(r.hget('event_order_advance_008aea6a62115ec4923829ee09f76a9c18243f5d', 'user'))
pip install paramiko
import paramiko client = paramiko.SSHClient() client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) client.connect('192.168.100.241', 22, username='root', password='1234567', timeout=4) stdin, stdout, stderr = client.exec_command('cat /proc/meminfo') print(stdout.read()) client.close()
完整用例:
import requests import pytest import json import hashlib import re def md5(string): m = hashlib.md5() m.update(string.encode('utf8')) return m.hexdigest() def makeSign(data): sign='' for key in sorted(data.keys()): sign += key + '=' + str(data[key]) + '&' sign += 'appsecret=NTA3ZTU2ZWM5ZmVkYTVmMDBkMDM3YTBi' data['sign'] = md5(sign) return data class DB(): def __init__(self): # 创建链接 self.conn = pymysql.connect(host='localhost',port=3307,user='root',passwd='',db='api',charset='utf8') # 创建一个游标 self.cur = self.conn.cursor() def __del__(self): self.cur.close() self.conn.close() def getUserByName(self, name): self.cur.execute("select * from user where name='%s'" % name) return self.cur.fetchone() def checkUser(self, name, passwd): user = self.getUserByName(name) if user: if user[2] == md5(passwd): return True else: return False else: return None class TestUser(): # pytest识别不能用__init__方法 base_url = 'http://127.0.0.1:5000' db = DB() def test_login(self): url = self.base_url + '/api/user/login/' data = {"name": "张三", "passwd": "123456"} resp = requests.post(url=url, data=data) #断言 assert resp.status_code == 200 assert '登陆成功' in resp.text def test_reg(self): url = self.base_url + '/api/user/reg/' headers = {"Content-Type": "application/json"} data = {'name': '张10', 'passwd': '123456'} resp = requests.post(url=url, headers=headers, data=json.dumps(data)) #断言 assert resp.json()['code'] == '100000' assert resp.json()['msg'] == '成功' assert self.db.getUserByName('张10') def test_uploadImage(self): url = self.base_url + '/api/user/uploadImage/' files = {'file': open("复习.txt")} resp = requests.post(url=url, files=files) #断言 assert resp.status_code == 200 assert '成功' in resp.text # todo 服务器断言 def test_getUserList(self): session = requests.session() login_url = self.base_url + '/api/user/login/' login_data = {"name": "张三", "passwd": "123456"} session.post(url=login_url, data=login_data) url = self.base_url + '/api/user/getUserList/' resp = session.get(url=url) #断言 assert resp.status_code == 200 assert '用户列表' in resp.text assert re.findall('\w{32}',t2) != [] def test_updateUser(self): session = requests.session() # 接口依赖的接口须要用session get_token_url = self.base_url + '/api/user/getToken/' params = {"appid": '136425'} token_resp = session.get(url=get_token_url, params=params) assert re.findall('token=\w{32}$') token = token_resp.text.split('=')[1] url = self.base_url + '/api/user/updateUser/?token=' + token data = {'name': '张三', 'passwd': '234567'} headers = {"Content-Type": "application/json"} resp = session.post(url=url, headers=headers, data=json.dumps(data)) #断言 assert resp.status_code == 200 assert resp.json()['code'] == '100000' assert resp.json()['msg'] == '成功' assert self.db.checkUser('张三', '234567') def test_delUser(self): url = self.base_url + '/api/user/delUser/' headers = {"Content-Type": "application/json"} data = {'name': '张10', 'passwd': '123456'} data = makeSign(data) resp = requests.post(url=url, headers=headers, data=json.dumps(data)) #断言 assert resp.status_code == 200 assert resp.json()['code'] == '100000' assert resp.json()['msg'] == '成功' assert not self.db.getUserByName('张10') if __name__ == '__main__': t = TestUser() # t.test_updateUser() # t.test_updateUser() t.test_delUser() # pytest.main("-q test_user2.py")
- case: 测试用例目录 - user: (用户模块) - test_user.py: 测试用例 - case.py: 用例公共方法 - data: 数据文件目录 - test_user_data.xlsx: 测试用例数据文件 - conf: 配置文件目录 - default.conf: 默认配置文件 - report: pytest生成的报告保存路径 - log: log保存路径,按天生成log - common: 公共方法目录 - config.py: 配置文件读取 - data.py: 数据文件读取 - db.py: 数据库链接 - log.py: 日志配置 - send_email.py: 发送邮件配置
[runtime] log_level=debug report_dir=report log_dir=log timeout=10 [server] test = http://127.0.0.1:5000 stage = http://127.0.0.1:6000 prod = http://127.0.0.1:7000 [db_test] host = localhost port = 3307 db = api user = root passwd = [db_stage] [db_prod] [email] server = smtp.sina.com user = test_results@sina.com pwd = ****** subject = Api Test Ressult receiver = superhin@126.com
TestCase | Url | Method | DataType | Data | Code | Msg |
---|---|---|---|---|---|---|
test_reg_normal | /api/user/reg/ | POST | JSON | {"name": "{NAME}", "passwd": "123456"} | 100000 | 成功 |
TestCase | Url | Method | DataType | Data | ResponseText |
---|---|---|---|---|---|
test_login_normal | /api/user/login/ | POST | FORM | {"name": "张三", "passwd": "123456"} | 登陆成功 |
checkUser | select * from user where name={NAME} |
---|---|
checkUserPasswd | select * from user where name={NAME} and passwd={PASSWD} |
""" 1. 从配置文件中获取各个段信息 2. 返回一个项目的绝对路径 """ import os import configparser # 相对导入包的问题 pro_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) class Config(object): def __init__(self, filename="default.conf"): self.cf = configparser.ConfigParser() self.cf.read(os.path.join(pro_path,"conf",filename)) def get_runtime(self, option): return self.cf.get("runtime", option) def get_server(self, option): return self.cf.get("server", option) def get_db_test(self, option): return self.cf.get("db_test", option) def get_email(self, option): return self.cf.get("email",option) if __name__ == "__main__": c = Config() print(c.get_runtime("log_level")) print(c.get_server("test"))
""" 1. 从Excel中读取接口的数据 2. 读取Sql命令 """ import xlrd import sys import os sys.path.append("..") from common.config import pro_path class Data(object): def __init__(self, filename): data_file_path = os.path.join(pro_path,"data",filename) self.wb = xlrd.open_workbook("../data/test_user_data.xlsx") def get_case(self,sheet_name, case_name): sh = self.wb.sheet_by_name(sheet_name) for i in range(1, sh.nrows): if sh.cell(i,0).value == case_name: return sh.row_values(i) print("用例名未找到") return None def get_sql(self, sql_name): sh = self.wb.sheet_by_name("SQL") for i in range(sh.nrows): if sh.cell(i,0).value == sql_name: return sh.cell(i,1).value print("sql未找到") return None if __name__ == "__main__": d = Data("test_user_data.xlsx") print(d.get_case("reg","test_reg_normal")) print(d.get_sql("checkUser"))
""" 1. 从配置文件中读取数据库配置 2. 链接数据库 3. 执行sql并返回全部结果 """ import sys import pymysql sys.path.append("..") from common.config import Config class DB(object): def __init__(self): c = Config() self.conn = pymysql.connect(host=c.get_db_test("host"), port=int(c.get_db_test("port")), db=c.get_db_test("db"), user=c.get_db_test("user"), passwd=c.get_db_test("passwd"), charset="utf8") self.cur = self.conn.cursor() def do_sql(self, sql): self.cur.execute(sql) return self.cur.fetchall() def __del__(self): self.cur.close() self.conn.close() if __name__ == "__main__": db = DB() print(db.do_sql("select * from user"))
""" 1. 配置log输出格式 time - loglevel - file - func - line - msg 2. 支持输出到log文件及屏幕 3. 支持返回一个logger,让其余模块调用 """ import sys sys.path.append("..") from common.config import Config, pro_path import time import logging import os class Log(): @classmethod def config_log(cls): cf = Config() log_dir = os.path.join(pro_path, cf.get_runtime("log_dir")) today = time.strftime("%Y%m%d", time.localtime(time.time())) log_file = os.path.join(log_dir, today+".log") # 获取一个标准的logger, 配置loglevel cls.logger = logging.getLogger() cls.logger.setLevel(eval("logging." + cf.get_runtime("log_level").upper())) # 创建不一样handler fh = logging.FileHandler(log_file, mode="a",encoding=‘utf-8’) ch = logging.StreamHandler() # 定义输出格式 ft = logging.Formatter("%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s") fh.setFormatter(ft) ch.setFormatter(ft) # 把定制handler 添加到咱们logger cls.logger.addHandler(fh) cls.logger.addHandler(ch) @classmethod def get_logger(cls): cls.config_log() return cls.logger if __name__ == "__main__": l= Log.get_logger() l.info("abc") l.debug("hello, debug")
""" 1. 从配置文件中读取stmp配置 2. 从report文件夹下打开report.html,发送邮件 """ import smtplib from email.mime.text import MIMEText import os import sys sys.path.append("..") from common.config import Config, pro_path from common.log import Log def send_email(report_name): cf = Config() logger = Log.get_logger() report_file = os.path.join(pro_path, cf.get_runtime("report_dir"),report_name) with open(report_file, "rb") as f: body = f.read() # 格式化email正文 msg = MIMEText(body, "html", "utf-8") # 配置email头 msg["Subject"] = cf.get_email("subject") msg["From"] = cf.get_email("user") msg["To"] = cf.get_email("receiver") # 链接smtp服务器,发送邮件 smtp = smtplib.SMTP() smtp.connect(cf.get_email("server")) smtp.login(cf.get_email("user"),cf.get_email("pwd")) smtp.sendmail(cf.get_email("user"), cf.get_email("receiver"), msg.as_string()) print("邮件发送成功") if __name__ == "__main__": send_email("report.html")
""" 1. 加载数据 2. 发送接口 3. 为用例封装一些方法 """ import sys sys.path.append("..") from common.log import Log from common.config import Config from common.db import DB from common.data import Data import json import requests class Case(object): def __init__(self): self.logger = Log.get_logger() self.cf = Config() def load_data(self, data_file): self.data = Data(data_file) def set_env(self, env): self.env = env def run_case(self, sheet_name, case_name, var={}): case_data = self.data.get_case(sheet_name, case_name) url = self.cf.get_server(self.env) + case_data[1] data = case_data[4].format(**var) if case_data[3].lower() == "form": data = json.loads(data) headers = {} else: headers = {"content-type": "application/json"} if case_data[2].lower() == "get": resp = requests.get(url=url) else: resp = requests.post(url=url, headers=headers, data=data) return resp.text def check_response(self): pass def check_db(self, sql_name, vars={}): sql = self.data.get_sql(sql_name).format(**vars) return self.db.exec_sql(sql) if __name__ == "__main__": c = Case() c.set_env("test") c.load_data("test_user_data.xlsx") r = c.run_case("login", "test_login_normal") print(r)
import sys import random import pytest sys.path.append("../..") from case.case import Case case = Case() def setup_module(module): case.set_env('dev') case.load_data('test_user_data.xlsx') def test_login_normal(): result case.run("login", "test_login_normal") if __name__ == '__main__': pytest.main(["-q", "test_user.py"])
import os import time from util.config import Config from util.e_mail import send_email import pytest def main(): cf = Config() report_dir = cf.get_report_dir() now = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time())) report_name = os.path.join(report_dir, 'report_' + now + '.html') pytest.main(["-q", "case", "--html=" + report_name]) send_email(report_name) if __name__ == '__main__': main()