一、思路:html
yamlapi支持unittest与pytest两种运行模式,python
yamlapi即为yaml文件+api测试的缩写,mysql
能够看做是一个脚手架工具,git
能够快速生成项目的各个目录与文件,正则表达式
只需维护一份或者多份yaml文件便可,sql
不须要大量写代码。数据库
二、安装:express
可在首页搜索“yamlapi”,api
或者直接访问项目主页:
https://pypi.org/project/yamlapi/
pip install yamlapi
# 安装
yamlapi -h(或yamlapi --help)
# 查看参数信息
yamlapi -v(或yamlapi --v)
# 查看版本号
pip install -U yamlapi
# 安装最新版
yamlapi --p=项目名称
# 建立项目
# 例如在某个路径下执行命令:yamlapi --p=demo_project
pip uninstall yamlapi
# 卸载
三、工程示例:
README.md文件:
1 # yamlapi 2 接口测试框架 3 4 5 # 1、思路 6 一、采用requests+unittest+ddt+PyMySQL+BeautifulReport+demjson+loguru+PyYAML+ruamel.yaml+pytest+pytest-html+allure-pytest+pytest-rerunfailures+pytest-sugar+pytest-timeout 7 2、requests是发起HTTP请求的第三方库 8 3、unittest是Python自带的单元测试工具 9 4、ddt是数据驱动的第三方库 10 5、PyMySQL是链接MySQL的第三方库 11 6、BeautifulReport是生成html测试报告的第三方库 12 7、demjson是解析json的第三方库 13 8、loguru是记录日志的第三方库 14 9、PyYAML与ruamel.yaml是读写yaml文件的第三方库 15 10、pytest是单元测试的第三方库 16 十一、pytest-html是生成html测试报告的插件 17 十二、allure-pytest是生成allure测试报告的插件 18 1三、pytest-rerunfailures是失败重跑的插件 19 1四、pytest-sugar是显示进度的插件 20 1五、pytest-timeout是设置超时时间的插件 21 22 23 # 2、目录结构 24 1、case是测试用例包 25 2、log是日志目录 26 3、report是测试报告的目录 27 4、resource是yaml文件的目录 28 5、setting是工程的配置文件包 29 6、tool是经常使用方法的封装包 30 31 32 # 3、yaml文件说明 33 1、字段(命名和格式不可修改,顺序能够修改) 34 case_name: 用例名称 35 mysql: MySQL查询语句 36 request_mode: 请求方式 37 api: 接口 38 data: 请求体,缩进字典格式或者json格式 39 headers: 请求头,缩进字典格式或者json格式 40 query_string: 请求参数,缩进字典格式或者json格式 41 expected_code: 预期的响应代码 42 expected_result: 预期的响应结果,-列表格式 43 regular: 正则,缩进字典格式 44 >>variable:变量名,-列表格式 45 >>expression:表达式,-列表格式 46 47 2、参数化 48 正则表达式提取的结果用${变量名}表示,一条用例里面能够有多个 49 MySQL返回的结果用{__SQL}表示,一条用例里面能够有多个 50 随机数字用{__RN位数},一条用例里面能够有多个 51 随机英文字母用{__RL位数},一条用例里面能够有多个 52 以上4种类型在一条用例里面能够混合使用 53 ${变量名}的做用域是全局的,其它3种的做用域仅限该条用例
demo_test.py文件:
1 """ 2 测试用例 3 """ 4 5 import json 6 import re 7 import os 8 import sys 9 import unittest 10 from itertools import chain 11 from time import sleep 12 13 import ddt 14 import demjson 15 import requests 16 17 BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) 18 sys.path.append(BASE_DIR) 19 20 from setting.project_config import * 21 from tool.connect_mysql import query_mysql 22 from tool.create_random import create_random_number, create_random_letters 23 from tool.read_write_yaml import read_yaml, write_yaml 24 from tool.beautiful_report_run import beautiful_report_run 25 26 27 @ddt.ddt 28 # 声明使用ddt 29 class DemoTest(unittest.TestCase): 30 temporary_yaml = yaml_path + "/temporary.yaml" 31 if os.path.isfile(temporary_yaml): 32 # 若是临时yaml文件存在 33 os.remove(temporary_yaml) 34 # 删除之 35 demo_one_list = read_yaml("/demo_one.yaml") 36 demo_two_list = read_yaml("/demo_two.yaml") 37 demo_three_list = read_yaml("/demo_three.yaml") 38 temporary_list = demo_one_list + demo_two_list + demo_three_list 39 temporary_yaml = yaml_path + write_yaml("/temporary.yaml", temporary_list) 40 41 # 把几个yaml文件合并为一个临时yaml文件 42 43 @classmethod 44 def setUpClass(cls) -> None: 45 cls.variable_result_dict = {} 46 # 定义一个变量名与提取的结果字典 47 # cls.variable_result_dict与self.variable_result_dict都是本类的公共属性 48 49 @ddt.file_data(yaml_path + "/temporary.yaml") 50 # 传入临时yaml文件 51 def test_demo(self, **kwargs): 52 """ 53 测试用例 54 :param kwargs: 55 :return: 56 """ 57 58 kwargs = str(kwargs) 59 if "None" in kwargs: 60 kwargs = kwargs.replace("None", "''") 61 kwargs = demjson.decode(kwargs) 62 # 把值为None的替换成''空字符串,由于None没法拼接 63 # demjson.decode()等价于json.loads()反序列化 64 65 case_name = kwargs.get("case_name") 66 # 用例名称 67 self._testMethodDoc = case_name 68 # 测试报告里面的用例描述 69 mysql = kwargs.get("mysql") 70 # mysql查询语句 71 request_mode = kwargs.get("request_mode") 72 # 请求方式 73 api = kwargs.get("api") 74 # 接口 75 if type(api) != str: 76 api = str(api) 77 payload = kwargs.get("data") 78 # 请求体 79 if type(payload) != str: 80 payload = str(payload) 81 headers = kwargs.get("headers") 82 # 请求头 83 if type(headers) != str: 84 headers = str(headers) 85 query_string = kwargs.get("query_string") 86 # 请求参数 87 if type(query_string) != str: 88 query_string = str(query_string) 89 expected_code = kwargs.get("expected_code") 90 # 预期的响应代码 91 expected_result = kwargs.get("expected_result") 92 # 预期的响应结果 93 regular = kwargs.get("regular") 94 # 正则 95 96 logger.info("{}>>>开始执行", case_name) 97 98 if environment == "prd" and mysql != "": 99 self.skipTest("生产环境跳过此用例,请忽略") 100 # 生产环境不能链接MySQL数据库,所以跳过,此行后面的都不会执行 101 102 requests_list = [api, payload, headers, query_string] 103 # 请求数据列表 104 105 for index, value in enumerate(requests_list): 106 # for循环修改requests_list的值 107 108 if self.variable_result_dict: 109 # 若是变量名与提取的结果字典不为空 110 if "$" in value: 111 for key, value_2 in self.variable_result_dict.items(): 112 value = value.replace("{" + key + "}", value_2) 113 # replace(old, new)把字符串中的旧字符串替换成正则表达式提取的值 114 value = re.sub("\\$", "", value) 115 # re.sub(old, new, 源字符串)默认所有替换 116 # 若是遇到带有转义的字符被看成特殊字符时,使用双反斜杠\\来转义,或者在引号前面加r 117 else: 118 pass 119 120 if mysql: 121 # 若是mysql查询语句不为空 122 if "$" in mysql: 123 # 有些场景下MySQL查询语句也须要参数化 124 for key, value_2 in self.variable_result_dict.items(): 125 mysql = mysql.replace("{" + key + "}", value_2) 126 mysql = re.sub("\\$", "", mysql) 127 mysql_result_tuple = query_mysql(mysql) 128 # mysql查询结果元祖 129 mysql_result_list = list(chain.from_iterable(mysql_result_tuple)) 130 # 把二维元祖转换为一维列表 131 if "__SQL" in value: 132 for i in mysql_result_list: 133 if type(i) != str: 134 i = str(i) 135 value = value.replace("{__SQL}", i, 1) 136 # replace(old, new, 替换次数)把字符串中的{__SQL}替换成mysql查询返回的值 137 else: 138 pass 139 140 if "__RN" in value: 141 digit_list = re.findall("{__RN(.+?)}", value) 142 # 获取位数列表 143 for j in digit_list: 144 random_number = create_random_number(int(j)) 145 # 调用生成随机数字的方法 146 value = value.replace("{__RN" + j + "}", random_number) 147 148 if "__RL" in value: 149 digit_list = re.findall("{__RL(.+?)}", value) 150 # 获取位数列表 151 for i in digit_list: 152 random_letters = create_random_letters(int(i)) 153 # 调用生成随机字母的方法 154 value = value.replace("{__RL" + i + "}", random_letters) 155 156 requests_list[index] = value 157 158 api = requests_list[0] 159 payload = requests_list[1] 160 headers = requests_list[2] 161 query_string = requests_list[3] 162 163 if payload != "": 164 payload = demjson.decode(payload) 165 if headers != "": 166 headers = demjson.decode(headers) 167 if query_string != "": 168 query_string = demjson.decode(query_string) 169 170 url = service_domain + api 171 # 拼接完整地址 172 173 logger.info("请求方式为:{}", request_mode) 174 logger.info("地址为:{}", url) 175 logger.info("请求体为:{}", payload) 176 logger.info("请求头为:{}", headers) 177 logger.info("请求参数为:{}", query_string) 178 179 logger.info("预期的响应代码为:{}", expected_code) 180 logger.info("预期的响应结果为:{}", expected_result) 181 182 response = requests.request( 183 request_mode, url, data=json.dumps(payload), 184 headers=headers, params=query_string, timeout=(9, 15)) 185 # 发起HTTP请求 186 # json.dumps()序列化把字典转换成字符串,json.loads()反序列化把字符串转换成字典 187 # data请求体为字符串,headers请求头与params请求参数为字典 188 189 actual_time = response.elapsed.total_seconds() 190 # 实际的响应时间 191 actual_code = response.status_code 192 # 实际的响应代码 193 actual_result_text = response.text 194 # 实际的响应结果(文本格式) 195 196 logger.info("实际的响应代码为:{}", actual_code) 197 logger.info("实际的响应结果为:{}", actual_result_text) 198 logger.info("实际的响应时间为:{}", actual_time) 199 200 if regular: 201 # 若是正则不为空 202 extract_list = [] 203 # 定义一个提取结果列表 204 for i in regular["expression"]: 205 regular_result = re.findall(i, actual_result_text)[0] 206 # re.findall(正则表达式, 实际的响应结果)返回一个符合规则的list,取第1个 207 extract_list.append(regular_result) 208 # 把提取结果添加到提取结果列表里面 209 210 temporary_dict = dict(zip(regular["variable"], extract_list)) 211 # 把变量列表与提取结果列表转为一个临时字典 212 213 for key, value in temporary_dict.items(): 214 self.variable_result_dict[key] = value 215 # 把临时字典合并到变量名与提取的结果字典,已去重 216 else: 217 pass 218 219 for key in list(self.variable_result_dict.keys()): 220 if not self.variable_result_dict[key]: 221 del self.variable_result_dict[key] 222 # 删除变量名与提取的结果字典中为空的键值对 223 224 actual_result_text = re.sub("{|}|\"|\\[|\\]", "", actual_result_text) 225 # 去除{、}、"、[与] 226 actual_result_list = re.split(":|,", actual_result_text) 227 # 把响应文本转为列表,并去除:与, 228 229 if expected_code == actual_code: 230 if set(expected_result) <= set(actual_result_list): 231 # 预期的响应结果与实际的响应结果是被包含关系 232 # 判断是不是其真子集 233 logger.info("{}>>>执行经过", case_name) 234 else: 235 logger.error("{}>>>执行失败", case_name) 236 self.assertTrue(set(expected_result) <= set(actual_result_list)) 237 # 布尔表达式断言 238 else: 239 logger.error("{}>>>请求失败,请检查域名、路径与请求参数是否正确!", url) 240 logger.error("{}>>>执行失败", case_name) 241 self.assertTrue(set(expected_result) <= set(actual_result_list)) 242 243 logger.info("##########用例分隔符##########\n") 244 # sleep(3) 245 # 等待时间为3秒,也能够调整为其余值 246 247 248 if __name__ == '__main__': 249 beautiful_report_run(DemoTest) 250 # 调用BeautifulReport运行方式
project_config.py文件:
1 """ 2 整个工程的配置文件 3 """ 4 5 import os 6 import sys 7 import time 8 9 from loguru import logger 10 11 parameter = sys.argv[1] 12 # 从命令行获取参数 13 14 environment = os.getenv("measured_environment", parameter) 15 # 环境变量 16 17 if environment == "dev": 18 service_domain = "http://www.dev.com" 19 # 开发环境 20 db_host = 'mysql.dev.com' 21 db_port = 3306 22 elif environment == "test": 23 service_domain = "http://www.test.com" 24 # 测试环境 25 db_host = 'mysql.test.com' 26 db_port = 3307 27 elif environment == "pre": 28 service_domain = "http://www.pre.com" 29 # 预生产环境 30 db_host = 'mysql.pre.com' 31 db_port = 3308 32 elif environment == "formal": 33 service_domain = "https://www.formal.com" 34 # 生产环境 35 db_host = None 36 db_port = None 37 38 db_user = 'root' 39 db_password = '123456' 40 db_database = '' 41 # MySQL数据库配置 42 43 44 current_path = os.path.dirname(os.path.dirname(__file__)) 45 # 获取当前目录的父目录的绝对路径 46 # 也就是整个工程的根目录 47 case_path = os.path.join(current_path, "case") 48 # 测试用例的目录 49 yaml_path = os.path.join(current_path, "resource") 50 # yaml文件的目录 51 today = time.strftime("%Y-%m-%d", time.localtime()) 52 # 年月日 53 54 report_path = os.path.join(current_path, "report") 55 # 测试报告的目录 56 if os.path.exists(report_path): 57 pass 58 else: 59 os.mkdir(report_path, mode=0o777) 60 61 log_path = os.path.join(current_path, "log") 62 # 日志的目录 63 if os.path.exists(log_path): 64 pass 65 else: 66 os.mkdir(log_path, mode=0o777) 67 68 logging_file = os.path.join(log_path, "log{}.log".format(today)) 69 70 logger.add( 71 logging_file, 72 format="{time:YYYY-MM-DD HH:mm:ss}|{level}|{message}", 73 level="INFO", 74 rotation="500 MB", 75 encoding="utf-8", 76 ) 77 # loguru日志配置
四、运行:
unittest模式:
python+测试文件名+环境缩写
python ./case/demo_test.py dev
python ./case/demo_test.py test
python ./case/demo_test.py pre
python ./case/demo_test.py formal
pytest模式:
pytest -v
完整命令为:
pytest -v --reruns 3 --reruns-delay 3 --timeout=60 --junitxml=./report/report.xml --html=./report/report.html --self-contained-html --alluredir=./report/allure-report
已写进pytest.ini配置文件