运行方式:
python ***.py
结果可保存到result_code_creattxt文件中。
Locust压测框架
如何使用
注:@Task是任务注解,其中1表示这个接口的访问流量的权重,header是数据格式,data是数据,/acCheck/acCheckDetails是接口。
运行结果
在服务器中项目会提供很多的接口,因此会有多个task:
@task(23)
def deviceinfo_deleteHeadImg(self):
header = {'Content-Type': 'application/json'}
data = '{'userId':123,'appId':123123, 'deviceId':'111'}'
self.client.post('/deviceinfo/deleteHeadImg', data=data, headers=header)
@task(5)
def deviceinfo_get(self):
header = {'Content-Type': 'application/json'}
data = '{'userId':123,'appId':123123}'
self.client.post('/deviceinfo/get', data=data, headers=header)
@task(1)
def region_get(self):
header = {'Content-Type': 'application/json'}
data = '{'appId':'123','userEmail':'123456', 'regionCode':'84001002'}'
self.client.post('/deviceinfo/oversea/app/region/get', data=data, headers=header)
这篇文章会利用locust这个包对服务器的接口进行压力测试,你可以学到:
如何利用Python进行压力测试如何对不同的接口分配流量权重如何自动生成测试代码
总之locust是非常好用、简单的测试工具。
本地可以采用web的方式,可在网页端输入总的并发数量和每秒递增的线程数量,在服务器上运行采用no-web的方式,no-web需要跟随c、r、t等参数。
在升级的过程中怎样验证服务器的升级效果呢?
如何运行
if __name__ == '__main__':
os.system(
'locust -f pressure_start.py --no-web -c 20 -r 20 -t 100s --csv=example --loglevel=INFO --logfile=test.log --host=http://localhost:18813')
新建类-PressureStart
class PressureStart(HttpUser):
min_wait = 100 # 最小等待时间(ms),模拟用户在执行每个任务之间等待的最小时间
max_wait = 500 # 最大等待时长(ms),模拟用户在执行每个任务之间等待的最大时长
host = 'http://localhost:18813' # 访问的域名和端口
def on_start(self):
# 开始任务
print('start working')
一个项目不免有好几十个接口,怎样自动生产测试task呢?
import pandas as pd
def get_uri_data():
uri_data_values = pd.read_excel('./pressure_uri.xlsx', usecols=[0, 1], names=None, sheet_name='Sheet1')
uri_data_list = uri_data_values.values.tolist()
result = []
uri_len = len(uri_data_list)
for i in range(uri_len):
result.append('@task(1)
')
str1 = str(uri_data_list[i][0])
chs = str1.split('/')
title = chs[len(chs) - 2] + '_' + chs[len(chs) - 1]
result.append('def ' + title + '(self):
')
result.append('header={'Content-Type': 'application/json'}
')
data = ''' + uri_data_list[i][1].replace(u'xa0', '') + '''
result.append('data=' + data + '
')
result.append('self.client.post(' + ''' + str1 + ''' + ', data=data, headers=header)')
result.append('
')
print(result)
# result.clear()
with open('./result_code_create.txt', 'w', encoding='utf-8') as f:
f.writelines(result)
我们都知道在服务器提供服务的过程中,不免因为流量的增加导致服务器的提供服务性能下降,因此需要对服务器进行升级,通过有针对性的测试,对延迟非常严重的接口进行升级,比如增加缓存、增大线程池或者重写服务。
如何自动化生产task代码
注:
-c | 总的并发数量 |
-r | 指定并发加压速率(每秒多少个用户) |
-t | 测试时间,30s 、1m 、1h |
--csv:保存运行结果
--loglevel:日志等级
--logfile:日志文件
--host:域名
新建任务@Task
@task(1)
def acCheck_acCheckDetails(self):
header = {'Content-Type': 'application/json'}
data = '{'applianceId': 17592196255061}'
self.client.post('/acCheck/acCheckDetails', data=data, headers=header)
导入包
文章为作者独立观点,不代表股票交易接口观点