消息队列学习基础

什么是MOM

MOM 就是面向消息中间件(Message-oriented middleware),是用于以分布式应用或系统中的异步、松耦合、可靠、可扩展和安全通讯的一类软件。MOM 的整体思想是它做为消息发送器和消息接收器之间的消息中介,这种中介提供了一个全新水平的松耦合。
前端

MOM思想就是A和B两个应用程序不直接发送消息。以前A和B直接发送消息有不少效率问题,如A发送以后B没有及时接受,那么A就一直再在那里堵塞并发性很差,A必须等B接受完以后有返结果了A才能够结束。而MOM就是为了解决这样的问题,不让A与B之间交互,在A和B之间加一个消息中间件,A把消息放到消息中间上,就能够走了,去作别的事情,B何时来消息中间件取消息A不用知道也不用管。这样就提升了效率提供并发性,等B去走后能够经过状态,通知,回调等方式通知A就能够。市面上实现这种思想的技术有不少,IBM(MQSEVICES)、Microsoft(MSMQ)以及BEA的MessageMQ等。处于百家争鸣阶段都是各自实现各自的,没有统一实现标准。此时SUN为了实现统一标准就出现了JMS统一实现规范。JMS主要有2种消息模型,点到点和发布订阅两种。数据库

什么是消息队列

消息队列是在消息的传输过程当中保存消息的容器,用于接收消息并以文件的方式存储,一个消息队列能够被一个也能够被多个消费者消费。编程

消息队列中间件是分布式系统中重要的组件,主要解决应用耦合、异步消息、流量削锋等问题。实现高性能、高可用、可伸缩和最终一致性架构。是大型分布式系统不可缺乏的中间件。安全

目前在生产环境,使用较多的消息队列有ActiveMQ、RabbitMQ、ZeroMQ、Kafka、MetaMQ、RocketMQ等。bash

消息队列优势

  1. 将数据从一个应用程序传到另外一个应用程序,或者从软件的一个模块传送到另一个模块
  2. 负责创建网络通讯的通道,进行数据的可靠传送
  3. 保证数据不重发,不丢失
  4. 可以实现跨平台操做,可以为不一样操做系统上的软件集成技工数据传送服务

消息队列的应用场景

下面详细介绍一下消息队列在实际应用中经常使用的使用场景。场景分为异步处理、应用解耦、流量削锋和消息通信四个场景。
服务器

异步处理

场景说明 用户注册后,须要发送注册邮件和发送注册信息,传统的作法有两种:串行方式并行方式网络

串行方式session

将注册信息写入数据库成功后,发送注册邮件,而后发送注册短信,而全部任务执行完成后,返回信息给客户端
架构


并行方式并发

将注册信息写入数据库成功后,同时进行发送注册邮件和发送注册短信的操做。而全部任务执行完成后,返回信息给客户端。同串行方式相比,并行方式能够提升执行效率,减小执行时间。

上面的比较能够发现,假设三个操做均须要50ms的执行时间,排除网络因素,则最终执行完成,串行方式须要150ms,而并行方式须要100ms。

由于cpu在单位时间内处理的请求数量是一致的,假设:CPU每1秒吞吐量是100此,则串行方式1秒内可执行的请求量为1000/150,不到7次;并行方式1秒内可执行的请求量为1000/100,为10次。

由上能够看出,传统串行和并行的方式会受到系统性能的局限,那么如何解决这个问题?
咱们须要引入消息队列,将不是必须的业务逻辑,异步进行处理,由此改造出来的流程为


根据上述的流程,用户的响应的时间基本至关于将数据写入数据库的时间,发送注册邮件,发送注册短信的消息在写入消息队列后,便可返回执行结果,写入消息队列的时间很快,几乎能够忽略,也有此能够将系统吞吐量提高至20QPS,比串行方式提高近3倍,比并行方式提高2倍。

应用解耦

场景说明 用户下单后,订单系统须要通知库存系统。

传统的作法为:订单系统调用库存系统的接口。以下图所示:


传统方式具备以下缺点:
  1. 假设库存系统访问失败,则订单减小库存失败,致使订单建立失败
  2. 订单系统同库存系统过分耦合

如何解决上述的缺点呢?须要引入消息队列,引入消息队列后的架构以下图所示:


