Locust是一个基于Python的带有可视化图形界面的测试工具。前端
本人不是专业的测试人员,Python也是先学先用的,因此不会涉及到太多的相关专业知识。本文主要是分享本身学习使用Locust的收获,基于官方文档和他人的博客,结合本身的使用体验,因此不是一篇教学或者专业文。你在这篇文章中看不到Locust的嵌套task和多locustfile测试等。python
locust:本意是蝗虫,在测试过程当中当作是一个用户便可(后面所谓的HttpLocust只是带有特殊性质的用户)。
hatch rate:孵化率,表示每秒钟产生的用户数量,能够模拟一个逐渐增长的过程,能够根据曲线来考察接口的处理峰值。
TaskSet:顾名思义,是任务的集合,表明着每一个用户具备的行为,也就是你想要让用户对整个后台功能作什么。
max_wait/min_wait:最大等待时间/最小等待时间,这是用户的休息时间,用户在执行任务的过程当中,在这两个值肯定的区间内随机选择一个时间进行休息,模拟真实用户的缓冲,同时也能够保证某些带有评率限制的接口可以安全调用,不然可能会出现不少错误甚至被封IP等。
locustfile.py:文件名称固然能够随便定义,可是这个文件的功能,至关于创建了一个用户的模型,须要产生的大量用户都将以这个文件定义为原型。web
locust以这个脚本做为用户的原型孵化用户,这个脚本包括两个部分,TaskSet子类和Locust子类。TaskSet子类是用于描述用户行为的,提供on_start方法执行前置操做。Locust子类主要是使用HttpLocust子类,由于如今的测试主要是针对http的接口,固然也能够经过一些特别的模块实现websocket的测试(虽然感受websocket的测试也没有什么必要)json
这个类主要定义用户的目标(host),至关于一个baseURL,用户的行为(请求)都会相对于这个目标进行。定义用户的休息间隔(max_wait/min_wait)。定义用户的行为(TaskSet)。一个普通的HttpLocust多是如下这样的:浏览器
class WebsiteUser(HttpLocust):
host = "http://juejin.im"
max_wait = 3000
min_wait = 3000
task_set = UserAction
复制代码
你还能够在这个类里面经过类的静态变量来提供一个全部用户都能使用的变量。好比一个queue,一个list等。因为类的静态变量的特性,每一个用户修改这个值能够实现一个递增的index之类的需求,参数化的逻辑能够借助这一点。安全
这个类负责定义每一个用户的实际动做,一个用户可能有不少动做,注册啦,买东西啦,撤销啦等等。on_start方法是给每一个用户开始这些动做以前的一个前置准备,能够在这个方法中初始化一些东西,好比登陆或者连接websocket。服务器
TaskSet中能够经过self.client来使用HttpSession class,也就是用户的http请求能力的供应者,能够理解成用户的浏览器,用来发送请求和接受响应的。self.client是自带cookie和session的能力的,彻底至关于一个浏览器环境。也就是说self.client发送了一个请求后,好比登陆请求,成功后再用这个self.client发送别的请求,会带上登陆请求成功后服务器返回的cookie。websocket
TaskSet还提供了一种神奇的东西ResponseContextManager class,当self.client发送的请求带有catch_response=True的参数,像下面这样:cookie
with self.client.get("/", catch_response=True) as response:
if response.content == "":
response.failure("No data")
# response.success()
复制代码
通常的状况下使用这样的便可:session
response = self.client.get("/",data=data)
复制代码
这种普通的请求在响应状态码小于400的状况下都认为成功,然而如今大部分接口都是经过返回的数据来表达你的动做O不OK,这就须要ResponseContextManager经过success和failure来手动修正正确与错误的显示。
TaskSet中self.locust能够取到HttpLocust的实例,也就是说你在HttpLocust中的某些变量,你是可使用的。固然如何使用仍是取决于你的测试逻辑。
有了以上这些东西,基本就能够开始定义本身的动做了。首先你须要引入locust提供的task装饰器,TaskSet子类中的实例方法若是被task装饰,那么就会认为是一个须要执行的动做。@task(weight)是支持权重安排的,也就是说多项任务能够按照必定的频繁度来调用,更接近用户的某些实际体验,好比购物过程当中翻页可能要比添加购物车频繁。
一个普通的TaskSet像是这样:
class ForumPage(TaskSet):
@task(100)
def read_thread(self):
pass
@task(7)
def create_thread(self):
pass
复制代码
虽然两个任务什么都没干,可是执行的频率能够看出来谁更频繁。没有特殊的设置下,两个任务按照权重随机的顺序执行,也就是有可能在短时间内没法按照实际的权重执行,也就是说每一个任务之间最好不要有顺序以来(可是能够经过一些操做来保证顺序)。
一个普通的task像是这样:
@task
def test_login(self):
# 这里经过类的静态变量来控制一个全局的变量,保证惟一的index
# WebsiteUser这就是Locust子类
WebsiteUser.index += 1
data = {
# 这里组织你的发送数据
# 经过self.locust来访问所属的Locust子类
"uid": self.locust.index
}
with self.client.post("/loginl", data=data,
catch_response=True) as response:
# response提供了一些快捷的方法和属性,让你判断请求的状况
if response.ok:
# 若是响应的content是json格式的,能够帮你转成字典
result = response.json()
# 经过返回的数据来手动判断是否成功
if result['code'] == 200:
response.success()
else:
response.failure(result['msg'])
else:
response.failure(bytes.decode(response.content))
复制代码
有了locustfile.py就能够开始跑起来了,有可视化界面仍是阔以的。命令行跑起来我就很少说了,浏览器打开localhost:8089,会让你输入总用户量和孵化率。加入你输入200和1,就表明每一秒产生一个用户,直到200个用户都产生。注意:到达总量时统计数据会重置。
因为用户的行为是尽量模拟实际状况,因此考察数据的时候应当清楚一些。200个用户,假设有两个task,比重1:1,min_wait和max_wait都是1000ms,那么理想状况下某一时刻,200个用户都发送了一个请求,且这个请求的响应时间能够忽略,那么咱们能够认为一秒的并发是200,等待时间是1s,故并发应当稳定在200。因此你须要考虑响应时间和等待时间,选择合适的用户数有可能才到达所须要的并发量。
咱们在测试一个网站的功能时,咱们经过locust模拟的用户须要不一样的帐号来进行操做,所以咱们须要在TaskSet中作一些改造。以前咱们说过了TaskSet拥有on_start方法执行前置动做,且self.client能够保持cookie和session,这不恰好能够知足登录后执行操做的需求吗。那么下面一个问题就是若是使用不一样的帐号,这使用Python也是比较容易实现的。咱们在WebsiteUser中定义一个类静态变量,好比下面这样:
class WebsiteUser(HttpLocust):
host = "http://juejin.im"
max_wait = 3000
min_wait = 3000
task_set = UserAction
# 这个属性在locust启动时被初始化好
# 使用WebsiteUser.user_index来非竞争的更新,实现一个不重复的递增序列
user_index = 0
复制代码
使用递增序列的方式,能够快速注册一大批用户,只须要肯定好用户名和密码若是使用这个index进行产生便可。固然,若是不想使用这样的方式,还有一种就是使用数据结构,好比queue:
class WebsiteUser(HttpLocust):
host = 'http://juejin.im'
task_set = UserBehavior
user_data_queue = queue.Queue()
# 一共300w用户准备
for index in range(1500000, 4500000):
data = {
# 准备你的注册或者登录数据
}
user_data_queue.put_nowait(data)
min_wait = 500
max_wait = 500
复制代码
@task
def test_register(self):
try:
# 用queue来使每一个用户取得不一样的帐户信息
data = self.locust.user_data_queue.get()
except queue.Empty:
# 固然了,只出队列会致使数据跑空
# 若是须要循环,用完的数据从队列的另外一头塞回去就能够重复利用了
print('account data run out, test ended.')
exit(0)
with self.client.post('/register', data=data,
catch_response=True) as response:
result = response.json()
if result['code'] == 200:
response.success()
else:
response.failure(result['msg'])
复制代码
参数化的一些高级用法能够参见深刻浅出开源性能测试工具Locust(脚本加强)会给你很多启发。
少数状况咱们想考量websocket的性能,好比同时链接的客户端以及推送到接受的延迟状况。Python强大的模块库确定是有websocket相关的,经过搜索和尝试,pip2安装的websocket模块和pip3的好像不同,使用pip3安装的实践成功。
通常来讲咱们在on_start方法中初始化链接:
self.ws = websocket.WebSocket()
self.ws.connect("ws://xxx")
复制代码
而且将ws实例做为实例属性方便后续的操做。注意:websocket链接端在测试过程当中断掉也是极有可能的,因此提供一个重连的方法可能会有必要。
有了ws的链接以后呢,通常须要发送订阅信息让服务器来主动推送数据,若是ws在接受了订阅后会先返回一条订阅的结果,若是在可视化界面上经过数据统计出来,这就须要locust提供的hook。
from locust import TaskSet, task, HttpLocust, events
将events导入进来,而后触发一些事件,皆可让locust统计相应的数据:
events.request_success.fire(
request_type="wss",
name="send_entrust",
response_time=100,
response_length=300
)
复制代码
request_type至关于请求的类型,能够随便填。name至关于请求url,response_time至关于响应事件,response_length至关于响应体大小。实际上self.client.post()等也会使用事件触发来统计数据。好比ResponseContextManager中success的实现:
def success(self):
""" Report the response as successful Example:: with self.client.get("/does/not/exist", catch_response=True) as response: if response.status_code == 404: response.success() """
events.request_success.fire(
request_type=self.locust_request_meta["method"],
name=self.locust_request_meta["name"],
response_time=self.locust_request_meta["response_time"],
response_length=self.locust_request_meta["content_size"],
)
self._is_reported = True
复制代码
这样的话,设计一个task来发送订阅并接受推送。做为前端人员来讲,websocket都是用回调来处理推送,那么Python里面的websocket怎么办呢?实践成功的websocket模块链接后,也就是从connect()以后,self.ws自己是一个可迭代的,虽然每次next都是调用resv()方法,在循环里就好使。而self.ws.resv()这个函数是很神奇的,调用开始后他会等到接受推送了再返回,宏观角度来讲这个函数的执行时间,应当等于服务器推送的间隔,一次能够判断推送是否出现了延迟。且这个函数若是返回的是空字符串,大体能够认为链接断掉,逻辑上增长统计和重连,可以保证一些测试准确度。 给出一个我实际使用的task:
@task
def test_ws(self):
# 这里可使用这一个task来完成发送订阅和处理订阅
if not self.send_flag:
self.ws.send("""{ # 这里组织你须要发送的订阅数据 } """)
result = json.loads(self.ws.recv())
# 这里来处理订阅的结果,若是订阅成功改变标志位,让这个task只执行接受
if result['code'] == 200:
self.send_flag = True
events.request_success.fire(
request_type="wss",
name="send_entrust",
response_time=100,
response_length=300)
else:
events.request_failure.fire(
request_type="wss",
name="send_entrust",
response_time=100,
exception=Exception(self.ws.status),
)
else:
flag = True
# 因为推送是不间断的直到发送取消订阅,故使用死循环来不断统计
while flag:
# self.ws自己是个迭代器,next和调用resv()是同样的,均可以
start_time = time.time()
resv = next(self.ws)
# 推送间隔就能够当作统计数据进行显示
total_time = int((time.time() - start_time) * 1000)
if resv != '':
events.request_success.fire(
request_type="wss",
name="resv_entrust",
response_time=total_time,
response_length=59)
else:
# 若是中断了能够增长重连方案
events.request_failure.fire(
request_type="wss",
name="resv_entrust",
response_time=total_time,
exception=Exception(""),
)
flag = False
else:
print("no resv")
复制代码
Locust整体来讲仍是比较容易上手的,提供的功能大概可以知足通常的测试需求。实践的过程当中遇到过一些些问题,好比请求的数量不必定在页面上统计的及时,用户数也有可能会断掉一两个,具体缘由不明。本人也不是专门的测试人员, 也不是Python native speaker,因此对于Python的理解和对测试分析的观点不必定对,只是分享一下我在使用的过程当中如何思考问题如何解决问题,解决方案不必定最优,也有可能会形成不少困扰,但这是个人学习过程,想和你们分享,愿意和你们讨论并继续学习。