MQ介绍 & 实例

阅读目录php

 

定义:html

消息队列(MQ)是一种应用程序对应用程序的通讯方法,应用程序经过队列进行通讯,而不是经过直接调用彼此来通讯,队列的使用除去了接收和发送应用程序同时执行的要求。是进行通讯的中间件产品。(换言之:MQ负责两个系统之间传递消息,这两个系统能够是异构的,处于不一样硬件、不一样操做系统、用不一样语言编写,只须要简单的调用几个MQ的API,就能够互相通信,你没必要考虑底层系统和网络的复杂性。MQ可以应付多种异常状况,例如网络阻塞、临时中断等等)java

PS:直接调用一般是用于诸如远程过程调用的技术。python

补充知识:MB(消息路由、数据转换)web

优秀MQ特色spring

  a>.高可用性,但愿MQ能支撑7x24小时应用,而不是三天两头当机,要作到高可用性,就须要作MQ的集群,一台当了,不影响整个集群的服务能力,这里涉及到告警、流控、消息的负载均衡、数据库的使用、测试的完备程度等等。数据库

  b>.消息存储的高可靠性。要保证100%不丢消息。这不只仅是MQ的责任,更涉及到硬件、操做系统、语言平台和数据库的一整套方案。许多号称可靠存储的MQ产品其实都不可靠,要知道,硬件错误是常态,若是在硬件错误的状况下还能保证消息的可靠存储这才是难题。这里可能须要用到特殊的存储硬件,特殊的数据库,分布式的数据存储,数据库的分库分表和同步等等。你要考虑消息存储在哪里,是文件系统,仍是数据库,是本地文件,仍是分布式文 件,是搞主辅备份呢仍是多主机写入等等。apache

  c>.高可扩展性,MQ集群能很好地支持水平扩展,这就要求咱们的节点之间最好不要有通讯和数据同步。编程

  d>.性能,性能是实现高可用性的前提,很难想象单机性能极差的MQ组成的集群能在高负载下幸免于难。性能因素跟采用的平台、语言、操做系统、代码质量、数据 库、网络息息相关。MQ产品的核心实际上是消息的存储,在保证存储安全的前提下如何保证和提升消息入队的效率是性能的关键因素。这里须要开发人员创建起性能观念,不须要你对一行行代码斤斤计较,可是你须要知道这样作会形成什么后果,有没有更好更快的方式,你怎么证实它更好更快。软件实现的性能是一方面,另外一 方面就是平台相关了,由于MQ本质上是IO密集型的系统,瓶颈在IO,如何优化网络IO、文件IO这须要专门的知识。性能另外一个相关因素是消息的调度上, 引入消息顺序和消息优先级,容许消息的延迟发送,都将增大消息发送调度的复杂性,如何保证高负载下的调度也是要特别注意的地方。api

  e>.高可配置性和监控工具的完整,这是一个MQ产品容易忽略的地方。异步通讯形成了查找问题的难度,不像同步调用那样有相对明确的时序关系。所以查找异步通讯 的异常是很困难的,这就须要MQ提供方便的DEBUG工具,查找分析日志的工具,查看消息生命周期的工具,查看系统间依赖关系的工具等等。可定制也是MQ 产品很是重要的一方面,可方便地配置各种参数并在集群中同步,而且可动态调整各种参数,这将大大下降维护难度。

Ps:一句话总结:全天候不宕机,安全消息存储,100%不丢失数据高效率的写入读出,同时要求方便查错

产品比较 

RabbitMQ

是使用Erlang编写的一个开源的消息队列,自己支持不少的协议:AMQP,XMPP, SMTP, STOMP,也正是如此,使的它变的很是重量级,更适合于企业级的开发。同时实现了一个经纪人(Broker)构架,这意味着消息在发送给客户端时先在中 心队列排队。对路由(Routing),负载均衡(Load balance)或者数据持久化都有很好的支持。

Ps: 结合erlang语言自己的并发优点,性能较好,可是不利于作二次开发和维护

Redis

