JMS 是 SUN 公司开发的一套访问 MOM(Message-Oriented-Middleware) 消息服务中间件的标准 API。
html
MOM 提供消息接收和转发的服务,对消息进行缓存和持久操做,保证消息的安全性,JMS让开发都无须了解远程过程调用的细节和网络通讯协议的细节就能够经过JMS向MOM发送消息,借助消息咱们能够松散耦合的方式集成不一样的应用。spring
JMS(Java Message Service)即Java消息服务。它提供标准的产生、发送、接收消息的接口简化企业应用的开发。它支持两种消息通讯模型:点到点(point-to-point)(P2P)模型和发布/订阅(Pub/Sub)模型。P2P 模型规定了一个消息只能有一个接收者;Pub/Sub 模型容许一个消息能够有多个接收者。
apache
对于点到点模型,消息生产者产生一个消息后,把这个消息发送到一个Queue(队列)中,而后消息接收者再从这个Queue中读取,一旦这个消息被一个接收者读取以后,它就在这个Queue中消失了,因此一个消息只能被一个接收者消费。能够定义多个消费者,可是同一个消息只会被一个消费者消费;数组
与点到点模型不一样,发布/订阅模型中,消息生产者产生一个消息后,把这个消息发送到一个Topic中,这个Topic能够同时有多个接收者在监听,当一个消息到达这个Topic以后,全部消息接收者都会收到这个消息。缓存
点对对点消息传递域的特色:
安全
每一个消息只能有一个消费者;网络
消息的生产者和消费者之间没有时间上的相关性。不管消费者在生产者发送消息的时候是否处于运行状态,它均可以提取消息。并发
发布/订阅消息传递域的特色:
app
每一个消息能够有多个消费者;dom
生产者和消费者之间有时间上的关联性。订阅一个主题的消费者只能消费自它订阅以后发布的消息。JMS规范容许客户建立持久阅读,这在必定程度上放松了时间上的相关性要求。持久订阅容许消费者消费它在未处于激活状态时发送的消息。
Destination——消息发送的目的地,也就是前面说的Queue和Topic。建立好一个消息以后,只须要把这个消息发送到目的地,消息的发送者就能够继续作本身的事情,而不用等待消息被处理完成。至于这个消息何时,会被哪一个消费者消费,彻底取决于消息的接受者。
Connection——与JMS提供者(IBM Webshphere MQ, ActiveMQ等)创建的一个链接。能够从这个链接建立一个会话,即Session。
ConnectionFactory——那如何建立一个Connection呢?ConnectionFactory。经过这个工厂类就能够获得一个与JMS提供者的链接,即Conection。
Session——与JMS提供者所创建的会话,经过Session咱们才能够建立一个Message。
Producer——消息的生产者,要发送一个消息,必须经过这个生产者来发送。
MessageConsumer——与生产者相对应,这是消息的消费者或接收者,经过它来接收一个消息。
Message——从字面上就能够看出是被发送的消息。它有下面几种类型:StreamMessage:Java 数据流消息,用标准流操做来顺序的填充和读取。MapMessage:一个Map类型的消息;名称为 string 类型,而值为 Java 的基本类型。TextMessage:普通字符串消息,包含一个String。ObjectMessage:对象消息,包含一个可序列化的Java 对象BytesMessage:二进制数组消息,包含一个byte[]。XMLMessage: 一个XML类型的消息。最经常使用的是TextMessage和ObjectMessage。
JMS 事务——JMS 使用 Session 控制事务 , 如何须要事务能够在建立 Session 时标注须要事务。
ConnectionFactory和Destination经过JMS提供者提供给咱们的接口获得。
消息的消费能够采用如下两种方法之一:
同步消费——经过调用消费者的receive方法从目的地中显式地提取消息。receive方法能够一直阻塞到消息到达;
异步消费——客户能够为消费者注册一个消息监听器,以定义在消息到达时所采起的动做。
JMS支持如下两种消息提交模式:
PERSISTENT——指示JMS provider持久保存消息,以保证消息不会由于JMS provider的失败而丢失。
NON_PERSISTENT——不要求JMS provider持久保存消息。
JMS消息只有在被确认以后,才认为已经被成功消费。消息的成功消费一般包含三个阶段:客户接收消息、客户处理消息和消息被确认。
在事务性会话中,当一个事务被提交时,确认自动发生。在非事务性会话中,消息什么时候被确认取决于建立会话时的应答模式(acknowledgement mode)。该参数有如下三个可选值:
Session.Auto_ACKNOWLEDGE。当客户成功地从receive方法返回的时候,或者从MessageListener.onMessage方法成功返回的时候,会话自动确认客户收到的消息。
Session.CLIENT_ACKNOWLEDGE。客户经过消息的acknowledge方法确认消息。须要注意的是,在这种模式中,确认是在会话层进行:确认一个被消费的消息将自动确认全部已被会话消费的消息。例如,若是一个消息消费者消费了10个消息,而后确认第5个消息,那个全部10个消息都被确认。
Session.DUPS_ACKNOWLEDGE。在会话迟钝等状况下确认消息。若是JMS provider失败,那么可能会致使一些重复的消息。若是是重复的消息,那么JMS provider必须把消息头的JMSRecelivered字段设置为true。
JMS消息头
JMS Headers set automatically by the client’s send:
JMSDestination
JMSDeliveryMode 持久化/非持久化
JMSExpiration 消息过时时间
JMSMessageID
JMSPriority 消息优先级 0-9 9为最高级
JMSTimestamp 消息被建立的时间
JMS Headers set optionally by the client:
MSCorrelationID Used to associate the current message with a previous message
JMSReplyTo Used to specify a destination where a reply should be sent
Headers set optionally by the JMS provider:
JMSRedelivered Used to indicate the liklihood that a message was previously delivered but not acknowledged.
JMS 消息属性
CUSTOM PROPERTIES,自定义属性
经过set or get 方法操做;
JMS-DEFINED PROPERTIES ,these properties is optional:
JMSXAppID—Identifies the application sending the message
JMSXConsumerTXID —The transaction identifier for the transaction within which this message was consumed
JMSXDeliveryCount—The number of message delivery attempts
JMSXGroupID—The message group of which this message is a part
JMSXGroupSeq—The sequence number of this message within the group
JMSXProducerTXID—The transaction identifier for the transaction within which this message was produced
JMSXRcvTimestamp —The time the JMS provider delivered the message to the consumer
JMSXState—Used to define a provider-specific state
JMSXUserID —Identifies the user sending the message
JMS Message Selector 消息选择器
选择器语法:
Literals Booleans TRUE/FALSE; numbers such as 5, -10, +34; numbers with decimal or scientific notation such as 43.3E7, +10.5239
Identifiers A header or property field
Operators AND, OR, LIKE, BETWEEN, =, <>, <, >, <=, =>, +, -, *, /, IS NULL, IS NOT NULL
MESSAGE BODY
JMS defines six Java types for the message body, also known as the payload. Through the use of these objects, data and information can be sent via the message payload.
Message —The base message type. Used to send a message with no payload, only headers and properties. Typically used for simple event notification.
TextMessage —A message whose payload is a String. Commonly used to send simple textual and XML data.
MapMessage —Uses a set of name/value pairs as its payload. The names are of type String and the values are a Java primitive type.
BytesMessage —Used to contain an array of uninterpreted bytes as the payload.
StreamMessage—A message with a payload containing a stream of primitive Java types that’s filled and read sequentially.
ObjectMessage—Used to hold a serializable Java object as its payload. Usually used for complex Java objects. Also supports Java collections.
MESSAGE DURABILITY 和 MESSAGE PERSISTENCE 的区别
durable模式下,消息发送到队列以前存在的全部订阅者都消费以后,消息移除;若果订阅者中间断开,消息会保留,知道订阅者再次链接后消费完成。
非durable模式下,当前存在订阅者都消费以后,消息移除;
transport connectors:
支持协议类型:
支持协议类型:TCP NIO and UDP broker.SSL HTTP(S)ker.VM
服务端定义 <transportConnectors> <transportConnector name="tcp" uri="tcp://localhost:61616?trace=true"/> <transportConnector name="udp" uri="udp://localhost:61618?trace=true" /> </transportConnectors>
network connectors
·Static networks
static:(uri1,uri2,uri3,...)?key=value <transportConnectors> <transportConnector name="tcp" uri="tcp://localhost:61616?trace=true"/> </transportConnectors> <networkConnectors> <networkConnector name="local network" uri="static://(tcp://remotehost1:61616,tcp://remotehost2:61616)"/> </networkConnectors> 说明:客户端链接并发送消息到broker 61616,而后61616 broker会将消息转发到remotehost1:61616 broker; FAILOVER PROTOCOL,采用随机的方式进行broker链接,能够经过设置randomize=false使其默认链接uri1,只有当uri1不可用时,才链接uri2,实现Master/Slave模式; failover:(uri1,...,uriN)?key=value
·Dynamic networks
MULTICAST CONNECTOR(服务端)
multicast://ipadaddress:port?key=value <networkConnectors> <networkConnector name="default-nc" uri="multicast://default"/> </networkConnectors> <transportConnectors> <transportConnector name="openwire" uri="tcp://localhost:61616" discoveryUri="multicast://default"/> </transportConnectors>
DISCOVERY PROTOCOL(客户端) discovery:(discoveryAgentURI)?key=value discovery:(multicast://default) PEER PROTOCOL 对等协议,用于两个embedded broker之间实现network connection,以下图:
FANOUT CONNECTOR 分列链接器,客户端链接协议,只能用于生产者,将消息复制到多个broker上,且各个broker之间不相互链接,不然可能出现一个消息被同一个消费者屡次消费; fanout:(fanoutURI)?key=value fanout:(static:(tcp://host1:61616,tcp://host2:61616,tcp://host3:61616)) fanout:(multicast://default)(前提是broker采用multicast 协议)
关于链接器总结:
message storage
KahaDB message store
AMQ message store internals
适用于高吞吐率场景,不用于存在大量destination的场景,由于每个destination会对应一个index文件。
默认使用NIO
基本配置方式:
<persistenceAdapter> <amqPersistenceAdapter directory="target/Broker2-data/activemq-data" syncOnWrite="true" indexPageSize="16kb" indexMaxBinSize="100" maxFileLength="10mb" /> </persistenceAdapter>
JDBC message store
The memory message store
集成amq到application
一、经过原始API:
a、 Embedding Active MQ using the BrokerService
b、Embedding ActiveMQ using the BrokerFactory
二、经过spring:
a、 Pure Spring XML
b、Using the BrokerFactoryBean
c、Using a custom XML namespace with Spring amq
集群
一、多个消费者竞争消费同一个消息;
一、消息生产者把消息发送到任意一个broker上;
二、消息消费者在全部broker上监听;
获取exclusive lock的broker为master broker,只有master broker的transport connectors会被启动,所以全部的客户端只会同时链接在同一个broker上;