深刻浅出 JMS(二) - ActiveMQ 入门指南

深刻浅出 JMS(二) - ActiveMQ 入门指南

上篇博文深刻浅出 JMS(一) – JMS 基本概念,咱们介绍了消息通讯的规范JMS,这篇博文介绍一款开源的 JMS 具体实现—— ActiveMQ。ActiveMQ 是一个易于使用的消息中间件。html

1、消息中间件和 ActiveMQ

(1) 消息中间件(MOM:Message Orient middleware)

咱们简单的介绍一下消息中间件,对它有一个基本认识就好,消息中间件有不少的用途和优势:java

  1. 将数据从一个应用程序传送到另外一个应用程序,或者从软件的一个模块传送到另一个模块;
  2. 负责创建网络通讯的通道,进行数据的可靠传送;
  3. 保证数据不重发,不丢失;
  4. 可以实现跨平台操做,可以为不一样操做系统上的软件集成技工数据传送服务。

(2) ActiveMQ

首先简单的介绍一下 MQ,MQ 英文名 MessageQueue,中文名也就是你们用的消息队列,干吗用的呢,说白了就是一个消息的接受和转发的容器,可用于消息推送。web

下面进入咱们今天的主题,为你们介绍 ActiveMQ。apache

Apache ActiveMQ ™ is the most popular and powerful open source messaging and Integration Patterns server.
Apache ActiveMQ is fast, supports many Cross Language Clients and Protocols, comes with easy to use Enterprise Integration Patterns and many advanced features while fully supporting JMS 1.1 and J2EE 1.4.

ActiveMQ 是由 Apache 出品的,一款最流行的,能力强劲的开源消息总线。ActiveMQ 是一个彻底支持 JMS1.1 和 J2EE 1.4 规范的 JMS Provider 实现,它很是快速,支持多种语言的客户端和协议,并且能够很是容易的嵌入到企业的应用环境中,并有许多高级功能。后端

下面咱们下载一个版本,玩一玩。浏览器

2、运行 ActiveMQ 服务

(1) 下载,解压缩

ActiveMQ 下载网站:http://activemq.apache.org/activemq-5153-release.html安全

你们如今好以后,将 apache-activemq-5.15.3-bin.zip 解压缩,咱们能够看到它的总体目录结构:服务器

图2.1 activemq目录结构

从它的目录来讲,仍是很简单的:网络

  1. bin:存放的是脚本文件
  2. conf:存放的是基本配置文件
  3. data:存放的是日志文件
  4. docs:存放的是说明文档
  5. examples:存放的是简单的实例
  6. lib:存放的是 activemq 所需 jar 包
  7. webapps:用于存放项目的目录

(2) 启动 ActiveMQ

咱们了解 activemq 的基本目录,下面咱们运行一下 activemq 服务,双击 bin 目录下的 bin/win64/activemq.bat 脚本文件,就能够看下图的效果。session

图2.2 activemq启动

从上图咱们能够看到 activemq 的存放地址,以及浏览器要访问的地址。

(3) 测试

ActiveMQ 默认使用的 TCP 链接端口是 61616, 经过查看该端口的信息能够测试 ActiveMQ 是否成功启动

netstat -an | find "61616" 

TCP     0.0.0.0:61616     0.0.0.0:0       LISTENING

(4) 监控

ActiveMQ 默认启动时,启动了内置的 jetty 服务器,提供一个用于监控 ActiveMQ 的 admin 应用。地址:http://127.0.0.1:8161/admin/

用户名和密码都是 admin

图2.4 activemq监控

3、ActiveMQ 特性列表

  1. 多种语言和协议编写客户端。语言: Java, C, C++, C#, Ruby, Perl, Python, PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
  2. 彻底支持 JMS1.1 和 J2EE 1.4 规范 (持久化,XA消息,事务)
  3. 对 Spring 的支持,ActiveMQ 能够很容易内嵌到使用 Spring 的系统里面去,并且也支持 Spring2.0 的特性
  4. 经过了常见 J2EE 服务器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的测试,其中经过 JCA 1.5 resource adaptors 的配置,可让 ActiveMQ 能够自动的部署到任何兼容 J2EE 1.4 商业服务器上
  5. 支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
  6. 支持经过 JDB C和 journal 提供高速的消息持久化
  7. 从设计上保证了高性能的集群,客户端-服务器,点对点
  8. 支持 Ajax
  9. 支持与 Axis 的整合
  10. 能够很容易得调用内嵌 JMS provider,进行测试

4、什么状况下使用 ActiveMQ

  1. 多个项目之间集成

    • 跨平台
    • 多语言
    • 多项目
  2. 下降系统间模块的耦合度,解耦

    • 软件扩展性
  3. 系统先后端隔离

    • 先后端隔离,屏蔽高安全区

5、ActiveMQ 快速入门

