消息队列的实现原理和ActiveMQ详解

1、链式调用

在咱们平常的项目开发过程当中,通常各模块或者函数方法之间,都是采用链式调用的方式,为了完成一个总体功能,咱们会将其拆分红多个函数(或者子模块),好比模块A调用模块B,模块B调用模块C,模块C调用模块D。但在大型分布式应用中,系统间的RPC(远程过程调用(Remote Procedure Call)的缩写形式) 交互繁杂,一个功能背后要调用上百个接口并不是不可能,这种架构就有以下几个劣势:前端

  1. 接口之间耦合比较严重
    每新增一个下游功能,都要对上游的相关接口进行改造;举个例子:假如系统A要发送数据给系统B和C,发送给每一个系统的数据可能有差别,所以系统A对要发送给每一个系统的数据进行了组装,而后逐一发送;当代码上线后,新增了一个需求:把数据也发送给D。此时就须要修改A系统,让他感知到D的存在,同时把数据处理好给D。在这个过程当中你会看到,每接入一个下游系统,都要对A系统进行代码改造,开发联调的效率很低。其总体架构以下图:
    在这里插入图片描述
  2. 面对大流量并发时,容易被冲垮
    每一个接口模块的吞吐能力是有限的,这个上限能力若是堤坝,当大流量(洪水)来临时,容易被冲垮。
  3. 存在性能问题
    RPC接口基本上是同步调用,总体的服务性能遵循“木桶理论”,即链路中最慢的那个接口。好比A调用B/C/D都是50ms,但此时B又调用了B1,花费2000ms,那么直接就拖累了整个服务性能。
    在这里插入图片描述

2、解决链式调用的劣势

根据上述的几个问题,那咱们在设计系统时须要明确一下要达到的目标:java

  1. 要作到系统解耦
    也就是当有新的模块接入到咱们的项目中时,能够作到代码改动最小;
  2. 设置流量缓冲池
    可让后端系统按照自身吞吐能力进行消费,而不被冲垮,当洪水来临时,有一个大的水库为咱们进行蓄水缓冲,以此来减轻下游的压力。
  3. 强弱依赖梳理,将非关键调用链路的操做异步化,提高总体系统的吞吐能力
    好比上图中A、B、C、D是让用户发起付款,而后返回付款成功提示的几个关键流程,而B1是通知付款后通知商家发货的模块,那么实质上用户对B1完成的时间容忍度比较大(好比几秒以后),能够将其异步化。也就是只要用户支付成功,我就直接返回"支付成功"的页面,而采用异步调用的方式来将发货通知发送给商家,这样对于用户来讲,就不须要等待后台代码给商家发送消息的时间,而是在支付成功后,直接得到结果。

3、MQ消息队列概念的引入

在如今的系统视线中,MQ消息队列是广泛使用的,能够完美的解决上面提到的问题。下图是使用了MQ的简单架构图,能够看到MQ在最前端对流量进行蓄洪,下游的系统ABC只与MQ打交道,经过事先定义好的消息格式来解析。
在这里插入图片描述
引入MQ以后的系统架构、交互方式与最初的链式调用架构很是不一样,虽然能够解决上文提到的问题,但也要充分理解其原理特性来避免其带来的反作用,这里以消息队列如何保证“消息的可靠投递”为切入点,来看看MQ的实现方式。linux

4、MQ的实现流程概述

- Client如何将消息可靠投递到MQ

  1. Client发送消息给MQ
  2. MQ将消息持久化后,发送Ack消息给Client,此处有可能由于网络问题致使Ack消息没法发送到Client,那么Client在等待超时后,会重传消息;
  3. Client收到Ack消息后,认为消息已经投递成功。

- MQ如何将消息可靠投递到Client

  1. MQ将消息push给Client(或Client来pull消息)
  2. Client获得消息并作完业务逻辑
  3. Client发送Ack消息给MQ,通知MQ删除该消息,此处有可能由于网络问题致使Ack失败,那么Client会重复消息,这里就引出消费幂等的问题;
  4. MQ将已消费的消息删除

