rocketMQ入门

一:什么是MQjava

MQ 就是 消息中间件。apache

 

二:为何使用MQide

场景:电商双十一 零点的秒杀。在那一瞬间,来自用户的请求将会激增,若是不作任何措施,那服务极可能会被压垮。可是咱们又不能直接把这些请求丢弃,而为了这个很小的时间段去扩容机器又显得大题小作。因而咱们天然而言的想到,能不能把这些请求先放到一个消息队列里面,而后系统从消息队列里面拿出来请求作逻辑的处理和响应。经过拉长时间维度来保证服务的稳定性。这就是MQ。ui

使用MQ只要解决的就是 在生产者消费者模式中,生产者生产的数据可能会忽然激增,消费者来不及消费的问题。spa

 

三:rocketMQ中间件

rocketMQ是一个MQ的实现。咱们在开发中一直在强调,不要重复造轮子。既然咱们须要一个MQ,那就找个别人实现过的MQ来用就好了。rocketMQ就是其中的一种。固然,还有其余的MQ组件,好比的 ActiveMQ、RabbitMQ,Kafka。blog

 

四:rocketMQ下载队列

http://rocketmq.apache.org/release_notesip

下载bin的包,好比 rocketmq-all-4.3.2-bin-release.zip 开发

 

五:安装

将下载的文件解压到对应目录。好比我解压到 C:\rocketmq-all-4.3.2

 

六:启动NAMESERVER

去 C:\rocketmq-all-4.3.2\bin目录下找到 mqnamesrv.cmd,双击运营便可。

 

七:启动BROKER

start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true

( 假如弹出提示框提示‘错误: 找不到或没法加载主类 xxxxxx’。打开runbroker.cmd,而后将‘%CLASSPATH%’加上英文双引号。保存并从新执行start语句。)

 

至此为止,rocketMQ就安装启动完成了。下面咱们写的demo来使用rocket作一个helloWord

 

八:写一个生产者,发消息

public class Producer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQProducer producer = new DefaultMQProducer("rmq-group");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.setInstanceName("producer");
        producer.start();
        try {
            for (int i = 0; i < 10; i++) {
                Thread.sleep(2000);  //每2秒发送一次消息
                Message msg = new Message("TopicA-test",// topic
                        "TagA",// tag
                        (new Date() + "Hello RocketMQ ,QuickStart" + i)
                                .getBytes()// body
                );
                SendResult sendResult = producer.send(msg);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        
        producer.shutdown();
    }
}

  

 

九:写一个消费者,用来监听消息

public class Consumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(
                "rmq-group");

        consumer.setNamesrvAddr("127.0.0.1:9876");//设置rocketMQ服务的部署地址
        consumer.setInstanceName("consumer");
        /**
         * 被订阅消息的topic 和 subExpression。
         * 注意:必定要与消息发布者的topic 和 subExpression 一致
         */
        consumer.subscribe("TopicA-test", "TagA");

        consumer.registerMessageListener(new MessageListenerConcurrently() {//监听器实现
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(
                    List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println(new String(msg.getBody()));//每次拿到消息我就打印出来
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("Consumer Started.");
    }
}

 

一个简单的demo就OK了

相关文章
相关标签/搜索