首发时间:2019-05-16java
介绍
- 消息队列,也能够称为“消息中间件”(中间件是一种独立的系统软件或服务程序,请注意独立二字,它是一个相似mysql服务端的独立运行的程序)
- 消息队列它自己是一个队列结构的信息存储组件,某种程度上它跟redis有点相像,其实主要是用来存储数据,而它的用途主要环绕“消息”和“队列”这两个词,这将在下面讲。
- 常见的消息队列有:ActiveMQ,RabbitMQ,kafka。这些消息队列大多兼容大部分语言。
- 在java行业,可能ActiveMQ,kafka用的会比较多。
- 在python行业,可能RabbitMQ可能会比较多。
消息队列的理解
消息,说明与消息有关;队列,说明是一个先入先出的结构;
消息队列能够用于不一样应用系统之间的信息交换,这里所谓的不一样的应用系统是指具备不一样功能的应用系统,好比一个商城系统和一个短信系统,这两个系统具备不一样的职能。(一般来讲,同一职能的平台须要消息队列的需求要低,此时消息传递功能可使用别的东西来实现)
一个应用往消息队列中存入数据,而另外一个应用从消息队列中获取数据,若是这个数据是有消息传递意义的,那这时候他们就完成了数据通讯。特别的是,它们的数据通讯此时是异步的,由于没有要求数据是何时存入的,因此A存入数据后,B能够挺久以后再去处理。
此外,因为是一个队列,那么保证了先存进来的消息会被先取出,因此这保证了先进来的消息会被先解析。python
举个栗子
一个电商平台可能有发短信的业务需求。在一个普通的架构中,可能会整合了短信接口,那么直接带数据给短信接口就能够发短信了。
mysql
而若是在一个短信业务独立的架构中(短信接口独立多是由于有一些别的业务平台也须要,若是每一个平台都建立新的短信平台,那么就太耗费资源了),此时每一个业务平台都须要向这个独立的平台来发请求来发短信。例如一个公司可能有多个项目都有发短信的需求,那么这时候抽取短信功能到一个独立的项目较好。

而上面会遇到一个问题,就是并发问题,若是一时间有太多的发短信请求可能会对短信平台形成较大的处理压力。
因此这时候就可使用消息队列来解决这个问题。电商业务平台能够把须要发短信的手机号之类的信息存储到消息队列中,而后短信平台能够“慢慢”地从消息队列来获取数据来发短信。这样不会有什么超常的压力而致使短信平台宕机了。(固然,这个时候有有点效率过低了,不过这个是别的架构优化问题,这里不谈)

或许有人还不是很懂这样的好处,那多提一点,因为此时短信平台是从消息队列中获取数据的,其一好处是避免了太高的访问请求压迫短信平台,其二好处是因为短信平台是单独运行的,那其实我能够建立多个短信平台来提升对请求的处理能力,只要它们都遵循从消息队列中取数据便可。web
使用场景
- 流量削锋,做为一个前置组件,帮助后面的关键组件顶住大访问量的压力。【上面的举个栗子就是】
- 异步处理,如你所见,一方只负责向消息队列存消息,存完就能够去干点别的事情了。好比消息队列用在备份平台中,我要备份什么东西告诉你一下就能够了,我不必卡在那里等你备份完毕再去作别的东西的,否则会很浪费时间。
- 应用解耦,在上面的举个栗子中其实也展现了一点应用解耦的功能,如上面把短信平台从电商平台中解耦出来了,并且这时候若是在多个短信平台中的“某个”短信平台挂了也不会影响电商平台的运行。
- 其余。。。【这个东西是能够用来作不少事情的,上面的是常见的几种,甚至还有人用来作最基础的消息通信】
消息队列的模型与概念理解
上面讲了消息队列的做用,下面将讲述消息队列的模型、机制和API规范等这些偏基础理念的东西,了解了这些基础知识你才能更容易去掌握后面的实际使用,固然也会展现部分代码来辅助理解。在下一节才会以使用activeMQ为例来使用消息队列。redis
JMS模型
基本概念:
JMS全称Java Message Service,即Java消息服务,能够说是一种开发规范,一些消息队列的开发要基于这种开发规范,好比说ActiveMQ的API结构要遵循JMS的规范。遵循了这种开发规范后,咱们就能够很轻松地使用各类基于JMS开发规范的消息队列。(JDBC也是一种开发规范)sql
内容:
1.JMS定义了Java中关于消息队列的各类规范与标准。例如关于消息提供者的接口规范、关于存储到消息队列的消息的格式的规范、关于消息消费者的接口规范,关于消息域的规范,关于消息的可靠性的规范等等。
2.JMS定义了JMS的消息由如下三部分组成:【这三个部分的内容下面有讲】
* 消息头:主要是关于消息的传输一些属性,如消息的目的地、消息的发送模式、消息的过时时间。
* 消息属性:消息属性存储的是一些与消息头以外的属性,包括发送者标识这些与消息相关属性,也能够包括一些与消息无关但开发者想同时携带的属性。
* 消息体:存储的是具体的消息数据。
3.JMS定义了JMS Provider和JMS Consumer的建立方式,由Session对象建立,能够用于发送和接收消息。
4.JMS也定义了消息的消费方式:
* 同步消费,即阻塞式监听消费,消费者使用消费者的receive方法阻塞式地等待消息到达。receive方法获得的是消息。
* 异步消费,能够为消费者注册一个消息监听器,消息监听器会监听消息的达到,在消息监听器中能够定义在消息到达时执行的操做。

