RocketMQ——初识消息中间件,JMS

JMS(Java MessageService)是一套Java的API,最初设计目的是为了让应用程序能访问现有消息中间件系统,后来直接被消息中间件开发商采用,直接用以开发消息中间件。常见的消息中间件有ActiveMQ,RocketMQ,RabbitMQ,Kafka。

常用消息中间件对比如下:
在这里插入图片描述

“消息”是两台计算机之间传送的数据单位。消息可以很简单,如只包含文件字符串信息;也可以很复杂,如包含自定义的对象。

“消息队列”是用于在消息传输过程中存储消息的容器。消息队列管理器在消息从源到目标时充当中间人。

消息队列存在的目的是保证消息的可靠传输,若发消息时接收者不可用,则消息队列会保留消息,直至成功传递。

消息队列的主要特点是异步处理,主要目的是为了减少响应时间与解耦,所以主要使用场景就是比较耗时且不用及时返回结果的数据传输。若使用了消息队列,实际上若能确保消息格式不变,消息的发送方和接收方都不需要彼此联系,这就是解耦。所以使用异步交互的地方其实都可以使用到消息队列。由于同步通常意味着阻塞,而大量线程阻塞则会降低计算机的性能,如在高峰期的订单处理,在前端时可将订单信息放入消息队列,后端应用从队列中依次获取消息处理,慢慢消化订单。

JMS中的一些角色

Broker:Broker其实就是一个服务器,一个Server,如启动一个RocketMQ,则broker就代表当前的RocketMQ

Producer:消息生产者是由会话创建的一个对象,用于将消息发送到目的地

Consumer:消息消费者是由会话创建的一个对象,用于接收发送到目的地的消息,消费有两种模式

  • 同步消费:调用Consumer的receive方法从目的地中显示地提取消息,receive会一直阻塞到消息到达
  • 异步消费:客户可为Consumer注册一个监听器,用于定义消息在到达目的地时采取的功能

Sessoin:会话是生产消费信息的一个上下文,用于创建producer和consumer和消息等。会话提供了一个事务性的上下文,即在一个上下文中,一组发送和接收被组合到了一个原子操作中。

Destination:消息发往的目的地,是生产的消息的目标,也是消费的消息的来源,目的地具体有两种:QueueTopic

Queue:队列,用于点对点p2p消息模型

Topic:主题,用于订阅/发布消息模型

p2p:一条消息只能被一个消费者处理,一旦处理,Queue中将不再保存该消息,即阅后即焚。当消费者不存在时,Queue会一直保存该消息。在这里插入图片描述
Topic:消息生产者(发布)该消息,由多个消息消费者(订阅)处理该消息。当生产者发布消息后,Topic将不再保留该消息(Topic只做推送消息的功能,并不持久化存储消息,而Queue会存储消息)。所以Topic模式一定先有消费者,后有生产者,不然会出现发布消息无人消费的局面。
在这里插入图片描述

消息的组成

JMS消息由以下三部分组成:

  • 消息头:每个消息头都有对应的setter和getter
  • 消息属性:除消息头之外的值,都可以设置为消息属性
  • 消息体:具体消息的值,可供使用的类型下文阐述

消息体的类型

  • TextMessage:文本消息
  • MapMessage:ket-value型消息
  • BytesMessage:字节流
  • StreamMessage:Java的原始数据流
  • ObjectMessage:Java的序列化对象

消息的可靠性机制

只有消息消费确认了,这条消息才算被成功消费。消息的成功消费通常包括三个阶段:客户接收消息,客户处理消息,消息消费被确认。在事务性会话中,当一个事务被提交时,消息消费确认会自动发生。在非事务性会话中,消息何时被确认取决于创建会话时设置的应答模式(Acknowledge Mode),有以下常用的三个参数可选:

  • Session.AUTO_ACKNOWLEDGE:当客户成功从receive方法返回时,或从MessageListener.onMessage方法返回时,会话自动确认客户收到的信息
  • Session.CLIENT_ACKNOWLEDGE:客户通过消息的acknowledge方法确认消息。需要注意的是这个模式下确认被消费的消息将自动确认所有消息。如一个消费者消费了10条消息,然后确认第5条消息,这时10条消息都会被确认。
  • Session.DUPS_ACKNOWLEDGE:客户发生异常时可能会导致消息的重发,这时为提高性能,应答会被延迟。这就可能造成消息重复。若是重复的消息,可通过配置该应答模式,将消息头的JMSRedelivered置为true

消息提交的优先级(只有ActiveMQ支持)

0(最低)~9(最高),类似与线程的setPriority,设置了也不一定生效