大致上的流程就是如此,可是暂时不展开具体的描述,由于仅有MQ这种实现方式的思想还不够,因为系统模块间存在着异步的交互,因此,咱们不得不引入一个关于异步交互的知识点——JMSweb

5、JMS

- 什么是JMS

JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通讯。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。apache

JMS容许应用程序组件基于JavaEE平台建立、发送、接收和读取消息。它使分布式通讯耦合度更低,消息服务更加可靠以及异步性。windows

- JMS术语

提到JMS,就会引出它内部的一些组件或者说对象,那就避免不了有一些术语,作一下解释:后端

  1. 消息中间件(JMS Provider): 指提供了对JMS协议实现的第三方组件,好比ActiveMQ就是一个消息中间件,另外比较知名的还有KFA, Rabbit MQ等。
    通俗的来讲,消息中间件,就是那个洪水来了,蓄洪缓冲的大水库,用来存放上游发来的消息。
  2. 消息模式:分为点对点(Point to Point,即P2P)和发布/订阅(Pub/Sub),对应的数据结构分别是队列(Queue)和主题(Topic)。后面会详细提到。
  3. 消息(Message): 通讯内容的载体,其结构主要分为消息头,属性和消息体,而且根据存储结构的不一样分为好几种,后面会详细提到。
  4. 消息生产者:产生消息的一方,在P2P模式下,指消息发送者(Sender),在P/S模式下指消息发布者(Publisher)
  5. 消息消费者:接收消息的一方,对应于两种模式分别是消息接收者(Receiver)和消息订阅者(Subscriber)

那么接下来,就先详细的介绍一下,JMS的相关内容,在介绍完JMS后,咱们再来研究对JMS这个接口具体实现的提供者,好比:Apache ActiveMQ
可能会有同窗存在疑问,为何要介绍JMS呢,其实JMS只是一套Java的标准,也就是接口,它没有具体的实现类,若是咱们想要使用,用到确定不是接口,而是具体实现。可是咱们在使用具体的JMS提供者前,先搞清楚这个提供者到底实现了哪些东西,这样再去学习具体的实现产品,就垂手可得了。服务器

- JMS基本概念及原理详解

① 基本概念

JMS是Java的消息服务,JMS的客户端之间能够经过JMS服务进行异步的消息传输。网络

② 体系架构

JMS由如下元素组成。session

  1. JMS提供者
    链接面向消息中间件的,JMS接口的一个实现。提供者能够是Java平台的JMS实现,也能够是非Java平台的面向消息中间件的适配器。

  2. JMS客户
    生产或消费消息的基于Java的应用程序或对象。

  3. JMS生产者
    建立并发送消息的JMS客户。

  4. JMS消费者
    接收消息的JMS客户。

  5. JMS消息
    包括能够在JMS客户之间传递的数据的对象

  6. JMS队列
    一个容纳那些被发送的等待阅读的消息的区域。队列暗示,这些消息将按照顺序发送。一旦一个消息被阅读,该消息将被从队列中移走。

  7. JMS主题
    一种支持发送消息给多个订阅者的机制。

③ JMS消息模型(即点对点和发布订阅模型)

  • Point-to-Point(P2P)

  • Publish/Subscribe(Pub/Sub)

P2P
  • P2P模式图效果

在这里插入图片描述

  • 在P2P模式中,涉及到的概念:
  1. 消息队列(Queue)
  2. 提供者(Sender)
  3. 消费者(Receiver)
  4. 每一个消息都被发送到一个特定的队列,接收者从队列中获取消息。队列保留着消息,直到他们被消费或超时。
  • P2P的特色:
  1. 每一个消息只有一个消费者(Consumer)(即一旦被消费,消息就再也不在消息队列中) 【一对一的关系】
  2. 提供者和消费者之间在时间上没有依赖性,也就是说当提供者发送了消息以后,无论消费者有没有正在运行,它不会影响到消息被发送到队列
  3. 每条消息仅会传送给一个消费者。可能会有多个消费者在一个队列中侦听,可是每一个队列中的消息只能被队列中的一个消费者所消费。
  4. 消息存在前后顺序。一个队列会按照消息服务器将消息放入队列中的顺序,把它们传送给消费者。当已被消费时,就会从队列头部将它们删除(除非使用了消息优先级)。
  5. 消费者在成功接收消息以后需向队列应答成功(签收模式能够为自动签收或者手动签收)

