ActiveMQ

1、什么是消息中间件

  消息中间件利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通讯来进行分布式系统的集成。经过提供消息传递和消息排队模型,它能够在分布式环境下扩展进程间的通讯。对于消息中间件,常见的角色大体也就有Producer(生产者)、Consumer(消费者)java

  常见的消息中间件产品:spring

1ActiveMQ数据库

  ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个彻底支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已是好久的事情了,可是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。apache

  主要特色:服务器

  1. 多种语言和协议编写客户端。语言: Java, C, C++, C#, Ruby, Perl, Python, PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQPsession

  2. 彻底支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)app

  3. 对Spring的支持,ActiveMQ能够很容易内嵌到使用Spring的系统里面去,并且也支持Spring2.0的特性tcp

  4. 经过了常见J2EE服务器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的测试,其中经过JCA 1.5 resource adaptors的配置,可让ActiveMQ能够自动的部署到任何兼容J2EE 1.4 商业服务器上分布式

  5. 支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTAide

  6. 支持经过JDBC和journal提供高速的消息持久化

  7. 从设计上保证了高性能的集群,客户端-服务器,点对点

  8. 支持Ajax

  9. 支持与Axis的整合

  10. 能够很容易得调用内嵌JMS provider,进行测试

(2)RabbitMQ

  AMQP协议的领导实现,支持多种场景。淘宝的MySQL集群内部有使用它进行通信,OpenStack开源云平台的通讯组件,最早在金融行业获得运用。

(3)ZeroMQ

  史上最快的消息队列系统

(4)Kafka

  Apache下的一个子项目 。特色:高吞吐,在一台普通的服务器上既能够达到10W/s的吞吐速率;彻底的分布式系统。适合处理海量数据。

2、JMS简介

2.1 什么是JMS

  JMS(JavaMessaging Service)是Java平台上有关面向消息中间件的技术规范,它便于消息系统中的Java应用程序进行消息交换,而且经过提供标准的产生、发送、接收消息的接口简化企业应用的开发。

       JMS自己只定义了一系列的接口规范,是一种与厂商无关的 API,用来访问消息收发系统。它相似于 JDBC(javaDatabase Connectivity):这里,JDBC 是能够用来访问许多不一样关系数据库的 API,而 JMS 则提供一样与厂商无关的访问方法,以访问消息收发服务。许多厂商目前都支持 JMS,包括 IBM 的 MQSeries、BEA的 Weblogic JMS service和 Progress 的 SonicMQ,这只是几个例子。 JMS 使您可以经过消息收发服务(有时称为消息中介程序或路由器)从一个 JMS 客户机向另外一个 JML 客户机发送消息。消息是 JMS 中的一种类型对象,由两部分组成:报头和消息主体。报头由路由信息以及有关该消息的元数据组成。消息主体则携带着应用程序的数据或有效负载。

  JMS 定义了五种不一样的消息正文格式,以及调用的消息类型,容许你发送并接收以一些不一样形式的数据,提供现有消息格式的一些级别的兼容性。

  • TextMessage--一个字符串对象
  • MapMessage--一套名称-值对
  • ObjectMessage--一个序列化的 Java 对象
  • BytesMessage--一个字节的数据流
  • StreamMessage -- Java 原始值的数据流

2.2 JMS的消息形式

  对于消息的传递有两种类型:

  一种是点对点的,即一个生产者和一个消费者一一对应;

  

  另外一种是发布/订阅模式,即一个生产者产生消息并进行发送后,能够由多个消费者进行接收。

  

 

3、ActiveMQ的安装

  官方网址:http://activemq.apache.org/

3.1 安装环境

  • 须要JDK
  • 安装Linux系统。生产环境都是Linux系统。

3.2 安装步骤

  第一步: 把ActiveMQ 的压缩包上传到Linux系统。

  第二步:解压缩。

  第三步:启动。

  

  第四步:进入后台管理: http://192.168.25.130:8161/admin

      用户名:admin

      密码:admin

     

4、ActiveMQ的入门案例

