普通消息 顺序消息 分布式事务消息数据库
Message msg = new Message("TopicTest" /* Topic */, "TagA" /* Tag */, ("Hello RocketMQbroker-b " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ ); SendResult sendResult = producer.send(msg);
启动2个同样的消费者,普通消息会被分配到2个消费者上面去消费分布式
String[] tags = new String[]{"createTag","payTag","sendTag"}; for(int order=0;order<5;order++){ for(int type=0;type<3;type++){ Message msg = new Message("TopicTest",tags[type % tags.length],order+":" +type,(order+":" +type).getBytes()); SendResult sendResult = producer.send(msg, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> list, Message message, Object o) { Integer id = (Integer)o; int index = id % list.size(); return list.get(index); } },order); System.out.printf("%s%n", sendResult); } }
接收到的消息: 4:0 ConsumeMessageThread_1 接收到的消息: 4:0 [MessageExt [queueId=0, storeSize=179, queueOffset=1867, sysFlag=0, b 接收到的消息: 4:1 ConsumeMessageThread_1 接收到的消息: 4:1 接收到的消息: 4:2 ConsumeMessageThread_1 接收到的消息: 4:2ide
TransactionCheckListener checkListener = new TransactionCheckListenerImpl(); TransactionMQProducer producer = new TransactionMQProducer("transactionProducerGroup"); producer.setNamesrvAddr("10.10.10.108:9876;10.10.10.190:9876"); producer.setTransactionCheckListener(checkListener); producer.start(); TransactionExecuterImpl transactionExecuter = new TransactionExecuterImpl(); try{ Message msg1 = new Message("TopicTest", "TagA" ,"key1",("Hello RocketMQ1 " ).getBytes(RemotingHelper.DEFAULT_CHARSET) ); Message msg2 = new Message("TopicTest", "TagA" ,"key2",("Hello RocketMQ1 " ).getBytes(RemotingHelper.DEFAULT_CHARSET) ); SendResult sendResult1 = producer.sendMessageInTransaction(msg1,transactionExecuter,null); logger.info("{} mesg1: {}",new Date(),sendResult1); sendResult1 = producer.sendMessageInTransaction(msg2,transactionExecuter,null); logger.info("{} mesg2: {}",new Date(),sendResult1); }catch(Exception e){ e.printStackTrace(); } producer.shutdown(); public class TransactionExecuterImpl implements LocalTransactionExecuter{ private Logger logger = LoggerFactory.getLogger(this.getClass()); @Override //事务消息是否对消费者可见,彻底由事务返回给RMQ的状态码决定(状态码的本质也是一条消息)。 public LocalTransactionState executeLocalTransactionBranch(Message message, Object o) { try{ logger.info("{} 本地事务执行成功",new Date()); }catch(Exception e){ e.printStackTrace(); logger.info("{} 事务执行失败"+e,new Date()); return LocalTransactionState.ROLLBACK_MESSAGE; } return LocalTransactionState.COMMIT_MESSAGE; } } public class TransactionCheckListenerImpl implements TransactionCheckListener { private Logger logger = LoggerFactory.getLogger(this.getClass()); @Override //若是某条消息没有返回,那么此事件会主动询问查询数据库等渠道 获取消息是否正常 public LocalTransactionState checkLocalTransactionState(MessageExt messageExt) { logger.info("server checking Msg {}",messageExt.toString()); return LocalTransactionState.COMMIT_MESSAGE; } } }