们首先写一个简单的 Hello World 示例,让你们感觉下 Activemq,咱们须要实现接受者和发送者两部分代码的编写。

<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.15.3</version>
</dependency>

 (1) 生产者

public class Producer {

    public static void main(String[] args) throws JMSException {
        //1. 建立 ConnectionFactory 链接工厂
        ConnectionFactory factory = new ActiveMQConnectionFactory( // (1)
                //ActiveMQConnectionFactory.DEFAULT_USER,
                //ActiveMQConnectionFactory.DEFAULT_PASSWORD,
                "admin", "password",
                "tcp://localhost:61616/"
        );

        //2. 建立 connection,并启动链接
        Connection connection = factory.createConnection();  // (2)
        connection.start();

        //3. Session 是一个发送或接收消息的线程
        Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);  // (3)

        //4. 指定生产消息目标禾消费消息来源的 Destination 对象
        Destination dest = session.createQueue("queue1");  // (4)

        //5. 建立生产者
        MessageProducer producer = session.createProducer(dest);  // (5)

        //6. 指定签收模式
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);  // (6)

        //7. 建立消息
        for (int i = 0; i < 10; i++) {  // (7)
            TextMessage message = session.createTextMessage("aaaaaaaaaaaaaaa");
            producer.send(message);
        }

        if (connection != null) {
            connection.close();
        }
    }
}
  1. 建立 ConnectionFactory 实例,ActiveMQConnectionFactory 构造函数传入三个参数,分别是用户名,密码,消息地址,tcp 端口能够在 conf/activemq.xml 中配制。

  2. 建立 connection,并启动链接,Connection 默认是关闭的。注意:使用结束后要关闭 connection.close()

  3. 建立 session 会话,一个 connection 能够建立多个 session,Session 是一个发送或接收消息的线程。

    • 参数1:是否开启事务,若是开启事务,则在必须提交 session

      for (int i = 0; i < 10; i++) {  // (7)
          TextMessage message = session.createTextMessage("aaaaaaaaaaaaaaa");
          producer.send(message);
      }
      session.commit();
    • 参数2:签收模式,有 Session.AUTO_ACKNOWLEDGE(自动)、Session.CLIENT_ACKNOWLEDGE(手动 经常使用)、Session.DUPS_OK_ACKNOWLEDGE(可能重复签收),如若选择 Session.CLIENT_ACKNOWLEDGE,则必须在消费端确认,不然 ActiveMQ 不认为消息已经消费。生产中通常使用 Session.CLIENT_ACKNOWLEDGE 签收,不要相信自动签收方式。

      # 客户端处理
      TextMessage message = (TextMessage) consumer.receive();
      message.acknowledge();
  4. 经过 session 建立 Destination 对象,指定生产消息目标禾消费消息来源的对象。在 PTP 模式中 Destination 被称做 Queue 即队列;在 Pub/sub 模式 Destination 被称做 Topic 即主题,在程序中可使用多个 Queue 和 Top。

  5. 建立消息的发送和接收对象(生产者和消贵者) MessageProducer/MessageConsumer

  6. 指定签收模式。使用 MessageProducer 的 setDeliveryMode 方法为其设置持久化特性和非持久化特性(DeliveryMode)

  7. 发送消息。使用 JMS 规范的 TextMessage 形式建立数据(经过 session 对象),而且 MessageProducer 的 send 方法发送数据。同理客户端使用 receive 方法进行接收数据。最后不要忘记关闭 Connection 链接。

    public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive)

    • destination 能够指定将不一样的消息发送到不一样的 destination,但消费端就必须指定对应的 destination

    • deliveryMode 是否开启持久化,默认为持久性,DeliveryMode.NON_PERSISTENTDeliveryMode.PERSISTENT

    • priority 优先级 0-9,默认为 4,优先级越高越先消费,几率

    • timeToLive ActiveMQ 中消息保留的时间,单位秒,默认永久保存

(2) 消费者

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class Consumer {

    public static void main(String[] args) throws JMSException {
        ConnectionFactory factory = new ActiveMQConnectionFactory(
                ActiveMQConnectionFactory.DEFAULT_USER,
                ActiveMQConnectionFactory.DEFAULT_PASSWORD,
                "tcp://localhost:61616/"
        );

        Connection connection = factory.createConnection();
        connection.start();

        Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);

        Destination dest = session.createQueue("queue1"); // (1)

        MessageConsumer consumer = session.createConsumer(dest);

        while (true) {
            TextMessage message = (TextMessage) consumer.receive();
            //message.acknowledge(); 
            if (message == null)
                break;
            System.out.println(message.getText());
        }

        if (connection != null) {
            connection.close();
        }
    }
}
  1. 消费端与生产者大同小异,注意对应的参数最好设定为一致。必须从对应的 Destination 取出数据,不然没法取到数据。
相关文章
相关标签/搜索