最近运维跟我反馈我负责的应用服务线上监控到消费RabbitMQ消息队列过慢,目前只有20左右,监控平台会有消息积压的告警。html
开发修改了一版应用服务的版本,提交给我作压测验证。web
以前没有作过消息中间件的压测,网上找了一圈测试方法,而且和开发沟通,最终确认经过压测RabbitMQ event消息处理的接口来完成本次的压测验证。并发
压测脚本:运维
import pika import multiprocessing as mp import time def main(counter): routing_key = "busi.mc.event.XXXX" # 被压测的应用服务的key,指定消息的消费者 credentials = pika.PlainCredentials('guest', 'guest') parameters = pika.ConnectionParameters('XXX.XX.XXX.XX', 5672, '/', credentials) connection = pika.BlockingConnection(parameters) # 链接 RabbitMQ channel = connection.channel() # 建立频道 for i in range(1, counter): # 循环生产信息,供消费者(被压测的应用服务)消费 channel.basic_publish(exchange='mc-direct-exchange', routing_key=routing_key, body='{"clientId":"5e8J8aoi4F380gpDS4sdfd","eventType":1}', properties=pika.BasicProperties( content_type="text/plain", delivery_mode=1)) time.sleep(0.1) # if counter % 600 == 0: # time.sleep(1) connection.close() # 关闭链接 def loop_test(counter): for i in range(1, counter): main() if counter % 100 == 0: time.sleep(1) # 单个频率 if __name__ == "__main__": # Define an output queue output = mp.Queue() # Setup a list of processes that we want to run processes = [mp.Process(target=main, args=(100000,)) for x in range(20)] # 消息总条数 并发数 # Run processes for p in processes: p.start() # Exit the completed processes for p in processes: p.join() # Get process results from the output queue # results = [output.get() for p in processes] # print(results)
脚本运行后,经过RabbitMQ的web管理后台,查看消费消息的TPS已经能够稳定在200左右,本次验证经过了~~oop