引入消息队列,实现应用解耦
  • 订单系统:用户下单后,订单系统进行数据持久化处理,而后将消息写入消息队列,返回订单建立成功
  • 库存系统:使用拉/推的方式,获取下单信息,库存系统根据订单信息,进行库存操做。

假如在下单时库存系统不能正常使用。也不影响正常下单,由于下单后,订单系统写入消息队列就再也不关心其后续操做了。由此实现了订单系统与库存系统的应用解耦。

流量削锋

流量削峰 也是消息对列中的经常使用场景,通常在秒杀或团抢活动中使用普遍。

应用场景 秒杀活动,通常会由于流量过大,致使流量暴增,应用挂掉。为解决这个问题,通常须要在应用前端加入消息队列。

  1. 能够控制参与活动的人数;
  2. 能够缓解短期内高流量对应用的巨大压力;

流量削锋处理方式系统图以下:


流量削锋方式系统图
  1. 服务器在接收到用户请求后,首先写入消息队列。这时若是消息队列中消息数量超过最大数量,则直接拒绝用户请求或返回跳转到错误页面;
  2. 秒杀业务根据秒杀规则读取消息队列中的请求信息,进行后续处理。

日志处理

日志处理是指将消息队列用在日志处理中,好比Kafka的应用,解决大量日志传输的问题。

日志处理是指将消息队列用在日志处理中,好比Kafka的应用,解决大量日志传输的问题。架构简化以下:


消息队列应用于日志处理的架构
  • 日志采集客户端:负责日志数据采集,定时写受写入Kafka队列;
  • Kafka消息队列:负责日志数据的接收,存储和转发;
  • 日志处理应用:订阅并消费kafka队列中的日志数据;

这种架构在实际开发中的应用,能够参照案例:新浪技术分享:咱们如何扛下32亿条实时日志的分析处理

服务的技术架构设计

  1. Kafka:接收用户日志的消息队列。
  2. Logstash:作日志解析,统一成JSON输出给Elasticsearch。
  3. Elasticsearch:实时日志分析服务的核心技术,一个schemaless,实时的数据存储服务,经过index组织数据,兼具强大的搜索和统计功能。
  4. Kibana:基于Elasticsearch的数据可视化组件,超强的数据可视化能力是众多公司选择ELK stack的重要缘由。

消息通信

消息通信是指,消息队列通常都内置了高效的通讯机制,所以也能够用在纯的消息通信。好比实现点对点消息队列、聊天室等。

点对点通信

点对点通信架构设计

在点对点通信架构设计中,客户端A和客户端B共用一个消息队列,便可实现消息通信功能。

聊天室通信

聊天室通信架构设计

客户端A、客户端B、直至客户端N订阅同一消息队列,进行消息的发布与接收,便可实现聊天通信方案架构设计。

JMS消息服务

讲消息队列就不得不提JMS。JMS(Java Message Service,Java消息服务) JMS 叫作 Java 消息服务(Java Message Service),是 Java 平台上有关面向 MOM 的技术规范,旨在经过提供标准的产生、发送、接收和处理消息的 API 简化企业应用的开发,相似于 JDBC 和关系型数据库通讯方式的抽象。

API是一个消息服务的标准/规范,容许应用程序组件基于JavaEE平台建立,发送,接收和读取消息。他是分布式通讯耦合度更低,消息服务更加可靠以及异步性。

在EJB架构中,有消息bean能够无缝的与JM消息服务集成。在J2EE架构模式中,有消息服务者模式,用于实现消息与应用直接的解耦。


经常使用概念

  • Provider:纯 Java 语言编写的 JMS 接口实现(好比 ActiveMQ 就是)
  • Domains:消息传递方式,包括点对点(P2P)、发布/订阅(Pub/Sub)两种
  • Connection factory:客户端使用链接工厂来建立与 JMS provider 的链接
  • Destination:消息被寻址、发送以及接收的对象

消息模型

在JMS标准中,有两种消息模型P2P(Point to Point),Publish/Subscribe(Pub/Sub)

P2P 模式


P2P(点对点)模式包含三个角色:消息队列(Queue),发送者(Sender),接收者(Receiver)。每一个消息都被发送到一个特定的队列,接收者从队列中获取消息。队列保留着消息,知道他们被消费或者超时。

