ActiveMQ入门,ActiveMQ与RocketMQ的对比

1. ActiveMQ入门

前面的文章已经写过MQ的相关概念,这里再也不赘述。java

1.1 ActiveMQ是什么

ActiveMQ是Apache下的开源项目,彻底支持JMS1.1和J2EE1.4规范的JMS Provider实现。web

1.2 ActiveMQ的特色

  • 支持多种语言编写客户端
  • 对Spring的支持,很容易和Spring整合
  • 支持多种传输协议:TCP,SSL,NIO,UDP等
  • 支持Ajax请求

1.3 ActiveMQ的安装

1.3.1 官网下载

http://activemq.apache.org/spring

解压后的文件夹结构:数据库

1.3.2 启动ActiveMQ

直接双击这个“wrapper.exe”便可apache

以后能够在浏览器输入http://localhost:8161/浏览器

1.3.3 进入管理中心

点击Manage ActiveMQ broker,会弹出身份验证,输入admin,admin便可session

1.4 搭建Maven工程框架

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>

    <groupId>com.linkedbear</groupId>

    <artifactId>ActiveMQ-Demo</artifactId>

    <version>0.0.1-SNAPSHOT</version>

   

    <properties>

        <activemq.version>5.15.4</activemq.version>

    </properties>



    <parent>

        <groupId>org.springframework.boot</groupId>

        <artifactId>spring-boot-starter-parent</artifactId>

        <version>2.0.0.RELEASE</version>

    </parent>



    <dependencies>

        <dependency>

            <groupId>org.springframework.boot</groupId>

            <artifactId>spring-boot-starter-web</artifactId>

        </dependency>

        <!-- ActiveMQ -->

        <dependency>

             <groupId>org.apache.activemq</groupId>

             <artifactId>activemq-client</artifactId>

             <version>${activemq.version}</version>

         </dependency>



       

        <!-- 热部署 -->

        <dependency>

            <groupId>org.springframework.boot</groupId>

            <artifactId>spring-boot-devtools</artifactId>

        </dependency>

    </dependencies>



    <build>

        <plugins>

            <plugin>

                <artifactId>maven-compiler-plugin</artifactId>

                <configuration>

                    <source>1.8</source>

                    <target>1.8</target>

                </configuration>

            </plugin>

        </plugins>

    </build>

</project>

1.5 建立工程目录结构

以前的文章中写过,JMS的消息传递有两种模式,前面的RocketMQ中只写了一对一模式,本篇文章将会编写两种模式。app

1.6 一对一模式的Queue

1.6.1 生产者

/**

 * 生产者Controller

 * @Title ProducerQueueController

 * @author LinkedBear

 * @Time 2018年8月3日 下午4:52:49

*/

@Controller

public class ProducerQueueController {

    @RequestMapping("/queueProduceMessage")

    @ResponseBody

    public Map<String, Object> queueProduceMessage() throws Exception {

        //JMS的使用比较相似于JDBC与Hibernate

        //1. 建立一个链接工厂(相似于JDBC中的注册驱动),须要传入TCP协议的ActiveMQ服务地址

        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");

        //2. 建立链接(相似于DriverManager.getConnection)

        Connection connection = connectionFactory.createConnection();

        //3. 开启链接(ActiveMQ建立的链接是须要手动开启的)

        connection.start(); //注意不是open。。。

        //4. 获取session(相似于Hibernate中的session,都是用会话来进行操做)

        //里面有两个参数,参数1为是否开启事务,参数2为消息确认模式

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        //5. 建立一对一的消息队列

        Queue queue = session.createQueue("test_queue");

        //6. 建立一条消息

        String text = "test queue message" + Math.random();

        TextMessage message = session.createTextMessage(text);

        //7. 消息须要发送方,要建立消息发送方(生产者),并绑定到某个消息队列上

        MessageProducer producer = session.createProducer(queue);

        //8. 发送消息

        producer.send(message);

        //9. 关闭链接

        producer.close();

        session.close();

        connection.close();

       

        //------显示发送的消息到视图上------

        Map<String, Object> map = new HashMap<>();

        map.put("message", text);

        return map;

    }

}

1.6.2 消费者

/**

 * 消费者Controller

 * @Title ConsumerQueueController

 * @author LinkedBear

 * @Time 2018年8月3日 下午4:52:56

*/

@Controller

public class ConsumerQueueController {

    @RequestMapping("/queueGetMessage1")

    public void queueGetMessage1() throws Exception {

        //1. 建立一个链接工厂,须要传入TCP协议的ActiveMQ服务地址

        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");

        //2. 建立链接

        Connection connection = connectionFactory.createConnection();

        //3. 开启链接

        connection.start(); //注意不是open。。。

        //4. 获取session

        //里面有两个参数,参数1为是否开启事务,参数2为消息确认模式

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        //5. 建立一对一的消息队列

        Queue queue = session.createQueue("test_queue");

       

        //------------前5步都是相同的,如下为不一样----------------

        //6. 建立消费者

        MessageConsumer consumer = session.createConsumer(queue);

        //7. 使用监听器监听队列中的消息

        consumer.setMessageListener(new MessageListener() {

            @Override

            public void onMessage(Message message) {

                TextMessage textMessage = (TextMessage) message;

                try {

                    String text = textMessage.getText();

                    System.out.println("收到消息:" + text);

                } catch (JMSException e) {

                    e.printStackTrace();

                }

            }

        });

       

        //因为设置监听器后不能立刻结束方法,要在这里加一个等待点

        System.in.read();

       

        //8. 关闭链接

        consumer.close();

        session.close();

        connection.close();

    }