5.JMS也定义了两种消息传递方式:PTP(点对点) 和pub/sub(发布/订阅)。
* 点对点消息传递是生产者把消息存放到消息队列中,而后消费者从消息队列中取出数据,并且每一条消息都只能被消费一次。
* 这种传递方式中,消息的生产者与消费者之间没有时间上的相关性,消费者能够很迟才来队列中取出消息。没有被消费过的消息会一直存储在消息队列中,直到有人把它消费才会从队列中移除。
*
api
- 发布/订阅模式是生产者把消息存到消息队列中,而后这个消息队列上的“订阅器”会把消息传递给订阅器的所属者。这就好像订报纸,当有了新报纸后,负责你家的发报员会把报纸发给你。
- 这种传递模式中,因为消息是经过“订阅”来发送给消费者的,因此每一个消息能够有多个消费者。
- 订阅一个主题的消费者只能获取在他订阅以后生产者所生产的消息,也就是说你没法接收以前的消息。
- 订阅模式的消息是能够选择是否持久化的。
- 若是是非持久化的消息,那么订阅器的所属者不在线的时候,是没法接收到生产者生产的消息的。
- 若是是持久化的消息,那么消息队列会把消费者标记成不在线,而后等他在线的时候再把消息发给他。
- 在点对点消息传递中,目的地被称为队列(queue);在发布/订阅模式中,目的地被称为主题(topic)。在实际的代码中,使用什么模式来进行消息传递取决你是建立的目的地是什么类型的。
JMS定义的消息结构:
上面说了,JMS定义的消息结构包括三部分:消息头,消息属性,消息体服务器
- 消息头:包含属性以下:
- JMS Destination: 定义消息的发送目的地,由生产者端设置,主要有队列Queue和主题Topic,用于标识把消息发送到指定的队列或主题。
- JMS DeliveryMode: 定义消息的发送模式,由生产者端设置,主要有持久模式和非持久模式,主要用于标识发布/订阅模式下的消息是否持久化。
- JMS Expiration: 消息过时时间。若是值为0,表明永不过时,过时后消息将被自动清除。默认永不过时。
- JMS Priority: 消息优先级,值为0-9,0-4是普通消息,5-9是加急消息,并无严格说2级比1级必定要优先发送,但要求加急的必须优先于普通消息。
- JMS MessageID: 惟一标识每一个消息,由JMS Provider自动生成。
- JMS Timestatmp: 时间戳,是生产者把消息发送到消息队列的时间。
- JMS CorrelationID: 由消费者端设置,用来链接到另外一消息,典型的应用是在回复消息中使用CorrelationID链接到原消息,用来标明传递的消息与哪一个消息有关。
- JMS ReplyTo: 由消费者端设置,值为一个JMS Destination,能够称为消息回复地址,能够用于但愿消费者回传一个消息给生产者的状况。
- JMS Type:消息类型的识别符,标明消息是什么结构的(有普通文本结构,有map结构),由消费者端设置。
- JMS Redelivered:标明是不是第一次发送,若是是,那么值为false,反之true。所谓不是第一次发送,指以前发送过,但没有成功签收消息。
- 消息属性:
- 开发者自我添加的属性:好比除了消息以外,我想让消息携带一些我本身定义的数据就可使用消息属性来作到。【就好像其实在web开发中有时候也把一些数据存储到请求头中】
- JMS定义的属性:这些属性都是以JMSX开头的属性,包括如下属性:
- JMSXUserID: 发送消息的用户标识
- JMSXAppID: 发送消息的应用标识
- JMSXDeliveryCount , JMSXGroupID , JMSXGroupSeq , JMSXProducerTXID , JMSXConsumerTXID , JMSXRevTimestamp , JMSXState
- 【这些属性能够经过api来获取: Enumeration jmsxPropertyNames = connection.getMetaData().getJMSXPropertyNames();】
- JMS供应商特定的属性:这些属性JMS供应商决定,所谓供应商,也就是不一样的消息队列,好比activemq,rabbitmq
- 消息体:JMS 定义了五种消息体格式,可使用不一样形式发送和接受数据。
- TextMessage:以普通字符文本格式存储消息,
- MapMessage:以Map格式存储消息,取的时候也是以key来取。
- BytesMessage:以字节文本格式存储消息,取到的数据是字节格式的。
- StreamMessage:以流格式存储消息,取到的数据是流。
- ObjectMessage:以对象格式来存储消息,取到的数据是一个对象。
- 【以什么格式发送消息取决与使用session建立消息的时候建立的是什么消息】

