消息中间件ActiveMQ使用详解

消息中间件ActiveMQ使用详解

1、消息中间件的介绍

介绍

消息队列 是指利用 高效可靠消息传递机制 进行与平台无关的 数据交流,并基于 数据通讯 来进行分布式系统的集成。html

特色(做用)

  • 应用解耦
  • 异步通讯
  • 流量削峰
  • (海量)日志处理
  • 消息通信
  • …...

应用场景

根据消息队列的特色,能够衍生出不少场景,或者说不少场景都能用到。下面举几个例子:java

1)异步通讯docker

​ 注册时的短信、邮件通知,减小响应时间;apache

2)应用解耦编程

​ 信息发送者和消息接受者无需耦合,好比调用第三方;服务器

3)流量削峰微信

​ 例如秒杀系统;网络

2、消息中间件的对比

1.ActiveMQ

官网:activemq.apache.org/session

简介:架构

ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个彻底支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已是好久的事情了,可是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。

特色:

  1. 支持来自Java,C,C ++,C#,Ruby,Perl,Python,PHP的各类跨语言客户端和协议

  2. 彻底支持JMS客户端和Message Broker中的企业集成模式

  3. 支持许多高级功能,如消息组,虚拟目标,通配符和复合目标

  4. 彻底支持JMS 1.1和J2EE 1.4,支持瞬态,持久,事务和XA消息

  5. Spring支持,以便ActiveMQ能够轻松嵌入到Spring应用程序中,并使用Spring的XML配置机制进行配置

  6. 专为高性能集群,客户端 - 服务器,基于对等的通讯而设计

  7. CXF和Axis支持,以便ActiveMQ能够轻松地放入这些Web服务堆栈中以提供可靠的消息传递

  8. 能够用做内存JMS提供程序,很是适合单元测试JMS

  9. 支持可插拔传输协议,例如in-VM,TCP,SSL,NIO,UDP,多播,JGroups和JXTA传输

  10. 使用JDBC和高性能日志支持很是快速的持久性

2.RabbitMQ

官网:www.rabbitmq.com/

简介:

RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现。RabbitMQ轻巧且易于部署在云端。 它支持多种消息传递协议。 RabbitMQ能够部署在分布式和联合配置中,以知足高规模,高可用性需求。RabbitMQ可运行在许多操做系统和云环境中,并为大多数流行语言提供普遍的开发工具。(来自官网翻译)

AMQP (Advanced MessageQueue):高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。

RabbitMQ最初普遍应用于金融行业,根据官网描述,它具备以下特色:

特色:

  1. 异步消息传递:支持多种消息协议,消息队列,传送确认,灵活的路由到队列,多种交换类型;
  2. 支持几乎全部最受欢迎的编程语言:Java,C,C ++,C#,Ruby,Perl,Python,PHP等等;
  3. 能够部署为高可用性和吞吐量的集群; 跨多个可用区域和区域进行联合;
  4. 可插入的身份验证,受权,支持TLS和LDAP。;
  5. 提供了一个易用的用户界面,使得用户能够监控和管理消息 Broker 的许多方面;
  6. 提供了许多插件,来从多方面进行扩展,也能够编写本身的插件。

3. Kafka

官网:kafka.apache.org/