P2P 消息域使用 queue 做为 Destination,消息能够被同步或异步的发送和接收,每一个消息只会给一个 Consumer 传送一次。Consumer 可使用 MessageConsumer.receive() 同步地接收消息,也能够经过使用MessageConsumer.setMessageListener() 注册一个 MessageListener 实现异步接收。

多个 Consumer 能够注册到同一个 queue 上,但一个消息只能被一个 Consumer 所接收,而后由该 Consumer 来确认消息。而且在这种状况下,Provider 对全部注册的 Consumer 以轮询的方式发送消息。


P2P的特色

  1. 每一个消息只有一个消费者(Consumer)(即一旦被消费,消息就再也不在消息队列中,其余的消费者就不能获得这条消息了。)
  2. 发送者和接收者质检在时间上没有依赖性,也就是说当发送者发送了消息以后,无论接收者有没有正在运行,他不会影响到消息发送到队列。
  3. 消费者必须确认对消息的接收

    收到消息后消费者必须确认消息已被接收,不然JMS服务提供者会认为该消息没有被接收,那么这条消息仍然能够被其余人接收。程序能够自动进行确认,不须要人工干预。

  4. 非持久的消息最多只发送一次

    非持久的消息最多只发送一次,表示消息有可能未被发送,形成未被发送的缘由可能有:

    一、 JMS服务提供者出现宕机等状况,形成非持久信息的丢失

    二、 队列中的消息过时,未被接收

  5.  持久的消息严格发送一次

    咱们能够将比较重要的消息设置为持久化的消息,持久化后的消息不会由于JMS服务提供者的故障或者其余缘由形成消息丢失。

若是但愿发送的每一个消息都会被成功处理的话,那么须要p2p 模式

Pub/Sub模式



包含三个角色:主题(Topic),发布者(Publisher),订阅者(Subscriber)。多个发布者将消息发送到Topic,系统将这些消息传递给多个订阅者。

Pub/Sub(发布/订阅,Publish/Subscribe)消息域使用 topic 做为 Destination,发布者向 topic 发送消息,订阅者注册接收来自 topic 的消息。发送到 topic 的任何消息都将自动传递给全部订阅者。接收方式(同步和异步)与 P2P 域相同。

除非显式指定,不然 topic 不会为订阅者保留消息。固然,这能够经过持久化(Durable)订阅来实现消息的保存。这种状况下,当订阅者与 Provider 断开时,Provider 会为它存储消息。当持久化订阅者从新链接时,将会受到全部的断连期间未消费的消息。

Pub/Sub的特色

  • 每一个消息均可以有多个(0,1,……)订阅者,每条消息能够有多个消费者,若是报纸和杂志同样,谁订阅了谁均可以得到。

  • 发布者和订阅者之间有时间上的依赖性。订阅者只能消费他们订阅以后出版的消息,针对某个主题(Topic)的订阅者,它必须建立一个订阅者以后,才能消费发布者的消息。这就要求订阅者必须先订阅,生产者再发布。即订阅者必须先运行,再等待生产者的运行,这和点对点类型有所差别。
  • 为了消费消息,订阅者必须保持运行的状态。即订阅者必须保持活动状态等待发布者发布的消息,若是订阅者在发布者发布消息以后才运行,则不能得到先前发布者发布的消息。

为了缓和这样严格的时间相关性,JMS容许订阅者建立一个可持久化的订阅。这样,即便订阅者没有被激活(运行),它也能接收到发布者的消息。
若是但愿发送的消息能够不被作任何处理、或者只被一个消息者处理、或者能够被多个消费者处理的话,那么能够采用Pub/Sub模型。

消息消费

在JMS中,消息的产生和消费都是异步的。对于消费来讲,JMS的消息者能够经过两种方式来小消费消息。

  1. 同步
    订阅者或接收者经过receive方法来接受消息,receive在接收到消息以前(或超时以前)将一直阻塞。
  2. 异步
    订阅者或接收者亦能够注册未一个消息监听器。当消息到达以后,系统自动调用监听器的onMessage的方法。