PTP式消息传递
- PTP全称点对点,点对点即一方发,一方收。【发的一方能够有多个生产者,收的一方能够有多个消费者,但每个消息的传递只能有一个生产者和一个消费者】

- PTP式的消息传递就有点像发邮件,A要向B发邮件,那么A把邮件发到邮件服务器上,而后B从邮件服务器上获取消息。
- PTP式的消息传递的特性:
- 消息只能被消费一次。【当消息发送到某一个目的地的时候,消费者均可以从这个目的地取消息,但每一个消息只能被消费一次】
- 想要使用PTP模式的消息传递,那么建立消息目的地的时候须要建立成队列Queue。
Destination destination = session.createQueue("tempqueue");
PUB/SUB式消息传递
- PUB/SUB中文称为发布/订阅。所谓发布订阅,以“微博”为例,博主能够发送微博到微博服务器,而后订阅了这个博主的人都会“收到”这个博主发的微博。这个过程是一点对多点的过程。因此发布订阅模式适用于一些广播消息的状况。
- PUB/SUB式消息传递的特性:
- 是一点对多点的消息广播。
- 消息发布订阅有分为非持久订阅和持久订阅。非持久订阅,那么某个时间不在线的客户将不能接收到那个时间发送的消息。持久订阅会把那时不在线的客户进行标记,等到客户上线的时候会从新发送。

- 想要使用PTP模式的消息传递,那么建立消息目的地的时候须要建立成主题Topic。
Destination destination = session.createTopic("tempTopic");
- 持久化订阅的实现:
- 非持久化订阅只要求消费者处在消息接收状态便可。而持久化订阅须要解决离线问题。
- 在JMS中,是经过建立持久化订阅器DurableSubscriber来解决离线问题的,持久化订阅器是与消费者进行绑定的,消费者经过持久化订阅器来获取消息,当消费者不在线的时候,持久化订阅器会存储这些离线消息。等到消费者上线,再次调用建立持久化订阅器方法的时候,消息队列不会从新建立持久化订阅器,而是把这认为是消费者上线的标识,而后把离线的消息都发给消费者。
- 建立持久化订阅器的方法是createDurableSubscriber,第一个参数是一个Topic,第二个参数是订阅的名称(只是一个标识)。
可靠性机制
可靠性机制是保证消息被正常消费的机制,有时候消费者接收了消息,但处理过程当中忽然宕机了,这样消息的消费就会失败。因此为了确保消息被消费成功,消息队列使用“确认(acknowledge)”来标识一个消息被消费成功,若是消费者没有对消息进行确认,那么消息队列会认为这个消息被消费失败了,(不一样的需求状况会有不一样的针对消费失败的处理,在ptp中,若是消费失败,那么消息不会被删除,因此下一次消费者还能取到以前消费失败的消息)。
消息的成功消费一般包含三个阶段:客户接受消息,客户处理消息和消息被确认。

