yamlapi接口测试框架

一、思路:html

yamlapi支持unittest与pytest两种运行模式,python

yamlapi即为yaml文件+api测试的缩写,mysql

能够看做是一个脚手架工具,git

能够快速生成项目的各个目录与文件,正则表达式

只需维护一份或者多份yaml文件便可,sql

不须要大量写代码。数据库

二、安装:express

https://pypi.org/json

可在首页搜索“yamlapi”,api

或者直接访问项目主页:

https://pypi.org/project/yamlapi/

 

pip install yamlapi

# 安装

yamlapi -h(或yamlapi --help)

# 查看参数信息

yamlapi -v(或yamlapi --v)

# 查看版本号

pip install -U yamlapi

# 安装最新版

yamlapi --p=项目名称

# 建立项目

# 例如在某个路径下执行命令:yamlapi --p=demo_project

pip uninstall yamlapi

# 卸载

三、工程示例:

README.md文件:

 1 # yamlapi  
 2 接口测试框架  
 3 
 4 
 5 # 1、思路         
 6 一、采用requests+unittest+ddt+PyMySQL+BeautifulReport+demjson+loguru+PyYAML+ruamel.yaml+pytest+pytest-html+allure-pytest+pytest-rerunfailures+pytest-sugar+pytest-timeout  
 7 2、requests是发起HTTP请求的第三方库  
 8 3、unittest是Python自带的单元测试工具  
 9 4、ddt是数据驱动的第三方库  
10 5、PyMySQL是链接MySQL的第三方库  
11 6、BeautifulReport是生成html测试报告的第三方库  
12 7、demjson是解析json的第三方库  
13 8、loguru是记录日志的第三方库  
14 9、PyYAML与ruamel.yaml是读写yaml文件的第三方库  
15 10、pytest是单元测试的第三方库  
16 十一、pytest-html是生成html测试报告的插件  
17 十二、allure-pytest是生成allure测试报告的插件  
18 1三、pytest-rerunfailures是失败重跑的插件   
19 1四、pytest-sugar是显示进度的插件  
20 1五、pytest-timeout是设置超时时间的插件  
21 
22 
23 # 2、目录结构    
24 1、case是测试用例包              
25 2、log是日志目录         
26 3、report是测试报告的目录       
27 4、resource是yaml文件的目录      
28 5、setting是工程的配置文件包            
29 6、tool是经常使用方法的封装包         
30 
31 
32 # 3、yaml文件说明  
33 1、字段(命名和格式不可修改,顺序能够修改)  
34 case_name: 用例名称   
35 mysql: MySQL查询语句   
36 request_mode: 请求方式  
37 api: 接口    
38 data: 请求体,缩进字典格式或者json格式     
39 headers: 请求头,缩进字典格式或者json格式    
40 query_string: 请求参数,缩进字典格式或者json格式    
41 expected_code: 预期的响应代码    
42 expected_result: 预期的响应结果,-列表格式     
43 regular: 正则,缩进字典格式  
44 >>variable:变量名,-列表格式  
45 >>expression:表达式,-列表格式  
46 
47 2、参数化  
48 正则表达式提取的结果用${变量名}表示,一条用例里面能够有多个    
49 MySQL返回的结果用{__SQL}表示,一条用例里面能够有多个   
50 随机数字用{__RN位数},一条用例里面能够有多个   
51 随机英文字母用{__RL位数},一条用例里面能够有多个  
52 以上4种类型在一条用例里面能够混合使用  
53 ${变量名}的做用域是全局的,其它3种的做用域仅限该条用例  