若是你但愿发送的每一个消息都应该被成功处理的话,那么你须要P2P模式。

Pub/Sub(发布/订阅模式)
  • Pub/Sub模式效果图
    在这里插入图片描述
  • 涉及到的概念:
  1. 主题(Topic)
  2. 发布者(Publisher)
  3. 订阅者(Subscriber)

客户端将消息发送到主题。多个发布者将消息发送到Topic,系统将这些消息传递给多个订阅者。

  • Pub/Sub(发布/订阅模式)的特色
  1. 每一个消息能够有多个消费者 【一对多的关系】
  2. 发布者和订阅者之间有时间上的依赖性。针对某个主题的订阅者,它必须建立一个订阅者以后,才能消费发布者的消息,并且为了消费消息,订阅者必须保持运行的状态。
  3. 为了缓和这样严格的时间相关性,JMS容许订阅者建立一个可持久化的订阅。这样,即便订阅者没有被激活(运行),它也能接收到发布者的消息。
  4. 每条消息都会传送给称为订阅者的多个消息消费者。订阅者有许多类型,包括持久型、非持久型和动态型。
  5. 发布者一般不会知道、也意识不到哪个订阅者正在接收主题消息。
  6. 消息被推送给消费者,这意味着消息会传送给消费者,而无须请求。

若是你但愿发送的消息能够不被作任何处理、或者被一个消息者处理、或者能够被多个消费者处理的话,那么能够采用Pub/Sub模型。

④ 关于消息的消费

在JMS中,消息的产生和消息是异步的。对于消费来讲,JMS的消息者能够经过两种方式来消费消息。

  • 同步
    订阅者或消费者调用receive方法来接收消息,receive方法在可以接收到消息以前(或超时以前)将一直阻塞。
  • 异步
    订阅者或消费者能够注册为一个消息监听器。当消息到达以后,系统自动调用监听器的onMessage方法。

⑤ JMS应用程序接口

  1. ConnectionFactory 接口(链接工厂)
    建立Connection对象的工厂,根据消息类型的不一样,用户将使用队列链接工厂或者主题链接工厂。
    分别有QueueConnectionFactory和TopicConnectionFactory两种。能够经过JNDI来查找ConnectionFactory对象。

  2. Destination 接口(目标)
    Destination是一个包装了消息目标标识符的被管对象,消息目标是指消息发布和接收的地点,或者是队列,或者是主题。是消息生产者的消息发送目标或者说消息消费者的消息来源。
    对于消息生产者来讲,它的Destination是某个队列(Queue)或某个主题(Topic);
    对于消息消费者来讲,它的Destination也是某个队列或主题(即消息来源)。
    因此,Destination实际上就是两种类型的对象:Queue、Topic能够经过JNDI来查找Destination。

  3. Connection 接口(链接)
    Connection表示在客户端和JMS系统之间创建的连接(对TCP/IP socket的包装)。
    Connection能够产生一个或多个Session。跟ConnectionFactory同样,Connection也有两种类型:QueueConnection和TopicConnection。

  4. Session 接口(会话)
    Session是咱们操做消息的接口。表示一个单线程的上下文,用于发送和接收消息。
    因为会话是单线程的,因此消息是连续的,就是说消息是按照发送的顺序一个一个接收的。
    能够经过session建立生产者、消费者、消息等。Session提供了事务的功能。当咱们须要使用session发送/接收多个消息时,能够将这些发送/接收动做放到一个事务中。
    一样,也分QueueSession和TopicSession。

  5. MessageProducer 接口(消息的生产者)
    消息生产者由Session建立,并用于将消息发送到Destination。消费者能够同步地(阻塞模式),或异步(非阻塞)接收队列和主题类型的消息。
    一样,消息生产者分两种类型:QueueSender和TopicPublisher。能够调用消息生产者的方法(send或publish方法)发送消息。

  6. MessageConsumer 接口(消息消费者)
    消息消费者由Session建立,用于接收被发送到Destination的消息。两种类型:QueueReceiver和TopicSubscriber。
    可分别经过session的createReceiver(Queue)或createSubscriber(Topic)来建立。
    固然,也能够session的creatDurableSubscriber方法来建立持久化的订阅者。

  7. Message 接口(消息)
    是在消费者和生产者之间传送的对象,也就是说从一个应用程序创送到另外一个应用程序。一个消息有三个主要部分:
    一、消息头(必须):包含用于识别和为消息寻找路由的操做设置。
    二、一组消息属性(可选):包含额外的属性,支持其余提供者和用户的兼容。能够建立定制的字段和过滤器(消息选择器)。
    三、一个消息体(可选):容许用户建立五种类型的消息(文本消息,映射消息,字节消息,流消息和对象消息)。消息接口很是灵活,并提供了许多方式来定制消息的内容。
    消息接口很是灵活,并提供了许多方式来定制消息的内容。

  8. MessageListener
    消息监听器。若是注册了消息监听器,一旦消息到达,将自动调用监听器的onMessage方法。
    EJB中的MDB(Message-Driven Bean)就是一种MessageListener。