是一个Key-Value的NoSQL数据库,开发维护很活跃,虽然它是一个Key-Value数据库存储系统,但它自己支持MQ功能,因此彻底可 以当作一个轻量级的队列服务来使用。对于RabbitMQ和Redis的入队和出队操做,各执行100万次,每10万次记录一次执行时间。测试数据分为 128Bytes、512Bytes、1K和10K四个不一样大小的数据。实验代表:入队时,当数据比较小时Redis的性能要高于RabbitMQ,而如 果数据大小超过了10K,Redis则慢的没法忍受;出队时,不管数据大小,Redis都表现出很是好的性能,而RabbitMQ的出队性能则远低于 Redis。

 

                     入队

                     出队

 

128B

512B

1K

10K

128B

512B

1K

10K

Redis

16088

15961

17094

25

15955

20449

18098

9355

RabbitMQ

10627

9916

9370

2366

3219

3174

2982

1588

Ps: 作为一个基于内存的K-V数据库,其提供了消息订阅的服务,能够看成MQ来使用,目前应用案例较少,且不方便扩展

ZeroMQ

号称最快的消息队列系统,尤为针对大吞吐量的需求场景。ZMQ可以实现RabbitMQ不擅长的高级/复杂的队列,可是开发人员须要本身组合多种技 术框架,技术上的复杂度是对这MQ可以应用成功的挑战。ZeroMQ具备一个独特的非中间件的模式,你不须要安装和运行一个消息服务器或中间件,由于你的 应用程序将扮演了这个服务角色。你只须要简单的引用ZeroMQ程序库,可使用NuGet安装,而后你就能够愉快的在应用程序之间发送消息了。可是 ZeroMQ仅提供非持久性的队列,也就是说若是down机,数据将会丢失。其中,Twitter的Storm中使用ZeroMQ做为数据流的传输。

Ps: 扩展性好,开发比较灵活,采用C语言实现,实际上他只是一个socket库的从新封装,若是咱们作为消息队列使用,须要开发大量的代码

ActiveMQ

是Apache下的一个子项目。 相似于ZeroMQ,它可以以代理人和点对点的技术实现队列。同时相似于RabbitMQ,它少许代码就能够高效地实现高级应用场景。RabbitMQ、 ZeroMQ、ActiveMQ均支持经常使用的多种语言客户端 C++、Java、.Net,、Python、 Php、 Ruby等。

Ps: 历史悠久的开源项目,已经在不少产品中获得应用,实现了JMS1.1规范,能够和spring-jms轻松融合,实现了多种协议,不够轻巧(源代码比RocketMQ多).支持持久化到数据库,对队列数较多的状况支持很差.

Jafka/Kafka

Kafka是Apache下的一个子项目,是一个高性能跨语言分布式Publish/Subscribe消息队列系统,而Jafka是在Kafka 之上孵化而来的,即Kafka的一个升级版。具备如下特性:快速持久化,能够在O(1)的系统开销下进行消息持久化;高吞吐,在一台普通的服务器上既能够 达到10W/s的吞吐速率;彻底的分布式系统,Broker、Producer、Consumer都原生自动支持分布式,自动实现复杂均衡;支持 Hadoop数据并行加载,对于像Hadoop的同样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka经过 Hadoop的并行加载机制来统一了在线和离线的消息处理,这一点也是本课题所研究系统所看重的。Apache Kafka相对于ActiveMQ是一个很是轻量级的消息系统,除了性能很是好以外,仍是一个工做良好的分布式系统。

RocketMQ

阿里巴巴的MQ中间件,在其多个产品下使用,并可以撑住双十一的大流量,他并无实现JMS规范,使用起来很简单。部署由一个 命名服务(nameserver)和一个代理(broker)组成,nameserver和broker以及producer都支持集群,队列的容量受机器硬盘的限制,队列满后能够支持持久化到硬盘(也能够本身适配代码,将其持久化到NOSQL数据库中),队列满后会影响吞吐量,能够采用主备来保证稳定性,支持回溯消费,能够在broker端进行消息过滤.

其余一些队列列表HornetQ、Apache Qpid、Sparrow、Starling、Kestrel、Beanstalkd、Amazon SQS就再也不一一分析。

比较ActiveMQ and RocketMQ

  RocketMQ ActiveMQ   