demo_test.py文件:

  1 """
  2 测试用例
  3 """
  4 
  5 import json
  6 import re
  7 import os
  8 import sys
  9 import unittest
 10 from itertools import chain
 11 from time import sleep
 12 
 13 import ddt
 14 import demjson
 15 import requests
 16 
 17 BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
 18 sys.path.append(BASE_DIR)
 19 
 20 from setting.project_config import *
 21 from tool.connect_mysql import query_mysql
 22 from tool.create_random import create_random_number, create_random_letters
 23 from tool.read_write_yaml import read_yaml, write_yaml
 24 from tool.beautiful_report_run import beautiful_report_run
 25 
 26 
 27 @ddt.ddt
 28 # 声明使用ddt
 29 class DemoTest(unittest.TestCase):
 30     temporary_yaml = yaml_path + "/temporary.yaml"
 31     if os.path.isfile(temporary_yaml):
 32         # 若是临时yaml文件存在
 33         os.remove(temporary_yaml)
 34         # 删除之
 35     demo_one_list = read_yaml("/demo_one.yaml")
 36     demo_two_list = read_yaml("/demo_two.yaml")
 37     demo_three_list = read_yaml("/demo_three.yaml")
 38     temporary_list = demo_one_list + demo_two_list + demo_three_list
 39     temporary_yaml = yaml_path + write_yaml("/temporary.yaml", temporary_list)
 40 
 41     # 把几个yaml文件合并为一个临时yaml文件
 42 
 43     @classmethod
 44     def setUpClass(cls) -> None:
 45         cls.variable_result_dict = {}
 46         # 定义一个变量名与提取的结果字典
 47         # cls.variable_result_dict与self.variable_result_dict都是本类的公共属性
 48 
 49     @ddt.file_data(yaml_path + "/temporary.yaml")
 50     # 传入临时yaml文件
 51     def test_demo(self, **kwargs):
 52         """
 53         测试用例
 54         :param kwargs:
 55         :return:
 56         """
 57 
 58         kwargs = str(kwargs)
 59         if "None" in kwargs:
 60             kwargs = kwargs.replace("None", "''")
 61         kwargs = demjson.decode(kwargs)
 62         # 把值为None的替换成''空字符串,由于None没法拼接
 63         # demjson.decode()等价于json.loads()反序列化
 64 
 65         case_name = kwargs.get("case_name")
 66         # 用例名称
 67         self._testMethodDoc = case_name
 68         # 测试报告里面的用例描述
 69         mysql = kwargs.get("mysql")
 70         # mysql查询语句
 71         request_mode = kwargs.get("request_mode")
 72         # 请求方式
 73         api = kwargs.get("api")
 74         # 接口
 75         if type(api) != str:
 76             api = str(api)
 77         payload = kwargs.get("data")
 78         # 请求体
 79         if type(payload) != str:
 80             payload = str(payload)
 81         headers = kwargs.get("headers")
 82         # 请求头
 83         if type(headers) != str:
 84             headers = str(headers)
 85         query_string = kwargs.get("query_string")
 86         # 请求参数
 87         if type(query_string) != str:
 88             query_string = str(query_string)
 89         expected_code = kwargs.get("expected_code")
 90         # 预期的响应代码
 91         expected_result = kwargs.get("expected_result")
 92         # 预期的响应结果
 93         regular = kwargs.get("regular")
 94         # 正则
 95 
 96         logger.info("{}>>>开始执行", case_name)
 97 
 98         if environment == "prd" and mysql != "":
 99             self.skipTest("生产环境跳过此用例,请忽略")