简介:

Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它能够处理消费者规模的网站中的全部动做流数据。 这种动做(网页浏览,搜索和其余用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据一般是因为吞吐量的要求而经过处理日志和日志聚合来解决。 对于像Hadoop的同样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是经过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了经过集群来提供实时的消息。

Kafka它主要用于处理活跃的流式数据,所以Kafaka在大数据系统中使用较多。

特色:

  1. 同时为发布和订阅提供高吞吐量。据了解,Kafka每秒能够生产约25万消息(50 MB),每秒处理55万消息(110 MB)。

  2. 可进行持久化操做。将消息持久化到磁盘,所以可用于批量消费,例如ETL,以及实时应用程序。经过将数据持久化到硬盘以及replication防止数据丢失。

  3. 分布式系统,易于向外扩展。全部的producer、broker和consumer都会有多个,均为分布式的。无需停机便可扩展机器。

  4. 消息被处理的状态是在consumer端维护,而不是由server端维护。当失败时能自动平衡。

  5. 支持online和offline的场景。

4. RocketMQ

官网:rocketmq.apache.org/

简介:

RocketMQ是阿里开源的消息中间件,目前在Apache孵化,使用纯Java开发,具备高吞吐量、高可用性、适合大规模分布式系统应用的特色。RocketMQ思路起源于Kafka,但并非简单的复制,它对消息的可靠传输及事务性作了优化,目前在阿里集团被普遍应用于交易、充值、流计算、消息推送、日志流式处理、binglog分发等场景,支撑了阿里屡次双十一活动。

特色:

  1. 支持发布/订阅(Pub/Sub)和点对点(P2P)消息模型
  2. 在一个队列中可靠的先进先出(FIFO)和严格的顺序传递
  3. 支持拉(pull)和推(push)两种消息模式
  4. 单一队列百万消息的堆积能力
  5. 支持多种消息协议,如 JMS、MQTT 等
  6. 分布式高可用的部署架构,知足至少一次消息传递语义
  7. 提供 docker 镜像用于隔离测试和云集群部署
  8. 提供配置、指标和监控等功能丰富的 Dashboard

3、ActiveMQ的安装

1.安装步骤

activemq在各个系统下都有对应的安装包。如下来演示Linux系统下安装activemq。

进入apache-activemq-5.15.8/bin目录,启动activemq./activemq start

输出以上信息,表示启动成功。

2.安装遇到的问题

在安装过程当中,经过查看activemq的运行状态,

显示以上。

经过./bin/activemq console 命令查看运行日志:

主机名中包含非法字符;

那么解决办法就很简单了,改主机名:

一、方法一使用hostnamectl命令

hostnamectl set-hostname 主机名

二、方法二:修改配置文件 /etc/hostname 保存退出

修改完成以后重启便可,这里我使用的是方法一:

hostnamectl set-hostname activemq

查看运行状态:

5、ActiveMQ页面介绍

待ActiveMQ安装启动好,访问http://ip:8161/admin,登陆名和密码都是admin(在配置文件中可修改),进入ActiveMQ的主页:

下面来介绍每一个菜单的功能:

1.Queue消息队列页面

Name:消息队列的名称。

Number Of Pending Messages:未被消费的消息数目。

Number Of Consumers:消费者的数量。

Messages Enqueued:进入队列的消息 ;进入队列的总消息数目,包括已经被消费的和未被消费的。 这个数量只增不减。

Messages Dequeued:出了队列的消息,能够理解为是被消费掉的消息数量。在Queues里它和进入队列的总数量相等(由于一个消息只会被成功消费一次),若是暂时不等是由于消费者还没来得及消费。

2.Topic主题页面

Name:主题名称。

Number Of Pending Messages:未被消费的消息数目。

Number Of Consumers:消费者的数量。

Messages Enqueued:进入队列的消息 ;进入队列的总消息数目,包括已经被消费的和未被消费的。 这个数量只增不减。

Messages Dequeued:出了队列的消息,能够理解为是被消费掉的消息数量。在Topics里,由于多消费者从而致使数量会比入队列数高。

3.Subscribers查看订阅者页面

查看订阅者信息,只在Topics消息类型中这个页面才会有数据。

4.Connections查看链接数页面

6、简单使用

引入jar包:

<dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-core</artifactId>
            <version>5.7.0</version>
        </dependency>

1.点对点(P2P)模型

​ 点对点模型,采用的是队列(Queue)做为消息载体。在该模式中,一条消息只能被一个消费者消费,没有被消费的,只能留在队列中,等待被消费,或者超时。举个例子,若是队列中有10条消息,有两个消费者,就是一个消费者消费5条信息,你一条我一条。如下以代码演示。

消息发布者:

public static void main(String[] args) throws JMSException {
    /*
     * 实现步骤
     * 1.创建ConnectionFactory工厂对象,须要填入用户名、密码、链接地址(通常使用默认,若是没有修改的话)
     * 2.经过ConnectionFactory对象建立一个Connection链接,而且调用Connection的start方法开启链接,Connection方法默认是关闭的
     * 3.经过Connection对象建立Session会话(上下文环境对象),用于接收消息,参数1是是否启用事物,参数2是签收模式,通常设置为自动签收
     * 4.经过Session对象建立Destination对象,指的是一个客户端用来制定生产消息目标和消费消息来源的对象。在PTP的模式中,Destination被称做队列,在Pub/Sub模式中,Destination被称做主题(Topic)
     * 5.经过Session对象建立消息的发送和接收对象(生产者和消费者)
     * 6.经过MessageProducer的setDeliverMode方法为其设置持久化或者非持久化特性
     * 7.使用JMS规范的TextMessage形式建立数据(经过Session对象),并用MessageProducer的send方法发送数据。客户端同理。记得关闭
     */
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER,
            ActiveMQConnectionFactory.DEFAULT_PASSWORD,"tcp://94.191.49.192:61616");
    Connection connection = connectionFactory.createConnection();
    connection.start();
    Session session = connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE);
    Destination destination = session.createQueue("queue");
    MessageProducer producer = session.createProducer(destination);
    producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    for (int i=0;i<=5;i++) {
        TextMessage textMessage = session.createTextMessage();
        textMessage.setText("我是第"+i+"消息");
        producer.send(textMessage);
    }
    if(connection!=null){
        connection.close();
    }
}