优先级 须要新建一个特殊队列来接收优先级高的队列,没法实现从0-65535这种细粒度的控制 能够精细控制  
顺序 能够保证严格的消费顺序 没法保证严格的顺序  
持久化 支持 支持  
稳定性 更高    
消息过滤 RocketMQ能够在broker端进行过滤,对于咱们的消息总线,这里能够节省大量的网络传输是否会有消息重发形成的重复消费:RocketMQ能够保证,ActiveMQ没法保证 仅支持在客户端消费的时候进行判断是不是本身须要的消息  
回溯消费 支持 不支持 即从新将某一个时刻以前的消息从新消费一遍
事务 支持 支持  
定时消费 支持 不支持  
消息堆积 更优   就是当缓存消息的内存满了以后的解决方案,一种是丢弃策略,这种不会影响吞吐量,还有一种就是将消息持久化到磁盘,这种会影响吞吐量
客户端不在线 RocketMQ能够在客户端上线后继续将未消费的消息推送到客户端    

比较主流的MQ:

  ActiveMQ RabbitMQ RocketMq ZeroMQ
关注度  
成熟度   成熟 成熟 比较成熟 不成熟
所属社区/公司 Apache  Mozilla Public License Alibaba      
社区活跃度  
文档  
特色   功能齐全,被大量开源项目使用 因为Erlang 语言的并发能力,性能很好    各个环节分布式扩展设计,主从 HA;支持上万个队列;多种消费模式;性能很好 低延时,高性能,最高 43万条消息每秒  
受权方式   开源 开源 开源 开源
开发语言   Java Erlang   Java   C
支持的协议   OpenWire、STOMP、REST、XMPP、AMQP AMQP   本身定义的一套(社区提供JMS--不成熟) TCP、UDP
客户端支持语言   Java、C、C++、Python、PHP、Perl、.net 等  Java、C、C++、Python、 PHP、Perl 等 Java、C++(不成熟)   python、 java、 php、.net 等
持久化   内存、文件、数据库 内存、文件 磁盘文件 在消息发送端保存
事务   支持 不支持 支持 不支持
集群   支持 支持 支持 不支持
负载均衡 支持 支持 支持 不支持
管理界面   通常 无社区有 webconsole   实现
部署方式   独立、嵌入 独立 独立 独立
评价   优势:成熟的产品,已经在不少公司获得应用(非大规模场景)。有较多的文档。各类协议支持较好,有多重语言的成熟的客户端;
缺点:根据其余用户反馈,会出莫名其妙的问题,切会丢失消息。其重心放到activemq6.0 产品—apollo上去了,目前社区不活跃,且对5.x维护较少;
Activemq不适合用于上千个队列的应用场景
优势:因为erlang语言的特性,mq性能较好;管理界面较丰富,在互联网公司也有较大规模的应用;支持amqp系诶,有多中语言且支持amqp的客户端可用
缺点:erlang语言难度较大。集群不支持动态扩展。
优势:模型简单,接口易用(JMS的接口不少场合并不太实用)。在阿里大规模应用。目前支付宝中的余额宝等新兴产品均使用rocketmq。集群规模大概在50 台左右,单日处理消息上百亿;性能很是好,能够大量堆积消息在broker中;支持多种消费,包括集群消费、广播消费等。开发度较活跃,版本更新很快。
缺点:没有在mq核心中去实现JMS等接口,
 

 

 实例(简单的实战)

   ActiveMQ入门实例:

    1.去官方网站下载:http://activemq.apache.org/
    2.运行ActiveMQ 解压缩apache-activemq-5.8.0-bin.zip,而后双击apache-activemq-5.5.1\bin\activemq.bat运行ActiveMQ程序。
      启动ActiveMQ之后,登录:http://localhost:8161/admin/,建立一个Queue,命名为FirstQueue。
    3.建立Eclipse项目并运行
    建立java project:ActiveMQ,新建lib文件夹,导入以下Jar包

    activemq-broker-5.8.0.jar 、activemq-client-5.8.0.jar 、geronimo-j2ee-management_1.1_spec-1.0.1.jar 、geronimo-jms_1.1_spec-1.1.1.jar 、slf4j-api-1.6.6.jar

    建立类以下类:
    

 

