在使用rocketmq以前使用了rabbitmq,会出现丢消息的状况,进而果断放弃,继续投入到大Java的怀抱,不过也遇到了一些问题,这里总结一下:java
使用过程当中新加节点须要手动建立topic多线程
消费者处理不合理,不能实现负载均衡负载均衡
针对第二点:我以前一直使用的是pull方式,按顺序来消费,一旦程序重启则从头一个一个消费,显然这种效率很低,
并且由于一个代码问题,若是我在offsize = 0的状况获取不了数据,则min offsize不增加,这种状况致使若是数据隔天了则不能消费,这时候消费者至关于在空跑。fetch
List<MessageVo> msgList = new ArrayList<MessageVo>(); try { Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues(queueName); for (MessageQueue mq : mqs) { System.out.printf("Consume from the queue: " + mq + "%n"); try { PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32); if (pullResult != null) { putMessageQueueOffset(mq, pullResult.getNextBeginOffset()); switch (pullResult.getPullStatus()) { case FOUND: if (pullResult.getMsgFoundList() != null && pullResult.getMsgFoundList().size() > 0) { for (MessageExt messageExt : pullResult.getMsgFoundList()) { msgList.add(new MessageVo(messageExt.getMsgId(), messageExt.getBody())); } } break; case NO_MATCHED_MSG: break; case NO_NEW_MSG: break; case OFFSET_ILLEGAL: break; default: break; } } } catch (Exception e) { e.printStackTrace(); }
使用这种方法能够稳定的pull出数据,可是这种状况效率很低。url
使用多线程方式:线程
private Queue<List<MessageVo>> messageQueue = new LinkedBlockingQueue<List<MessageVo>>(); DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(groupName); consumer.setNamesrvAddr(url); scheduleService = new MQPullConsumerScheduleService(groupName); scheduleService.setMessageModel(MessageModel.CLUSTERING); scheduleService.setDefaultMQPullConsumer(consumer); List<MessageVo> msgList = new ArrayList<MessageVo>(); try { Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues(queueName); for (MessageQueue mq : mqs) { System.out.printf("Consume from the queue: " + mq + "%n"); try { PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32); if (pullResult != null) { putMessageQueueOffset(mq, pullResult.getNextBeginOffset()); switch (pullResult.getPullStatus()) { case FOUND: if (pullResult.getMsgFoundList() != null && pullResult.getMsgFoundList().size() > 0) { for (MessageExt messageExt : pullResult.getMsgFoundList()) { msgList.add(new MessageVo(messageExt.getMsgId(), messageExt.getBody())); } } break; case NO_MATCHED_MSG: break; case NO_NEW_MSG: break; case OFFSET_ILLEGAL: break; default: break; } } } catch (Exception e) { e.printStackTrace(); } List<MessageVo> msgList = new ArrayList<MessageVo>(); try { Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues(queueName); for (MessageQueue mq : mqs) { System.out.printf("Consume from the queue: " + mq + "%n"); try { PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32); if (pullResult != null) { putMessageQueueOffset(mq, pullResult.getNextBeginOffset()); switch (pullResult.getPullStatus()) { case FOUND: if (pullResult.getMsgFoundList() != null && pullResult.getMsgFoundList().size() > 0) { for (MessageExt messageExt : pullResult.getMsgFoundList()) { msgList.add(new MessageVo(messageExt.getMsgId(), messageExt.getBody())); } } break; case NO_MATCHED_MSG: break; case NO_NEW_MSG: break; case OFFSET_ILLEGAL: break; default: break; } } } catch (Exception e) { e.printStackTrace(); }
经过回调的方式来定时调用。这种方式是多线程来实现的。code
使用这种方式不能部署两个程序,由于会致使groupName冲突 rabbitmq