100         # 生产环境不能链接MySQL数据库,所以跳过,此行后面的都不会执行
101 
102         requests_list = [api, payload, headers, query_string]
103         # 请求数据列表
104 
105         for index, value in enumerate(requests_list):
106             # for循环修改requests_list的值
107 
108             if self.variable_result_dict:
109                 # 若是变量名与提取的结果字典不为空
110                 if "$" in value:
111                     for key, value_2 in self.variable_result_dict.items():
112                         value = value.replace("{" + key + "}", value_2)
113                         # replace(old, new)把字符串中的旧字符串替换成正则表达式提取的值
114                     value = re.sub("\\$", "", value)
115                     # re.sub(old, new, 源字符串)默认所有替换
116                     # 若是遇到带有转义的字符被看成特殊字符时,使用双反斜杠\\来转义,或者在引号前面加r
117             else:
118                 pass
119 
120             if mysql:
121                 # 若是mysql查询语句不为空
122                 if "$" in mysql:
123                     # 有些场景下MySQL查询语句也须要参数化
124                     for key, value_2 in self.variable_result_dict.items():
125                         mysql = mysql.replace("{" + key + "}", value_2)
126                     mysql = re.sub("\\$", "", mysql)
127                 mysql_result_tuple = query_mysql(mysql)
128                 # mysql查询结果元祖
129                 mysql_result_list = list(chain.from_iterable(mysql_result_tuple))
130                 # 把二维元祖转换为一维列表
131                 if "__SQL" in value:
132                     for i in mysql_result_list:
133                         if type(i) != str:
134                             i = str(i)
135                         value = value.replace("{__SQL}", i, 1)
136                         # replace(old, new, 替换次数)把字符串中的{__SQL}替换成mysql查询返回的值
137             else:
138                 pass
139 
140             if "__RN" in value:
141                 digit_list = re.findall("{__RN(.+?)}", value)
142                 # 获取位数列表
143                 for j in digit_list:
144                     random_number = create_random_number(int(j))
145                     # 调用生成随机数字的方法
146                     value = value.replace("{__RN" + j + "}", random_number)
147 
148             if "__RL" in value:
149                 digit_list = re.findall("{__RL(.+?)}", value)
150                 # 获取位数列表
151                 for i in digit_list:
152                     random_letters = create_random_letters(int(i))
153                     # 调用生成随机字母的方法
154                     value = value.replace("{__RL" + i + "}", random_letters)
155 
156             requests_list[index] = value
157 
158         api = requests_list[0]
159         payload = requests_list[1]
160         headers = requests_list[2]
161         query_string = requests_list[3]
162 
163         if payload != "":
164             payload = demjson.decode(payload)
165         if headers != "":
166             headers = demjson.decode(headers)
167         if query_string != "":
168             query_string = demjson.decode(query_string)
169 
170         url = service_domain + api
171         # 拼接完整地址
172 
173         logger.info("请求方式为:{}", request_mode)
174         logger.info("地址为:{}", url)
175         logger.info("请求体为:{}", payload)
176         logger.info("请求头为:{}", headers)
177         logger.info("请求参数为:{}", query_string)
178 
179         logger.info("预期的响应代码为:{}", expected_code)
180         logger.info("预期的响应结果为:{}", expected_result)
181 
182         response = requests.request(
183             request_mode, url, data=json.dumps(payload),
184             headers=headers, params=query_string, timeout=(9, 15))
185         # 发起HTTP请求
186         # json.dumps()序列化把字典转换成字符串,json.loads()反序列化把字符串转换成字典
187         # data请求体为字符串,headers请求头与params请求参数为字典
188 
189         actual_time = response.elapsed.total_seconds()
190         # 实际的响应时间
191         actual_code = response.status_code
192         # 实际的响应代码
193         actual_result_text = response.text
194         # 实际的响应结果(文本格式)
195 
196         logger.info("实际的响应代码为:{}", actual_code)
197         logger.info("实际的响应结果为:{}", actual_result_text)
198         logger.info("实际的响应时间为:{}", actual_time)
199 
200         if regular:
201             # 若是正则不为空
202             extract_list = []
203             # 定义一个提取结果列表
204             for i in regular["expression"]:
205                 regular_result = re.findall(i, actual_result_text)[0]
206                 # re.findall(正则表达式, 实际的响应结果)返回一个符合规则的list,取第1个
207                 extract_list.append(regular_result)
208                 # 把提取结果添加到提取结果列表里面
209 
210             temporary_dict = dict(zip(regular["variable"], extract_list))
211             # 把变量列表与提取结果列表转为一个临时字典
212 
213             for key, value in temporary_dict.items():
214                 self.variable_result_dict[key] = value
215             # 把临时字典合并到变量名与提取的结果字典,已去重
216         else:
217             pass
218 
219         for key in list(self.variable_result_dict.keys()):
220             if not self.variable_result_dict[key]:
221                 del self.variable_result_dict[key]
222         # 删除变量名与提取的结果字典中为空的键值对
223 
224         actual_result_text = re.sub("{|}|\"|\\[|\\]", "", actual_result_text)
225         # 去除{、}、"、[与]
226         actual_result_list = re.split(":|,", actual_result_text)
227         # 把响应文本转为列表,并去除:与,
228 
229         if expected_code == actual_code:
230             if set(expected_result) <= set(actual_result_list):
231                 # 预期的响应结果与实际的响应结果是被包含关系
232                 # 判断是不是其真子集
233                 logger.info("{}>>>执行经过", case_name)
234             else:
235                 logger.error("{}>>>执行失败", case_name)
236             self.assertTrue(set(expected_result) <= set(actual_result_list))
237             # 布尔表达式断言
238         else:
239             logger.error("{}>>>请求失败,请检查域名、路径与请求参数是否正确!", url)
240             logger.error("{}>>>执行失败", case_name)
241             self.assertTrue(set(expected_result) <= set(actual_result_list))
242 
243         logger.info("##########用例分隔符##########\n")
244         # sleep(3)
245         # 等待时间为3秒,也能够调整为其余值
246 
247 
248 if __name__ == '__main__':
249     beautiful_report_run(DemoTest)
250     # 调用BeautifulReport运行方式