import javax.jms.Connection;  
import javax.jms.ConnectionFactory;  
import javax.jms.DeliveryMode;  
import javax.jms.Destination;  
import javax.jms.MessageProducer;  
import javax.jms.Session;  
import javax.jms.TextMessage;  
import org.apache.activemq.ActiveMQConnection;  
import org.apache.activemq.ActiveMQConnectionFactory;  
  
public class Sender {  
    private static final int SEND_NUMBER = 5;  
  
    public static void main(String[] args) {  
        // ConnectionFactory :链接工厂,JMS 用它建立链接  
        ConnectionFactory connectionFactory; // Connection :JMS 客户端到JMS  
        // Provider 的链接  
        Connection connection = null; // Session: 一个发送或接收消息的线程  
        Session session; // Destination :消息的目的地;消息发送给谁.  
        Destination destination; // MessageProducer:消息发送者  
        MessageProducer producer; // TextMessage message;  
        // 构造ConnectionFactory实例对象,此处采用ActiveMq的实现jar  
        connectionFactory = new ActiveMQConnectionFactory(  
                ActiveMQConnection.DEFAULT_USER,  
                ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");  
        try { // 构造从工厂获得链接对象  
            connection = connectionFactory.createConnection();  
            // 启动  
            connection.start();  
            // 获取操做链接  
            session = connection.createSession(Boolean.TRUE,  
                    Session.AUTO_ACKNOWLEDGE);  
            // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置  
            destination = session.createQueue("FirstQueue");  
            // 获得消息生成者【发送者】  
            producer = session.createProducer(destination);  
            // 设置不持久化,此处学习,实际根据项目决定  
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);  
            // 构造消息,此处写死,项目就是参数,或者方法获取  
            sendMessage(session, producer);  
            session.commit();  
        } catch (Exception e) {  
            e.printStackTrace();  
        } finally {  
            try {  
                if (null != connection)  
                    connection.close();  
            } catch (Throwable ignore) {  
            }  
        }  
    }  
  
    public static void sendMessage(Session session, MessageProducer producer)  
            throws Exception {  
        for (int i = 1; i <= SEND_NUMBER; i++) {  
            TextMessage message = session.createTextMessage("ActiveMq 发送的消息"  + i);  
            // 发送消息到目的地方  
            System.out.println("发送消息:" + "ActiveMq 发送的消息" + i);  
            producer.send(message);  
        }  
    }  
}  

 

 

import javax.jms.Connection;  
import javax.jms.ConnectionFactory;  
import javax.jms.Destination;  
import javax.jms.MessageConsumer;  
import javax.jms.Session;  
import javax.jms.TextMessage;  
import org.apache.activemq.ActiveMQConnection;  
import org.apache.activemq.ActiveMQConnectionFactory;  
  
public class Receiver {  
    public static void main(String[] args) {  
        // ConnectionFactory :链接工厂,JMS 用它建立链接  
        ConnectionFactory connectionFactory;  
        // Connection :JMS 客户端到JMS Provider 的链接  
        Connection connection = null;  
        // Session: 一个发送或接收消息的线程  
        Session session;  
        // Destination :消息的目的地;消息发送给谁.  
        Destination destination;  
        // 消费者,消息接收者  
        MessageConsumer consumer;  
        connectionFactory = new ActiveMQConnectionFactory(  
                ActiveMQConnection.DEFAULT_USER,  
                ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");  
        try {  
            // 构造从工厂获得链接对象  
            connection = connectionFactory.createConnection();  
            // 启动  
            connection.start();  
            // 获取操做链接  
            session = connection.createSession(Boolean.FALSE,  
                    Session.AUTO_ACKNOWLEDGE);  
            // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置  
            destination = session.createQueue("FirstQueue");  
            consumer = session.createConsumer(destination);  
            while (true) {  
                // 设置接收者接收消息的时间,为了便于测试,这里谁定为100s  
                TextMessage message = (TextMessage) consumer.receive(100000);  
                if (null != message) {  
                    System.out.println("收到消息" + message.getText());  
                } else {  
                    break;  
                }  
            }  
        } catch (Exception e) {  
            e.printStackTrace();  
        } finally {  
            try {  
                if (null != connection)  
                    connection.close();  
            } catch (Throwable ignore) {  
            }  
        }  
    }  
}  

 

 

IBM WebSphere MQ介绍安装以及配置服务详解(连接)

 

关于消息队列与分布式的那些事