4.1 Queue(点对点模式)

  点对点的模式主要创建在一个队列上面,当链接一个列队的时候,发送端不须要知道接收端是否正在接收,能够直接向ActiveMQ发送消息,发送的消息,将会先进入队列中,若是有接收端在监听,则会发向接收端,若是没有接收端接收,则会保存在activemq服务器,直到接收端接收消息,点对点的消息模式能够有多个发送端,多个接收端,可是一条消息,只会被一个接收端给接收到,哪一个接收端先连上ActiveMQ,则会先接收到,然后来的接收端则接收不到那条消息。

【Producer】——生产者:生产消息,发送端

  第一步:把jar包添加到工程中。使用5.11.2版本的jar包。

    <!-- ActiveMQ客户端依赖的jar包 -->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
        </dependency>

  第二步:建立ConnectionFactory对象,须要指定服务端ip及端口号。

  第三步:使用ConnectionFactory对象建立一个Connection对象。

  第四步:开启链接,调用Connection对象的start方法。

  第五步:使用Connection对象建立一个Session对象。

  第六步:使用Session对象建立一个Destination对象(topic、queue),此处建立一个Queue对象。

  第七步:使用Session对象建立一个Producer对象。

  第八步:建立一个Message对象,建立一个TextMessage对象。

  第九步:使用Producer对象发送消息。

  第十步:关闭资源。

  @Test
    public void testQueueProducer() throws Exception {
        // 第一步:建立ConnectionFactory对象,须要指定服务端ip及端口号。
        // brokerURL:服务器的ip及端口号
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.130:61616");
        // 第二步:使用ConnectionFactory对象建立一个Connection对象。
        Connection connection = connectionFactory.createConnection();
        // 第三步:开启链接,调用Connection对象的start方法。
        connection.start();
        // 第四步:使用Connection对象建立一个Session对象。
        // 第一个参数:是否开启事务。true:开启事务,第二个参数忽略。
        // 第二个参数:当第一个参数为false时,才有意义。消息的应答模式。一、自动应答二、手动应答。通常是自动应答。
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 第五步:使用Session对象建立一个Destination对象(topic、queue),此处建立一个Queue对象。
        Queue queue = session.createQueue("test-queue");
        // 第六步:使用Session对象建立一个Producer对象。
        MessageProducer producer = session.createProducer(queue);
        // 第七步:建立一个Message对象,建立一个TextMessage对象。
        /*ActiveMQTextMessage message = new ActiveMQTextMessage();
        message.setText("hello activeMq,this is my first test.");*/
        TextMessage textMessage = session.createTextMessage("hello activeMq,this is my first test.");
        // 第八步:使用Producer对象发送消息。
        producer.send(textMessage);
        // 第九步:关闭资源。
        producer.close();
        session.close();
        connection.close();
    }

   访问ActiveMQ首页,进入Queues标签后,便可看到发送的消息。(若是点击Queues或Topics时出现HTTP ERROR: 503的错误,处理方式为:ActiveMQ页面出现HTTP ERROR: 503错误处理方式

  

  列表各列信息含义以下:

  Number Of Pending Messages  等待消费的消息。这个是当前未出队列的数量。

  Number Of Consumers  消费者。这个是消费者端的消费者数量

  Messages Enqueued  进入队列的消息 。进入队列的总数量,包括出队列的。

  Messages Dequeued  出了队列的消息 。能够理解为是消费这消费掉的数量。

  其中:Messages Enqueued -Messages Dequeued =Number Of Pending Messages

【Consumer】——消费者:接收消息

  第一步:建立一个ConnectionFactory对象。

  第二步:从ConnectionFactory对象中得到一个Connection对象。

  第三步:开启链接。调用Connection对象的start方法。

  第四步:使用Connection对象建立一个Session对象。

  第五步:使用Session对象建立一个Destination对象。和发送端保持一致queue,而且队列的名称一致。

  第六步:使用Session对象建立一个Consumer对象。

  第七步:接收消息。

  第八步:打印消息。

  第九步:关闭资源

    //1.建立链接工厂
    ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.25.130:61616");
    //2.获取链接
    Connection connection = connectionFactory.createConnection();
    //3.启动链接
    connection.start();
    //4.获取session  (参数1:是否启动事务,参数2:消息确认模式)
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    //5.建立队列对象
    Queue queue = session.createQueue("test-queue");
    //6.建立消息消费
    MessageConsumer consumer = session.createConsumer(queue);
    
    //7.监听消息
    consumer.setMessageListener(new MessageListener() {
        public void onMessage(Message message) {
            TextMessage textMessage=(TextMessage)message;
            try {
                System.out.println("接收到消息:"+textMessage.getText());
            } catch (JMSException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    });    
    //8.等待键盘输入
    System.in.read();    
    //9.关闭资源
    consumer.close();
    session.close();
    connection.close();    

  接收消息后,ActiveMQ首页的Queues以下

  

4.2 Topic(发布/订阅模式)

【Producer】

  第一步:建立ConnectionFactory对象,须要指定服务端ip及端口号。

  第二步:使用ConnectionFactory对象建立一个Connection对象。

  第三步:开启链接,调用Connection对象的start方法。

  第四步:使用Connection对象建立一个Session对象。

  第五步:使用Session对象建立一个Destination对象(topic、queue),此处建立一个Topic对象。

  第六步:使用Session对象建立一个Producer对象。

  第七步:建立一个Message对象,建立一个TextMessage对象。

  第八步:使用Producer对象发送消息。

  第九步:关闭资源。

@Test
    public void testTopicProducer() throws Exception {
        // 第一步:建立ConnectionFactory对象,须要指定服务端ip及端口号。
        // brokerURL:服务器的ip及端口号
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.130:61616");
        // 第二步:使用ConnectionFactory对象建立一个Connection对象。
        Connection connection = connectionFactory.createConnection();
        // 第三步:开启链接,调用Connection对象的start方法。
        connection.start();
        // 第四步:使用Connection对象建立一个Session对象。
        // 第一个参数:是否开启事务。true:开启事务,第二个参数忽略。
        // 第二个参数:当第一个参数为false时,才有意义。消息的应答模式。一、自动应答二、手动应答。通常是自动应答。
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 第五步:使用Session对象建立一个Destination对象(topic、queue),此处建立一个topic对象。
        // 参数:话题的名称。
        Topic topic = session.createTopic("test-topic");
        // 第六步:使用Session对象建立一个Producer对象。
        MessageProducer producer = session.createProducer(topic);
        // 第七步:建立一个Message对象,建立一个TextMessage对象。
        TextMessage textMessage = session.createTextMessage("hello activeMq,this is my topic test");
        // 第八步:使用Producer对象发送消息。
        producer.send(textMessage);
        // 第九步:关闭资源。
        producer.close();
        session.close();
        connection.close();
    }

   发送完毕后,查看ActiveMQ后台的Topics

  

【Consumer】——消费者:接收消息

  第一步:建立一个ConnectionFactory对象。

  第二步:从ConnectionFactory对象中得到一个Connection对象。

  第三步:开启链接。调用Connection对象的start方法。

  第四步:使用Connection对象建立一个Session对象。

  第五步:使用Session对象建立一个Destination对象。和发送端保持一致topic,而且话题的名称一致。

  第六步:使用Session对象建立一个Consumer对象。

  第七步:接收消息。

  第八步:打印消息。

  第九步:关闭资源

@Test
    public void testTopicConsumer() throws Exception {
        // 第一步:建立一个ConnectionFactory对象。
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.130:61616");
        // 第二步:从ConnectionFactory对象中得到一个Connection对象。
        Connection connection = connectionFactory.createConnection();
        // 第三步:开启链接。调用Connection对象的start方法。
        connection.start();
        // 第四步:使用Connection对象建立一个Session对象。
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 第五步:使用Session对象建立一个Destination对象。和发送端保持一致topic,而且话题的名称一致。
        Topic topic = session.createTopic("test-topic");
        // 第六步:使用Session对象建立一个Consumer对象。
        MessageConsumer consumer = session.createConsumer(topic);
        // 第七步:接收消息。
        consumer.setMessageListener(new MessageListener() {
            
            @Override
            public void onMessage(Message message) {
                try {
                    TextMessage textMessage = (TextMessage) message;
                    String text = null;
                    // 取消息的内容
                    text = textMessage.getText();
                    // 第八步:打印消息。
                    System.out.println(text);
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        System.out.println("topic的消费端01。。。");
        // 等待接收消息
        System.in.read();
        // 第九步:关闭资源
        consumer.close();
        session.close();
        connection.close();
    }

4.3 Queue与Topic的区别

  Topic和queue的最大区别在于topic是以广播的形式,通知全部在线监听的客户端有新的消息,没有监听的客户端将收不到消息;而queue则是以点对点的形式通知多个处于监听状态的客户端中的一个。

 5、ActiveMQ整合Spring

5.1 Producer

  在e3-manager-service中配置

  第一步:引用相关的jar包。

     <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context-support</artifactId>
        </dependency>

   第二步:配置Activemq整合spring。配置ConnectionFactory

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"
    xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd
    http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd 
    http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
    http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd">


    <!-- 真正能够产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供 -->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://192.168.25.130:61616" />
    </bean>
    <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
    <bean id="connectionFactory"
        class="org.springframework.jms.connection.SingleConnectionFactory">
        <!-- 目标ConnectionFactory对应真实的能够产生JMS Connection的ConnectionFactory -->
        <property name="targetConnectionFactory" ref="targetConnectionFactory" />
    </bean>
</beans>

  第三步:配置生产者。使用JMSTemplate对象。发送消息。

    <!-- 配置生产者 -->
    <!-- Spring提供的JMS工具类,它能够进行消息发送、接收等 -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- 这个connectionFactory对应的是咱们定义的Spring提供的那个ConnectionFactory对象 -->
        <property name="connectionFactory" ref="connectionFactory" />
    </bean>

  第四步:在spring容器中配置Destination。

<!--这个是队列目的地,点对点的 -->
    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg>
            <value>spring-queue</value>
        </constructor-arg>
    </bean>
    <!--这个是主题目的地,一对多的 -->
    <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg value="topic" />
    </bean>

  第五步:代码测试(发送消息)

@Test
    public void testSpringActiveMq() throws Exception {
        // 第一步:初始化一个spring容器
        ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:spring/applicationContext-activemq.xml");
        // 第二步:从容器中得到JMSTemplate对象。
        JmsTemplate jmsTemplate = applicationContext.getBean(JmsTemplate.class);
        // 第三步:从容器中得到一个Destination对象
        Destination destination = (Destination) applicationContext.getBean("queueDestination");
        // 第四步:使用JMSTemplate对象发送消息,须要知道Destination
        jmsTemplate.send(destination, new MessageCreator() {
            
            @Override
            public Message createMessage(Session session) throws JMSException {
                //建立一个消息对象并返回
                TextMessage textMessage = session.createTextMessage("spring activemq queue message");
                return textMessage;
            }
        });
    } 

5.2 Consumer

  在e3-search-service中配置

  第一步:把Activemq相关的jar包添加到工程中

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context-support</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
        </dependency>

  第二步:建立一个MessageListener的实现类。

public class MyMessageListener implements MessageListener {

    @Override
    public void onMessage(Message message) {
        
        try {
            TextMessage textMessage = (TextMessage) message;
            //取消息内容
            String text = textMessage.getText();
            System.out.println(text);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

}

  第三步:配置spring和Activemq整合。

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"
    xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd
    http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
    http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd">

    <!-- 真正能够产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供 -->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://192.168.25.168:61616" />
    </bean>
    <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
    <bean id="connectionFactory"
        class="org.springframework.jms.connection.SingleConnectionFactory">
        <!-- 目标ConnectionFactory对应真实的能够产生JMS Connection的ConnectionFactory -->
        <property name="targetConnectionFactory" ref="targetConnectionFactory" />
    </bean>
    <!--这个是队列目的地,点对点的 -->
    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg>
            <value>spring-queue</value>
        </constructor-arg>
    </bean>
    <!--这个是主题目的地,一对多的 -->
    <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg value="topic" />
    </bean>
    <!-- 接收消息 -->
    <!-- 配置监听器 -->
    <bean id="myMessageListener" class="cn.e3mall.search.listener.MyMessageListener" />
    <!-- 消息监听容器 -->
    <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="queueDestination" />
        <property name="messageListener" ref="myMessageListener" />
    </bean>
</beans>

  第四步:测试代码。

  @Test
    public void testQueueConsumer() throws Exception {
        //初始化spring容器
        ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:spring/applicationContext-activemq.xml");
        //等待
        System.in.read();
    }