6、ActiveMQ入门介绍

经过上面的介绍,若是要使用Java消息服务,咱们就必需要有一个JMS提供者,来管理会话和队列。如今既有开源的提供者也有专有的提供者。
开源的提供者包括:Apache ActiveMQ、Kafka、WebMethods、阿里的RocketMQ等。Kafka和RocketMQ已经贡献给了Apache项目基金会,目前是属于Apache的了。
如今,就来介绍一下,ActiceMQ,它是Apache开源的消息服务器;
官方网站:activemq.apache.org

- ActiveMQ的特色

  1. 支持多语言协议和客户端,如JAVA、C、C++、C#, Ruby, Perl, Python, PHP等
  2. 支持许多高级特性,如消息分组、虚拟目的地、通配符、组合目的地;
  3. 彻底支持JMS1.1和J2EE1.4;
  4. ActiveMQ能够很容易的嵌入到Spring应用中;
  5. 经过了J2EE服务器的测试,如JBOSS、weblogic等
  6. 支持一些可插拔传输协议:如IN-VM、TCP, SSL, NIO, UDP, multicast, JGroups and JXTA transports
  7. 支持JDBC的消息持久化
  8. 支持集群

- ActiveMQ的安装

windows用户在activemq安装目录的bin/win64/activemq.bat
mac\linux:activemq安装目录的bin ./activemq start
访问地址:localhost:8161/admin
用户名/密码:admin/admin

7、具体应用ActiveMQ

引入maven依赖

<dependency>
  		<groupId>org.apache.activemq</groupId>
  		<artifactId>activemq-all</artifactId>
  		<version>5.15.3</version>
  	</dependency>

首先,介绍P2P模式下的生产者和消费者代码如何书写。

生产者开发步骤:
  1. 建立链接工厂
  2. 建立Connection并调用start()方法
  3. 经过Connection建立Session
  4. 经过Session建立Definition(Queue/Topic)
  5. 经过Session建立生产者
  6. 经过Session建立消息
  7. 使用生产者对象发送消息
  8. 关闭全部资源

以名为Hello的项目为例子,简单用代码实现一下整个过程,主要是生产者发送一条Hello ActiveMQ 消息到消息中间件,而后消费者经过访问消息中间件,得到这条消息。

