参考资料:java
官网下载二进制安装包(固然也可下载源码包后本身编译):下载地址mysql
解压redis
unzip rocketmq-all-4.3.0-bin-release.zip
修改配置sql
在 conf/broker.conf
中新增数据库
brokerIP1 = 192.168.195.88 autoCreateTopicEnable = true # 线上环境应该设为false
在bin/runbroker.sh
中修改JVM内存大小,默认是8G,通常本身电脑的上虚拟机可能没这么大apache
启动架构
# 后台启动NameServer nohup sh bin/mqnamesrv -n 192.168.195.88:9876 & # 查看日志,看是否启动成功 tail -f ~/logs/rocketmqlogs/namesrv.log # 后台启动Broker nohup sh bin/mqbroker -n 192.168.195.88:9876 -c conf/broker.conf & # 查看日志,看是否启动成功 tail -f ~/logs/rocketmqlogs/broker.log
中止dom
sh bin/mqshutdown broker sh bin/mqshutdown namesrv
注意:异步
在RocketMQ的bin目录下有一个mqadmin脚本,它充当着控制台的角色,能够用来完成咱们经常使用的操做。如不喜欢命令可安装第三方的可视化操控界面工具async
建立topic
sh mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t testTopic1 # 参数 -n为nameServe服务地址 -b为broker服务地址 -t为topic的名字
查询全部topic
sh mqadmin topicList -n localhost:9876 # 参数 -n为nameServe服务地址
查看Topic统计信息
sh mqadmin topicStatus -n localhost:9876 -t testTopic1 # 参数 -n为nameServe服务地址 -t为topic的名字
查看消费组信息
sh mqadmin consumerProgress -n localhost:9876 -g simple_push_consumer_group_01 # 参数 -n为nameServe服务地址 -g为消费组的名字,无则表示查看全部的
查看全部命令:sh mqadmin
注意点:
RocketMQ的数据存储主要有三个内容:ConsumeQueue、CommitLog和IndexFile
ConsumeQueue是消息的逻辑队列,是由20字节定长的二进制数据单元组成,其中commitLogOffset(8 byte)、msgSize(4 byte)、tagsHashCode(8 byte);每一个Topic和QueuId对应一个ConsumeQueue;单个文件大小约5.72M,每一个文件由30W条数据组成,每一个文件默认大小为600万个字节,当一个ConsumeQueue类型的文件写满了,则写入下一个文件。
CommitLog是消息存放的实际物理位置,每一个Broker下全部的Topic下的消息队列共用同一个CommitLog的日志数据文件来存储,全部RocketMQ的写入是顺序的。单个CommitLog文件的默认大小为1G。
IndexFile即消息索引,若是一个消息包含key值的话,会使用IndexFile存储消息索引,其每一个单元的数据构成为keyHash(4 byte)、commitLogOffset(8 byte)、timestamp(4 byte)、nextIndexOffset(4byte)。IndexFile主要是用来根据key来查询消息。
Producer端发送消息最终写入的是CommitLog,写入CommitLog有同步刷盘和异步刷盘两种方式:
同步刷盘:只有在消息真正持久化至磁盘后,Broker端才会真正地返回给Producer端一个成功的ACK响应。
异步刷盘:只要消息写入PageCache便可将成功的ACK返回给Producer端。
Consumer端先从ConsumeQueue读取持久化消息的offset,随后再从CommitLog中进行读取消息的真正实体内容。因此实际上读取操做是随机而不是顺序的,因此这也是消费速度是比Kafka低的缘由。
RocketMQ发送消息有三种方式
同步
消息发送后,等待服务端的ack响应,这种方式最可靠,但效率最低
异步
消息发送注册回调函数,不需等待服务端的响应
单向
消息发送后,不关心服务端是否成功接受
若是Producer发送消息失败,会自动重试,重试的策略:
RocketMQ消费消息主要有两种方式:
pull模式
由消费者客户端主动向服务端拉取消息。
通常状况下,若是咱们没有控制好pull的频率,频率太低时,则可能消费速度过低致使消息的积压,频率太高时,则可能发送过多无效或低效pull请求,增长了服务端负载。
为了解决这个问题,RocketMQ在没有足够的消息时(如服务端没有可消费的消息),并不会当即返回响应,而是保持并挂起当前请求,待有足够的消息时在返回。而且咱们须要指定offset的起点和终点,而且须要咱们本身保存好本次消费的offset点,下次消费的时候好从上次的offset点开始拉取消息。
pull模式咱们并不常用。
push模式
由服务端主动地将消息推送给消费者。
push模式下,慢消费的状况可能致使消费者端的缓冲区溢出。
可是在RocketMQ中并非真正的push,而是基于长轮训的pull模式的来实现的伪push。具体的实现是:Consumer端每隔一段时间主动向broker发送拉消息请求,broker在收到Pull请求后,若是有消息就当即返回数据,Consumer端收到返回的消息后,再回调消费者设置的Listener方法。若是broker在收到Pull请求时,消息队列里没有数据,broker端会阻塞请求直到有数据传递或超时才返回。
即消费失败后,隔一段时间从新消费该消息。
重试队列
RocketMQ会为每一个消费组都设置一个Topic名称为%RETRY%+consumerGroup
的重试队列(这里须要注意的是,这个Topic的重试队列是针对消费组,而不是针对每一个Topic设置的),用于暂时保存由于各类异常而致使Consumer端没法消费的消息。Consumer端出现异常失败时,失败的消息会从新发送给服务端的重试队列。
死信队列
重试队列中超过配置的“最大重试消费次数”后就会移入到这个死信队列中。在RocketMQ中,SubscriptionGroupConfig配置常量默认地设置了两个参数,一个是retryQueueNums为1(重试队列数量为1个),另一个是retryMaxTimes为16(最大重试消费的次数为16次)。Broker端经过校验判断,若是超过了最大重试消费次数则会将消息移至这里所说的死信队列。这里,RocketMQ会为每一个消费组都设置一个Topic命名为%DLQ%+consumerGroup
的死信队列。
通常在实际应用中,移入至死信队列的消息,须要人工干预处理。
注意点:
RocketMQ的的默认延迟级别分为16个,因此一条消息最大的重试次数为16;
// 源码位置:org.apache.rocketmq.store.config.MessageStoreConfig.class // 如需修改,则须要修改broker的配置,官方并不建议修改 private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"; //若是咱们在发送消息时设置消息的延迟级别为3,则表示消息10s后才能被消费者发现 msg.setDelayTimeLevel(3);
RocketMQ的Message中的reconsumeTimes
属性,表示该消息当前已重试的的次数,咱们能够经过以下方法来控制最大重试的次数,超过最大重试次数的消息将移入死信队列中。
// 设置最多重试3次, 默认16 consumer.setMaxReconsumeTimes(3);
RocketMQ自身的重试机制,默认消息的初始延迟级别就为3,好像并无法修改(不敢确定)。源码以下:
// 源码位置:org.apache.rocketmq.client.impl.consumer.ProcessQueue.class public void cleanExpiredMsg(DefaultMQPushConsumer pushConsumer) { ... pushConsumer.sendMessageBack(msg, 3); ... }
public class Producer { public static void main(String[] args) { // 初始化一个生产者,生产组为simple_producer_group DefaultMQProducer defaultMQProducer = new DefaultMQProducer("simple_producer_group_01"); // 设置NameServer地址 defaultMQProducer.setNamesrvAddr("192.168.195.88:9876"); try { // 启动 defaultMQProducer.start(); Producer producer = new Producer(); producer.syncSend(defaultMQProducer); producer.asyncSend(defaultMQProducer); producer.onewaySend(defaultMQProducer); } catch (Exception e) { log.error("异常:", e); } finally { defaultMQProducer.shutdown(); } } /** * 同步方式 */ private void syncSend(DefaultMQProducer producer) throws InterruptedException, RemotingException,MQClientException, MQBrokerException, UnsupportedEncodingException { for(int i=0; i<100; i++){ String messageBody = "syncSend message" + i ; Message msg = new Message("testTopic1", "*", messageBody.getBytes(RemotingHelper.DEFAULT_CHARSET)); /* 设置消息延迟级别,默认有16个级别: 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 注意,若是设置为3,意思是10s再被消费 */ //msg.setDelayTimeLevel(2); SendResult sendResult = producer.send(msg, 10000); log.info("onewaySend发送结果:{}", sendResult); } } /** * 异步方式 */ private void asyncSend(DefaultMQProducer producer) throws UnsupportedEncodingException, RemotingException, MQClientException, InterruptedException { for(int i=0; i<100; i++){ String messageBody = "asyncSend message" + i ; Message msg = new Message("testTopic1", "*", messageBody.getBytes(RemotingHelper.DEFAULT_CHARSET)); producer.send(msg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { log.info("asyncSend发送成功:{}", sendResult); } @Override public void onException(Throwable e) { log.info("asyncSend发送异常:{}", e); } }); } } /** * 单向方式 */ private void onewaySend(DefaultMQProducer producer) throws RemotingException, MQClientException, InterruptedException, UnsupportedEncodingException { for(int i=0; i<100; i++){ String messageBody = "onewaySend message" + i ; Message msg = new Message("testTopic1", "*", messageBody.getBytes(RemotingHelper.DEFAULT_CHARSET)); producer.sendOneway(msg); log.info("onewaySend发送完成"); } } }
public class Consumer { public static void main(String[] args) throws Exception { new Consumer().pushMode(); //new Consumer().pullMode(); } /** * push 消费模式 */ private void pushMode() throws MQClientException { // 设置消费组 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("simple_push_consumer_group_01"); // 设置nameServer地址 consumer.setNamesrvAddr("192.168.195.88:9876"); // 设置从头开始消费 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); // 指定topic consumer.subscribe("testTopic1", "*"); // 设置批量消费消息的数量,默认1 consumer.setConsumeMessageBatchMaxSize(1); // 设置最多重试3次, 默认16 consumer.setMaxReconsumeTimes(3); // 注册消息监听 consumer.registerMessageListener((List<MessageExt> msgs, ConsumeConcurrentlyContext context) -> { try { MessageExt msg = msgs.get(0); log.info("收到消息, body:{}, reconsumeTimes={}, delayTimeLevel={}", new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET), msg.getReconsumeTimes(), msg.getDelayTimeLevel()); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } catch (Exception e) { log.info("消费异常:", e); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } }); // 运行 consumer.start(); log.info("消费者启动成功"); } /** 用于pull模式中记录offset使用*/ private static final Map<MessageQueue, Long> OFFSET_TABLE = new HashMap<>(); /** * pull 消费模式, 不建议使用 */ private void pullMode() throws MQClientException { DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("simple_pull_consumer_group_01"); consumer.setNamesrvAddr("192.168.195.88:9876"); consumer.start(); // 获取topic中的全部队列 Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("testTopic1"); for (MessageQueue mq : mqs) { log.info("消费队列:{}", mq); SINGLE_MQ: while (true) { try { PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32); log.info("pullResult:{}", pullResult); putMessageQueueOffset(mq, pullResult.getNextBeginOffset()); switch (pullResult.getPullStatus()) { case FOUND: pullResult.getMsgFoundList().forEach(msgExt -> { try { log.info("收到消息:{}", new String(msgExt.getBody(), RemotingHelper.DEFAULT_CHARSET)); } catch (UnsupportedEncodingException e) { log.info("异常:", e); } }); break; case NO_NEW_MSG: break SINGLE_MQ; case NO_MATCHED_MSG: case OFFSET_ILLEGAL: break; default: break; } } catch (Exception e) { log.info("异常:", e); } } } } /** * 获取偏移量 */ private static long getMessageQueueOffset(MessageQueue mq) { Long offset = OFFSET_TABLE.get(mq); if (offset != null) { return offset; } return 0; } /** * 保存偏移量 */ private static void putMessageQueueOffset(MessageQueue mq, long offset) { OFFSET_TABLE.put(mq, offset); } }
逻辑是这样的:
实例代码(Springboot项目):
@Component @Slf4j public class TransactionProducer implements InitializingBean { @Autowired private TransactionListener transactionListener; private TransactionMQProducer producer; @Override public void afterPropertiesSet() throws Exception { producer = new TransactionMQProducer("transaction_producer_group_01"); producer.setNamesrvAddr("192.168.195.88:9876"); ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000), r -> { Thread thread = new Thread(r); thread.setName("client-transaction-msg-check-thread"); return thread; }); producer.setExecutorService(executorService); producer.setTransactionListener(transactionListener); try { producer.start(); } catch (MQClientException e) { log.error("TransactionProducer 启动异常:", e); } // 添加关闭钩子 Runtime.getRuntime().addShutdownHook(new Thread(() -> producer.shutdown())); } /** * 发送事务消息 */ public void produce() { try { String body= "transaction_test_1"; Message msg = new Message("transactionTopicTest1", "user", UUID.randomUUID().toString(), body.getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.sendMessageInTransaction(msg, null); log.info("发送事务消息结果:{}", sendResult); } catch (MQClientException | UnsupportedEncodingException e) { log.info("发送事务消息异常:{}", e); } } }
@Component @Slf4j public class TransactionListenerImpl implements TransactionListener { @Resource(name = "jdbcTemplate1") private JdbcTemplate jdbcTemplate; /** 生产中能够用redis或数据库代替 */ private ConcurrentHashMap<String, LocalTransactionState> localTrans = new ConcurrentHashMap<>(); /** * 执行本地事务 * @param msg 事务消息 * @param arg 自定义参数 * @return 执行结果 */ @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { try { jdbcTemplate.execute("insert into user(name) values('transaction_test_1')"); localTrans.put(msg.getTransactionId(), LocalTransactionState.COMMIT_MESSAGE); log.info("本地事务执行成功"); return LocalTransactionState.COMMIT_MESSAGE; } catch (Exception e){ localTrans.put(msg.getTransactionId(), LocalTransactionState.ROLLBACK_MESSAGE); log.error("本地事务执行失败"); return LocalTransactionState.ROLLBACK_MESSAGE; } } @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { LocalTransactionState state = localTrans.get(msg.getTransactionId()); log.info("执行事务回查,transactionId:{}, transactionState:{}", msg.getTransactionId(), state); if (null != state) { return state; } return LocalTransactionState.UNKNOW; } }
@Component @Slf4j public class TransactionConsumer implements InitializingBean { @Resource(name = "jdbcTemplate2") private JdbcTemplate jdbcTemplate; @Override public void afterPropertiesSet() throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("transaction_consumer_group_01"); consumer.setNamesrvAddr("192.168.195.88:9876"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("transactionTopicTest1", "user"); // 只重试三次 consumer.setMaxReconsumeTimes(3); consumer.registerMessageListener((List<MessageExt> msgs, ConsumeConcurrentlyContext context) -> { try { MessageExt msg = msgs.get(0); log.info("收到消息:{}", msg); // 正式项目中,咱们应该保存该msgId,防止消息的重复消费 // String msgId = msg.getMsgId(); String body = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET); jdbcTemplate.execute("insert into user(name) values('"+body+"')"); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } catch (Exception e) { log.info("异常:", e); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } }); consumer.start(); log.info("TransactionConsumer启动成功"); } }
/** * 数据源配置,用的Mysql和Druid */ @Configuration public class DbConfig { @Bean(name = "dataSource1") DataSource dataSource1() { DruidDataSource dataSource = new DruidDataSource(); dataSource.setUrl("jdbc:mysql://192.168.195.88:3306/jta1?useUnicode=true&characterEncoding=UTF8&useSSL=false"); dataSource.setUsername("root"); dataSource.setPassword("root"); return dataSource; } @Bean(name = "dataSource2") DataSource dataSource2() { DruidDataSource dataSource = new DruidDataSource(); dataSource.setUrl("jdbc:mysql://192.168.195.88:3306/jta2?useUnicode=true&characterEncoding=UTF8&useSSL=false"); dataSource.setUsername("root"); dataSource.setPassword("root"); return dataSource; } @Bean("jdbcTemplate1") JdbcTemplate first(@Qualifier("dataSource1") DataSource dataSource) { JdbcTemplate jdbcTemplate = new JdbcTemplate(); jdbcTemplate.setDataSource(dataSource); return jdbcTemplate; } @Bean("jdbcTemplate2") JdbcTemplate second(@Qualifier("dataSource2") DataSource dataSource) { JdbcTemplate jdbcTemplate = new JdbcTemplate(); jdbcTemplate.setDataSource(dataSource); return jdbcTemplate; } }
@RunWith(SpringRunner.class) @SpringBootTest public class ApplicationTest { @Autowired TransactionProducer transactionProducer; /** * 测试分布式事务 */ @Test public void testTransactionProducer() throws InterruptedException { transactionProducer.produce(); Thread.sleep(10000000); } }
如下内容没有主要研究,主要摘自官网,有须要时再详细研究。
原理和Kafka要作到有序差很少,即把消息放入一个队列中,而后使用一个消费者去进行消费。
即每一个消费者消费都全部的消息
//设置广播模式属性便可 consumer.setMessageModel(MessageModel.BROADCASTING);
批量发送消息可提升生产者的性能。注意的是:同一批次的消息应该具备:相同的topic,相同的waitStoreMsgOK和没有延时计划,而且一批消息的总大小不该超过1M。
RocketMQ在消息过滤上是比较强大的,虽然咱们可能不会常常用到它。Message除了能够Tag来加以区分外,咱们还能够为它添加额外的属性。以下:
在发送消息时:
// 消息添加额外属性 msg.putUserProperty("a", "3");
在消费消息时:
// 只有消费具备属性a,a>= 0且a <= 3的消息 consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3");
还有很是多的过滤方式如:in
and
or
not
is null
=
<
>
等等
RocketMQ不保证消息不重复,若是你的业务须要保证严格的不重复消息,须要你本身在业务端去重。