    @RequestMapping("/queueGetMessage2")

    public void queueGetMessage2() throws Exception //(彻底相同,再也不重复)

}

1.6.3 运行结果

先执行两个消息的消费者框架

http://localhost:8080/queueGetMessage1dom

http://localhost:8080/queueGetMessage2

执行http://localhost:8080/queueProduceMessage

可是只收到一条消息

1.7 一对多模式的Topic

1.7.1 生产者

/**

 * 生产者Controller

 * @Title ProducerTopicController

 * @author LinkedBear

 * @Time 2018年8月3日 下午4:52:49

*/

@Controller

public class ProducerTopicController {

    @RequestMapping("/topicProduceMessage")

    @ResponseBody

    public Map<String, Object> topicProduceMessage() throws Exception {

        //JMS的使用比较相似于JDBC与Hibernate

        //1. 建立一个链接工厂(相似于JDBC中的注册驱动),须要传入TCP协议的ActiveMQ服务地址

        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");

        //2. 建立链接(相似于DriverManager.getConnection)

        Connection connection = connectionFactory.createConnection();

        //3. 开启链接(ActiveMQ建立的链接是须要手动开启的)

        connection.start(); //注意不是open。。。

        //4. 获取session(相似于Hibernate中的session,都是用会话来进行操做)

        //里面有两个参数,参数1为是否开启事务,参数2为消息确认模式

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        //5. 建立一对多的消息广播

        Topic topic = session.createTopic("test_topic");

        //6. 建立一条消息

        String text = "test topic message" + Math.random();

        TextMessage message = session.createTextMessage(text);

        //7. 消息须要发送方,要建立消息发送方(生产者),并广播到某个消息广播端上

        MessageProducer producer = session.createProducer(topic);

        //8. 发送消息

        producer.send(message);

        //9. 关闭链接

        producer.close();

        session.close();

        connection.close();

       

        //------显示发送的消息到视图上------

        Map<String, Object> map = new HashMap<>();

        map.put("message", text);

        return map;

    }

}

1.7.2 消费者

/**

 * 消费者Controller

 * @Title ConsumerTopicController

 * @author LinkedBear

 * @Time 2018年8月3日 下午4:52:56

*/

@Controller

public class ConsumerTopicController {

    @RequestMapping("/topicGetMessage")

    public void topicGetMessage() throws Exception {

        //1. 建立一个链接工厂,须要传入TCP协议的ActiveMQ服务地址

        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");

        //2. 建立链接

        Connection connection = connectionFactory.createConnection();

        //3. 开启链接

        connection.start(); //注意不是open。。。

        //4. 获取session

        //里面有两个参数,参数1为是否开启事务,参数2为消息确认模式

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        //5. 建立一对多的消息广播

        Topic topic = session.createTopic("test_topic");

       

        //------------前5步都是相同的,如下为不一样----------------

        //6. 建立消费者

        MessageConsumer consumer = session.createConsumer(topic);

        //7. 使用监听器监听队列中的消息

        consumer.setMessageListener(new MessageListener() {

            @Override

            public void onMessage(Message message) {

                TextMessage textMessage = (TextMessage) message;

                try {

                    String text = textMessage.getText();

                    System.out.println("收到消息:" + text);

                } catch (JMSException e) {

                    e.printStackTrace();

                }

            }

        });

       

        //因为设置监听器后不能立刻结束方法,要在这里加一个等待点

        System.in.read();

       

        //8. 关闭链接

        consumer.close();

        session.close();

        connection.close();

    }



    @RequestMapping("/topicGetMessage2")

    public void topicGetMessage2() throws Exception //(彻底相同,再也不重复)

}

1.7.3 运行结果

先执行两个消息的消费者

http://localhost:8080/topicGetMessage1

http://localhost:8080/topicGetMessage2

执行http://localhost:8080/topicProduceMessage

此次收到了两条消息

2. RocketMQ与ActiveMQ的对比

从这两种消息中间件的编写过程来看,两种产品的区别是比较大的,下面就这两种产品进行多方面对比。

参考文章:http://www.javashuo.com/article/p-gqzlvfbx-p.html

比较项

RocketMQ

ActiveMQ

语言支持

只支持Java

多语言,Java为主

可用性

分布式

主从

JMS规范

经常使用的使用方式没有遵循JMS

严格遵循JMS规范

消息持久化

硬盘

内存,硬盘,数据库

部署方式

独立部署

独立部署、嵌入应用,能够与Spring很好的整合

社区活跃

活跃

不很活跃

相关文章
相关标签/搜索