  消息队列技术是分布式应用间交换信息的一种技术。消息队列可驻留在内存或磁盘上,队列存储消息直到它们被应用程序读走。经过消息队列,应用程序可独立地执行--它们不须要知道彼此的位置、或在继续执行前不须要等待接收程序接收此消息。在分布式计算环境中,为了集成分布式应用,开发者须要对异构网络环境下的分布式应用提供有效的通讯手段。为了管理须要共享的信息,对应用提供公共的信息交换机制是重要的。

 

设计分布式应用的方法主要有:

远程过程调用(PRC)--分布式计算环境(DCE)的基础标准成分之一;

对象事务监控(OTM)--基于CORBA的面向对象工业标准与事务处理(TP)监控技术的组合;

消息队列(MessageQueue)--构造分布式应用的松耦合方法。

 


  (a) 分布计算环境/远程过程调用 (DCE/RPC)

 

  RPC是DCE的成分,是一个由开放软件基金会(OSF)发布的应用集成的软件标准。RPC模仿一个程序用函数引用来引用另外一程序的传统程序设计方法,此引用是过程调用的形式,一旦被调用,程序的控制则    转向被调用程序。

 

  在RPC实现时,被调用过程可在本地或远地的另外一系统中驻留并在执行。当被调用程序完成处理输入数据,结果放在过程调用的返回变量中返回到调用程序。RPC完成后程序控制则当即返回到调用程序。所以    RPC模仿子程序的调用/返回结构,它仅提供了Client(调用程序)和Server(被调用过程)间的同步数据交换。

 


  (b) 对象事务监控 (OTM)

 

  基于CORBA的面向对象工业标准与事务处理(TP)监控技术的组合,在CORBA规范中定义了:使用面向对象技术和方法的体系结构;公共的Client/Server程序设计接口;多平台间传输和翻译数据的指导方     针;开发分布式应用接口的语言(IDL)等,并为构造分布的Client/Server应用提供了普遍及一致的模式。

 


  (c) 消息队列 (Message Queue)

 

  消息队列为构造以同步或异步方式实现的分布式应用提供了松耦合方法。消息队列的API调用被嵌入到新的或现存的应用中,经过消息发送到内存或基于磁盘的队列或从它读出而提供信息交换。消息队列可用    在应用中以执行多种功能,好比要求服务、交换信息或异步处理等。

 

中间件是一种独立的系统软件或服务程序,分布式应用系统借助这种软件在不一样的技术之间共享资源,管理计算资源和网络通信。它在计算机系统中是一个关键软件,它能实现应用的互连和互操做性,能保证    系统的安全、可靠、高效的运行。中间件位于用户应用和操做系统及网络软件之间,它为应用提供了公用的通讯手段,而且独立于网络和操做系统。中间件为开发者提供了公用于全部环境的应用程序接口,当    应用程序中嵌入其函数调用,它即可利用其运行的特定操做系统和网络环境的功能,为应用执行通讯功能。

 

若是没有消息中间件完成信息交换,应用开发者为了传输数据,必需要学会如何用网络和操做系统软件的功能,编写相应的应用程序来发送和接收信息,且交换信息没有标准方法,每一个应用必须进行特定的编程从而和多平台、不一样环境下的一个或多个应用通讯。例如,为了实现网络上不一样主机系统间的通讯,将要求具有在网络上如何交换信息的知识(好比用TCP/IP的socket程序设计);为了实现同一主机内不一样进程之间的通信,将要求具有操做系统的消息队列或命名管道(Pipes)等知识。

 

  目前中间件的种类不少,如交易管理中间件(如IBM的CICS)、面向Java应用的Web应用服务器中间件(如IBM的WebSphere Application Server)等,而消息传输中间件(MOM)是其中的一种。它简化了应用之间数据的传输,屏蔽底层异构操做系统和网络平台,提供一致的通信标准和应用开发,确保分布式计算网络环境下可靠的、跨平台的信息传输和数据交换。它基于消息队列的存储-转发机制,并提供特有的异步传输机制,可以基于消息传输和异步事务处理实现应用整合与数据交换。

相关文章
相关标签/搜索