JDNI:Java命名和目录接口,是一种标准的Java命名系统接口。能够在网络上查找和访问服务。经过指定一个资源名称,该名称对应于数据库或者命名服务中的一个记录,同时返回资源链接创建所必需的信息。

JNDI在JMS中起到查找二号访问发送目标或消息来源的做用。

JMS编程

JMS通用步骤

  • 获取链接工厂
  • 使用链接工厂建立链接
  • 启动链接
  • 从链接建立会话
  • 获取 Destination
  • 建立 Producer,或
    • 建立 Producer
    • 建立 message
  • 建立 Consumer,或发送或接收message发送或接收 message
    • 建立 Consumer
    • 注册消息监听器(可选)
  • 发送或接收 message
  • 关闭资源(connection, session, producer, consumer 等)

JMS编程模型

1.ConnectionFactory

建立Connection对象的工厂,针对两周不一样的JMS消息模型,分别有QueueConnectionFactory和TopicConnectionFactory两种。能够经过JNDI来查找ConnectionFactory对象。

2.Destination

Destination的意思是消息生产者的消息发送目标或着说消息消费者的消息来源。对于消息生产者来讲。他的Destination是某个队列(queue)或者某个主题(Topic);对于消息消费者来讲,他的Destination也是某个队列或主题(即消息来源)。

因此,Destination实际上就是两种类型的对象:Queue,Topic能够经过JNDI来查找Destination

3.Connection

Connection表示在客户端和JMS系统之间创建的连接(对TCP/IP Socket的包装)。Connection能够产生一个或多个Session。跟ConnectionFactory同样,Connection也有两种类型:QueueConnection和TopicConnection。

4.Session

Session是操做消息的接口。能够经过session建立生产者、消费者、消息等。Session提供了事务的功能。当须要使用session发送/接收多个消息时,能够将这些发送/接收动做放到一个事务中。一样,也分QueueSession和TopicSession。

5.消息的生产者

消息生产者由Session建立,并用于将消息发送到Destination。一样,消息生产者分两种类型:QueueSender和TopicPublisher。能够调用消息生产者的方法(send或publish方法)发送消息。

6.消息消费者

消息消费者由Session建立,用于接收被发送到Destination的消息。两种类型:QueueReceiver和TopicSubscriber。可分别经过session的createReceiver(Queue)或createSubscriber(Topic)来建立。固然,也能够session的creatDurableSubscriber方法来建立持久化的订阅者。

7. MessageListener

消息监听器。若是注册了消息监听器,一旦消息到达,将自动调用监听器的onMessage方法。EJB中的MDB(Message-Driven Bean)就是一种MessageListener。

深刻学习JMS对掌握JAVA架构、EJB架构有很好的帮助,消息中间件也是大型分布式系统必须的组件。本次分享主要作全局性介绍,具体的深刻须要你们学习,实践,总结,领会。

JMS编程实战

这里拿ActiveMQ 举例

public class JMSDemo {
        ConnectionFactory connectionFactory;
        Connection connection;
        Session session;
        Destination destination;
        MessageProducer producer;
        MessageConsumer consumer;
        Message message;
        boolean useTransaction = false;
        try {
                Context ctx = new InitialContext();
                connectionFactory = (ConnectionFactory) ctx.lookup("ConnectionFactoryName");
                //使用ActiveMQ时:connectionFactory = new ActiveMQConnectionFactory(user, password, getOptimizeBrokerUrl(broker));
                connection = connectionFactory.createConnection();
                connection.start();
                session = connection.createSession(useTransaction, Session.AUTO_ACKNOWLEDGE);
                destination = session.createQueue("TEST.QUEUE");
                //生产者发送消息
                producer = session.createProducer(destination);
                message = session.createTextMessage("this is a test");

                //消费者同步接收
                consumer = session.createConsumer(destination);
                message = (TextMessage) consumer.receive(1000);
                System.out.println("Received message: " + message);
                //消费者异步接收
                consumer.setMessageListener(new MessageListener() {
                        @Override
                        public void onMessage(Message message) {
                                if (message != null) {
                                        doMessageEvent(message);
                                }
                        }
                });
        } catch (JMSException e) {
                ...
        } finally {
                producer.close();
                session.close();
                connection.close();
        }
}复制代码
相关文章
相关标签/搜索