消息消费者:

public static void main(String[] args) throws JMSException {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER,
                ActiveMQConnectionFactory.DEFAULT_PASSWORD,"tcp://94.191.49.192:61616");
        Connection connection = connectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createQueue("queue");
        MessageConsumer consumer = session.createConsumer(destination);
        while (true){
            TextMessage message = (TextMessage) consumer.receive();
            if (message==null){
                break;
            }
            System.out.println(message.getText());
        }
        if(connection!=null){
            connection.close();
        }
    }

先启动两个消费者,在启动发布者:

2.发布/订阅(Pub/Sub)模型

发布/订阅模型采用的是主题(Topic)做为消息通信载体。该模式相似微信公众号的模式。发布者发布一条信息,而后将该信息传递给全部的订阅者。注意:订阅者想要接收到该信息,必须在该信息发布以前订阅。

发布者发布信息:

public static void main(String[] args) throws JMSException, IOException {
        // 建立一个ConnectionFactory对象链接MQ服务器
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://94.191.49.192:61616");
        // 建立一个链接对象
        Connection connection;
        connection = connectionFactory.createConnection();
        // 开启链接
        connection.start();
        // 使用Connection对象建立一个Session对象
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 建立一个Destination对象。topic对象
        Topic topic = session.createTopic("test-topic");
        // 使用Session对象建立一个消费者对象。
        MessageConsumer consumer = session.createConsumer(topic);
        // 接收消息
        consumer.setMessageListener(new MessageListener() {

            @Override
            public void onMessage(Message message) {
                // 打印结果
                TextMessage textMessage = (TextMessage) message;
                String text;
                try {
                    text = textMessage.getText();
                    System.out.println("这是接收到的消息:" + text);
                } catch (JMSException e) {
                    e.printStackTrace();
                }

            }
        });
        System.out.println("topic消费者启动。。。。");
        // 等待接收消息
        System.in.read();
        // 关闭资源
        consumer.close();
        session.close();
        connection.close();
    }

订阅者订阅信息:

public static void main(String[] args) throws JMSException {
        // 一、建立一个链接工厂对象,须要指定服务的ip及端口。
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://94.191.49.192:61616");
        // 二、使用工厂对象建立一个Connection对象。
        Connection connection = connectionFactory.createConnection();
        // 三、开启链接,调用Connection对象的start方法。
        connection.start();
        // 四、建立一个Session对象。
        // 第一个参数:是否开启事务。若是true开启事务,第二个参数无心义。通常不开启事务false。
        // 第二个参数:应答模式。自动应答或者手动应答。通常自动应答。
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 五、使用Session对象建立一个Destination对象。两种形式queue、topic,如今应该使用topic
        Topic topic = session.createTopic("test-topic");
        // 六、使用Session对象建立一个Producer对象。
        MessageProducer producer = session.createProducer(topic);
        // 七、建立一个Message对象,可使用TextMessage。
        for (int i = 0; i < 50; i++) {
            TextMessage textMessage = session.createTextMessage("第" + i + "一个ActiveMQ队列目的地的消息");
            // 八、发送消息
            producer.send(textMessage);
        }
        // 九、关闭资源
        producer.close();
        session.close();
        connection.close();
    }

订阅者要提早订阅,因此先运行订阅者。

3.两种模式对比

1)由以上,咱们能够总结出ActiveMQ的实现步骤:

  • 创建ConnectionFactory工厂对象,须要填入用户名、密码、链接地址
  • 经过ConnectionFactory对象建立一个Connection链接
  • 经过Connection对象建立Session会话
  • 经过Session对象建立Destination对象;在P2P的模式中,Destination被称做队列(Queue),在Pub/Sub模式中,Destination被称做主题(Topic)
  • 经过Session对象建立消息的发送和接收对象
  • 发送消息
  • 关闭资源

2)能够看出,P2P模式和Pub/Sub模式,在实现上的区别是经过Session建立的Destination对象不同,在P2P的模式中,Destination被称做队列(Queue),在Pub/Sub模式中,Destination被称做主题(Topic)

7、参考

  1. https://www.jianshu.com/p/0363ac9ff574
  2. https://juejin.im/post/5adaaae351882567356415eb
相关文章
相关标签/搜索