在非事务会话中,消息被什么时候确认取决于建立会话时的应答模式acknowledagement mode,该参数有三个可选值:session
- Session.auto_acknowledge: 自动确认,当客户成功的从receive返回的时候,或者从MessageListener.onMessage方法成功返回的时候,会话会自动确认客户接收到的消息。
- Session.client_acknowledge: 消费者手动确认,客户经过调用消息的acknowledge方法来确认消息。须要注意的是,在这种模式中,确认是在会话层中进行的,确认一个被消费的消息将自动确认全部已被会话消费的消息。即即便仅确认第一个,其他接受的也会被同时确认。
- session.dups_ok_acknowledge: Session没必要确保对传送消息的签收,这个模式可能会引发消息的重复,可是下降了Session的开销,因此只有客户端能容忍重复的消息,才可以使用。

在事务性会话中【此时使用事务来确保多条消息的消费】,当一个事务被提交的时候,确认会自动发生,不须要使用额外的消息确认语句,只须要进行事务提交。

事务
- 事务用来确保某一系列操做的共同成功或共同失败。
- JMS Session接口提供了commit和rollback方法。事务提交意味着生产的全部消息被发送(生产者)或消费的全部消息被确认(消费者)。事务回滚意味着生产的全部消息被销毁(生产者),消费的全部消息被恢复并从新提交(消费者),除非它们已通过期。
- 当session开启事务时,此时的session是一个事务性会话,若是你直接关闭session,那么他会回退其中没有commit的事务。
- 须要注意的是,事务过程当中发送的消息会知道事务提交才会真正发送。
- 在建立Session时,第一个参数决定是否开启事务,true为开启,false为不开启。

消息持久性规范DeliveryMode
- JMS支持以持久模式和非持久模式来进行消息提交
- 以持久模式PERSISTENT来发送的消息,那么会要求JMS Provider持久保存消息,以保证消息不会由于JMS Provider的失败而丢失。
- 以非持久模式NON_PERSISTENT来发送的消息,那么不会要求JMS Provider持久保存消息。
- 在消息发送时可使用生产者.setDeliveryMode来设置发送到消息队列的消息是不是持久化的。也能够在send的时候针对每一条消息来设置。

消息优先级规范Priority
- 消息优先级能够指示JMS Provider首先提交紧急的消息。
- 优先级分10个级别,为0-9。
- 若是不指定优先级,默认级别是4.须要注意的是,JMS Provider并不必定按照优先级的顺序提交消息。只确保加急消息优先于普通消息。
消息过时规范Expiration
- 能够在消息头中设置消息在必定时间后过时,默认是永不过时的。
消息的临时目的地
- 能够经过会话上的createTemploraryQueue方法和createTemporaryTopic方法来建立临时的目的地,它们的存在时间只限于建立它们的链接所保持的时间。只有建立该临时目的地的链接上的消息消费者才可以从临时目的地中提取消息。
- 固然,因为临时目的地也是一个目的地,咱们能够把这个临时目的地放在relpyTo中,这样让消费者把回复放到这个临时目的地中。这就实现了某种称呼上的“消息回复”。
JMS应用开发基本步骤:
1.建立一个JMS connection factory
2.经过connection factory来建立JMS Connection
3.启动JMS Connection
4.经过connection建立JMS Session
5.建立JMS Destination
6.建立JMS Producer,或者建立JMS Message,并设置Destination.
7.建立JMS Consumer,或者注册一个JMS Message Listener
8.发送或者接受JMS Message
9.关闭全部的JMS资源(Connection,Session)
总结
1.从上面的内容你应该了解到了PTP和PUB/SUB的思想,或许你还没懂怎么使用,但你应该内心应该知道它们的应用场景。【对于PTP/SUB你还须要了解到持久化订阅和非持久化订阅,持久化订阅须要使用到持久化订阅器来解决消费者离线问题】
2.你能够了解到事务(session.commit())、可靠性机制(acknowledge)。
3.持久化:解决宕机问题。
4.其余的属性:优先级、过时时间、临时目的地
5.JMS应用开发的概念步骤。
6.【若是上面的几点你看了没印象,建议你从新看过】
7.下面将使用activeMQ来实践这些理论知识:http://www.javashuo.com/article/p-cawgalqe-bn.html。