其中要特别注意第二条,订阅关系一致性问题,若是3个消费者在一个组内,订阅的tag不一致,消费是有问题的,也可能就不消费。windows
TPS不到100,这个基于本地到阿里云,走的公网,效果通常。服务器
发送和消费一万条数据大概都是22秒左右,TPS大概500。markdown
public static void sendMqMessage( String topic, String tag, String message, String sharding ) { String key = UUID.randomUUID().toString(); Message msg = new Message( // Message 所属的 Topic topic, // Message Tag, 可理解为 Gmail 中的标签,对消息进行再归类,方便 Consumer 指定过滤条件在消息队列 RocketMQ 的服务器过滤 tag, // Message Body 能够是任何二进制形式的数据, 消息队列 RocketMQ 不作任何干预,须要 Producer 与 Consumer 协商好一致的序列化和反序列化方式 message.getBytes() ); // 设置表明消息的业务关键属性,请尽量全局惟一。 // 以方便您在没法正常收到消息状况下,可经过控制台查询消息并补发。 // 注意:不设置也不会影响消息正常收发 msg.setKey(key); // 分区顺序消息中区分不一样分区的关键字段,sharding key 于普通消息的 key 是彻底不一样的概念。 // 全局顺序消息,该字段能够设置为任意非空字符串。 String shardingKey = sharding; try { SendResult sendResult = producer.send(msg, shardingKey); // 发送消息,只要不抛异常就是成功 if (sendResult != null) { //System.out.println(message + tag + sharding); if(message.equals("10000")){ System.out.println(tag + ":发送完毕10000!"); } //SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); //System.out.println(dateFormat.format(new Date()) + "-发送消息成功-sharding:" + shardingKey + ",tag:" + tag + ",key:"+ key + ",msgId:" + sendResult.getMessageId()); } } catch (Exception e) { // 消息发送失败,须要进行重试处理,可从新发送这条消息或持久化这条数据进行补偿处理 System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic()); throw e; } }
public void consumerMqMessage() { String topic = "topic-test"; String tags = "W3";//第一种模式使用* consumer = RocketMqConsumerSingleton.getInstance(); // 在订阅消息前,必须调用 start 方法来启动 Consumer,只需调用一次便可。 consumer.subscribe( // Message 所属的 Topic topic, // 订阅指定 Topic 下的 Tags: // 1. * 表示订阅全部消息 // 2. TagA || TagB || TagC 表示订阅 TagA 或 TagB 或 TagC 的消息 tags, new MessageOrderListener() { /** * 1. 消息消费处理失败或者处理出现异常,返回 OrderAction.Suspend<br> * 2. 消息处理成功,返回 OrderAction.Success */ @Override public OrderAction consume(Message message, ConsumeOrderContext context) { String msg = new String(message.getBody()); //System.out.println(msg); if(msg.equals("1")) { System.out.println(message.getTag() + "开始:" + System.currentTimeMillis()); } if(msg.equals("10000")) { System.out.println(message.getTag() + "结束:" + System.currentTimeMillis()); } //SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); //System.out.println(dateFormat.format(new Date()) + "-消费消息---sharding:" + message.getShardingKey() + ",tag:" + message.getTag() + ",key: " + message.getKey() + ",MsgId:" + message.getMsgID()); try { //Thread.sleep(2000); //System.out.println("-------------消费者睡2秒后----------"); } catch (Exception e) { e.printStackTrace(); } return OrderAction.Success; } }); consumer.start(); }