project_config.py文件:

 1 """
 2 整个工程的配置文件
 3 """
 4 
 5 import os
 6 import sys
 7 import time
 8 
 9 from loguru import logger
10 
11 parameter = sys.argv[1]
12 # 从命令行获取参数
13 
14 environment = os.getenv("measured_environment", parameter)
15 # 环境变量
16 
17 if environment == "dev":
18     service_domain = "http://www.dev.com"
19     # 开发环境
20     db_host = 'mysql.dev.com'
21     db_port = 3306
22 elif environment == "test":
23     service_domain = "http://www.test.com"
24     # 测试环境
25     db_host = 'mysql.test.com'
26     db_port = 3307
27 elif environment == "pre":
28     service_domain = "http://www.pre.com"
29     # 预生产环境
30     db_host = 'mysql.pre.com'
31     db_port = 3308
32 elif environment == "formal":
33     service_domain = "https://www.formal.com"
34     # 生产环境
35     db_host = None
36     db_port = None
37 
38 db_user = 'root'
39 db_password = '123456'
40 db_database = ''
41 # MySQL数据库配置
42 
43 
44 current_path = os.path.dirname(os.path.dirname(__file__))
45 # 获取当前目录的父目录的绝对路径
46 # 也就是整个工程的根目录
47 case_path = os.path.join(current_path, "case")
48 # 测试用例的目录
49 yaml_path = os.path.join(current_path, "resource")
50 # yaml文件的目录
51 today = time.strftime("%Y-%m-%d", time.localtime())
52 # 年月日
53 
54 report_path = os.path.join(current_path, "report")
55 # 测试报告的目录
56 if os.path.exists(report_path):
57     pass
58 else:
59     os.mkdir(report_path, mode=0o777)
60 
61 log_path = os.path.join(current_path, "log")
62 # 日志的目录
63 if os.path.exists(log_path):
64     pass
65 else:
66     os.mkdir(log_path, mode=0o777)
67 
68 logging_file = os.path.join(log_path, "log{}.log".format(today))
69 
70 logger.add(
71     logging_file,
72     format="{time:YYYY-MM-DD HH:mm:ss}|{level}|{message}",
73     level="INFO",
74     rotation="500 MB",
75     encoding="utf-8",
76 )
77 # loguru日志配置

四、运行:

unittest模式:

python+测试文件名+环境缩写

python ./case/demo_test.py dev

python ./case/demo_test.py test

python ./case/demo_test.py pre

python ./case/demo_test.py formal

pytest模式:

pytest -v

完整命令为:

pytest -v --reruns 3 --reruns-delay 3 --timeout=60 --junitxml=./report/report.xml --html=./report/report.html --self-contained-html --alluredir=./report/allure-report

已写进pytest.ini配置文件

相关文章
相关标签/搜索