如何对消息队列作性能测试

本人在负责服务压测的实践中,遇到了一个需求,就是对消息队列的dubbo接口性能进行压测,主要分两类:一类是往队列里面添加,一类是从队列中取值(等同删除)。是一个server的两个不一样方法。同组其余人用的jmeter进行的dubbo接口压测。 队列的添加规则比较简单,主要有一个标志msg,由事件类型+用户标识符+消息体构成。作此类此类测试的时候遇到的问题就是若是构建消息体,每次都构建不一样的消息体,这里我才用了纳秒+随机数的方式,后来发现直接用纳秒就行。(这里相信jmeter也应该有响应的方法)java

在添加队列的测试不太清楚jmeter如何实现,由于他们直接放弃掉了,我才用的方案是,先构建足够多数量的消息,而后将消息数据拿出来放到一个线程安全的集合中,多线程去拿,使用的是java的LinkedBlockingQueue<String>消息队列。没作完一次测试,重置一次测试数据,防止中途有失败的状况。git

public int createQ() {
        String absolutePath = new File("").getAbsolutePath();
        List<String> strings = WriteRead.readTxtFileByLine(absolutePath + "/dubbo");
        new Concurrent(new ThreadBase(SourceCode.changeStringToInt(strings.get(0))) {
            @Override
            protected void before() {

            }

            @Override
            protected void doing() throws Exception {
                CreateQueueRequest createQueueRequest = new CreateQueueRequest();
                createQueueRequest.setReqId(TraceKeyHolder.getTraceKey());
                createQueueRequest.setDelayTime(System.currentTimeMillis() + 3600 * 1000);
                String msg = "wait_for_publish:8888" + "@" + System.nanoTime() + PublishType.ZUOYE;
                createQueueRequest.setMsg(msg);
                createQueueRequest.setTaskTypeEnum(TaskTypeEnum.PUBLISH_PROMU);
                createQueueRequest.setTtl(0L);
                CommonResponse<CreateQueueResultVo> queue = commonDelayQueueService.createQueue(createQueueRequest);
                logger.info("createQueue0  {}", JsonUtil.obj2Json(queue));
            }

            @Override
            protected void after() {

            }
        }, SourceCode.changeStringToInt(strings.get(1))).start();
        return 0;
    }

删除队列:api

public int deleteQ() throws InterruptedException {
        if (msgs.size() == 0) {
            logger.info("队列为空了");
            msgs = addmsg();
        }
        String absolutePath = new File("").getAbsolutePath();
        List<String> strings = WriteRead.readTxtFileByLine(absolutePath + "/dubbo");

        new Concurrent(new ThreadBase(SourceCode.changeStringToInt(strings.get(0))) {
            @Override
            protected void before() {

            }

            @Override
            protected void doing() throws Exception {
                String msg = msgs.poll(100, TimeUnit.MILLISECONDS);
                logger.info("msg:{}", msg);
                DeleteQueueRequest deleteQueueRequest0 = new DeleteQueueRequest();
                deleteQueueRequest0.setMsg(msg);
                deleteQueueRequest0.setTaskTypeEnum(TaskTypeEnum.PUBLISH_PROMU);
                CommonResponse<String> queue3 = commonDelayQueueService.deleteQueue(deleteQueueRequest0);
                logger.info("deleteQueue2 {}", JsonUtil.obj2Json(queue3));
            }

            @Override
            protected void after() {

            }
        }, SourceCode.changeStringToInt(strings.get(1))).start();

        return 0;
    }

其中msgs的设置以下:安全

public static LinkedBlockingQueue<String> msgs = addmsg();


    public static LinkedBlockingQueue<String> addmsg() {
        String absolutePath = new File("").getAbsolutePath();
        List<String> strings = WriteRead.readTxtFileByLine(absolutePath + "/queue");
        LinkedBlockingQueue<String> ss = new LinkedBlockingQueue<>();
        ss.addAll(strings);
        logger.info("从新读取队列值");
        return ss;
    }
  • 这里会有一个问题:在不断测试过程当中,addmsg方法可能在测试过程当中被执行。

由于我在作测试的时候,数据量足够大,因此没有作处理,若是数据量不足以支撑不少次测试,能够采用启动测试前把msgs进行初始化,或者在before()方法里面为每个线程进行数据初始化操做。markdown

欢迎有兴趣的童鞋一块儿交流多线程

相关文章
相关标签/搜索