生产者开发代码:
package com.golden3young.p2p.hello;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class HelloProducer {

	public static void main(String[] args) throws JMSException {
		
		//1.建立链接工厂
		ConnectionFactory factory = new ActiveMQConnectionFactory(null,null,"tcp://localhost:61616");
		
		//2.建立Connection 并调用它的start()
		Connection connection = factory.createConnection();
		connection.start();
		
		//3.经过Connection建立session
		//方法参数(是否开启事务,消息签收方式:自动签收/手动签收)
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		
		//4.经过Session建立Destination(Queue/Topic)
		//方法参数(队列名称)
		Queue queue = session.createQueue("hello");
		
		//5.经过Session建立生产者
		MessageProducer producer = session.createProducer(queue);
		
		//6.经过Sessioin建立消息
		TextMessage msg = session.createTextMessage("Hello ActiveMQ");
		
		//7.使用生产者对象发送消息
		producer.send(msg);
		
		//8.把资源关闭
		producer.close();
		session.close();
		connection.close();

	}

}
消费者开发步骤:
  1. 建立链接工厂
  2. 建立Connection并调用start()方法
  3. 建立Session
  4. 经过Session建立Definition
  5. 经过Session建立消费者
  6. 使用消费者接收消息 / 设置消息监听器
    这里咱们能够本身去接收消息,也能够经过建立消息监听器来自动监听中间件消息中消息的动态变化,具体的操做须要在监听器中实现,在消费者中引用。或者是直接在消费者中建立一个监听器便可。总之就是这个消费者若是须要监听器,要么引用要么本身建立。
消费者开发代码:
package com.golden3young.p2p.hello;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class HelloConsumer {
	public static void main(String[] args) throws JMSException {
		//建立ConnectionFactory
		ConnectionFactory factory = new ActiveMQConnectionFactory(null,null,"tcp://localhost:61616");
	
		//建立Connection 并 调用 start()方法
		Connection connection = factory.createConnection();
		connection.start();
		
		//建立Session
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		
		//建立Destination - Quene
		//两个队列的名字必须彻底一致
		Queue queue = session.createQueue("hello");
		
		//建立消费者MessageConsumer
		MessageConsumer consumer = session.createConsumer(queue);
		
		//使用消费者接收消息
		TextMessage msg = (TextMessage)consumer.receive();
		System.out.println(msg);
		
	}
}

若是是以监听器的方式来获取消息,那么须要引用或者建立。下面是引用的代码:

既然是引用,就首先得建立一个监听器,代码:

package com.etoak.golden3young.hello;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class HelloListener implements MessageListener {

	@Override
	public void onMessage(Message message) {
		if(message instanceof TextMessage) {
			TextMessage text = (TextMessage) message;
			try {
				System.out.println(text.getText().toString());
			} catch (JMSException e) {
				e.printStackTrace();
			}
		}
	}
}

建立完成后,在消费者类中,进行引用:

// 设置消息监听器 只要队列中生产者生产了一条消息,当前消费者就会监听到是不是本身想要的类型,若是是,就打印。
	consumer.setMessageListener(new HelloListener());

接下来,介绍Pub/Sub模式下的生产者和消费者代码如何书写。

生产者发开代码:
package com.golden3young.topic;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;

public class TopicProducer {

	public static void main(String[] args) throws JMSException {
		ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(null,null,"tcp://localhost:61616");
		Connection connection = factory.createConnection();
		connection.start();
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		//建立Destination
		Topic topic = session.createTopic("topic");
		//事实上应该是 Publisher发布者, 可是API中仍是叫producer
		MessageProducer producer = session.createProducer(topic);
		TextMessage msg = session.createTextMessage("Hello I am Topic ActiveMQ!");
		producer.send(msg);
		
		producer.close();
		session.close();
		connection.close();
	}
}
消费者开发代码:
package com.golden3young.topic;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;

public class TopicConsumer {
	public static void main(String[] args) throws JMSException {
		ConnectionFactory factory = new ActiveMQConnectionFactory(null,null,"tcp://localhost:61616");
		Connection connection = factory.createConnection();
		connection.start();
		Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
		Topic topic = session.createTopic("topic");
		// 事实上应该是 subscriber 订阅者,可是API里仍是叫consumer
		MessageConsumer consumer = session.createConsumer(topic);
		//直接在本类中建立了一个监听器,而没再单独引用。
		consumer.setMessageListener(new MessageListener() {

			@Override
			public void onMessage(Message message) {
				TextMessage text = (TextMessage) message;
				try {
					System.out.println(text.getText().toString());
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}
		});
	}
}