
同步发送数据库
异步发送session
- ProducerWindowSize 每次发送统计数量
- 开启事务必然异步发送
JMS消息的可靠性机制异步
- JMS Session接口提供了 commit 和 rollback 方法
- 事务提交意味着生产的全部消息被发送
- 事务回滚意味着生产的全部消息被销毁,
- 消费的所 有消息被恢复并从新提交,除非它们已通过期。
- 关闭事务性会话将回滚其中的事务
- 在非事务型会话中
- Session.AUTO_ACKNOWLEDGE
- Session.CLIENT_ACKNOWLEDGE
- Session.DUPS_ACKNOWLEDGE
点对点消息传递性能
- 每一个消息只能有一个消费者
- 消息的生产者和消费者之间没有时间上的相关性。
- 不管消费者在生产者发送消息的时候是否处于运行状态,
- 均可以提取消息
发布订阅消息传递域fetch
- 每一个消息能够有多个消费者
- 生产者和消费者之间有时间上的相关性。
- 订阅一个主题的消费者只能消费自它订阅以后发布的消息。
- JMS 规范容许客户建立持久订阅,这在必定程度上下降了时间上的相关性要求。
- 持久订阅容许消费者消费它在未处于激活状态时发送的消息
JMS 消息由几部分组成:优化
- 消息头
- 包含消息头包含消息的识别信息和路由信息
- 消息头包含一些标准的属性如:
- JMS Destination 消息发送的目的地, queue 或者 topic
- JMSDeliveryMode传送模式。持久模式和非持久模式
- JMS Priority 消息优先级
- 优先级分为 10 个级别,从 0 最低 到 9 最高
- 若是不设定优先级,默认级别是 4 。
- JMSMessage ID 惟一识别每一个消息的标识
- 属性
- 按类型能够分为应用设置的属性,标准属性和消息中间件定义的属性
- 消息体
持久订阅设计
- 持久订阅有两个特色:
- 持久订阅者和非持久订阅者针对的 Domain 是 Pub/Sub 而不是 P2P
- 当 Broker 发送消息给订阅者时,
- 若是订阅者处于未激活状态:
- 持久订阅者能够收到消息
- 而非持久订阅者则收不到消息
- 若是持久订阅者订阅的消息太多则会溢出
消息的持久化策略3d
- KahaDB存储
- 默认的存储方式
- 可用于任何场景,提升了性能和恢复能力
- 消息存储使用一个事务日志和仅仅用一个索引文件来存储它全部的地址。
- KahaDB是一个专门针对消息持久化的解决方案
- KahaDB的存储原理
- 在data/kahadb这个目录下,会生成四个文件
- Ø db.data 它是消息的索引文件,
- 本质上是B-Tree(B树)
- 使用B-Tree做为索引指向db-*.log里面存储的消息
- Ø db.redo 用来进行消息恢复
- Ø db-*.log 存储消息内容。
- 新的数据以APPEND的方式追加到日志文件末尾。
- 属于顺序写入,所以消息存储是比较快的。
- 默认是32M,达到阀值会自动递增
- Ø lock文件锁,
- JDBC存储
- ACTIVEMQ_MSGS 消息表,queue和topic都存在这个表中
- ACTIVEMQ_ACKS 存储持久订阅的信息和最后一个持久订阅接收的消息ID
- ACTIVEMQ_LOCKS 锁表,用来确保某一时刻,只能有一个ActiveMQ broker实例来访问数据库
- LevelDB存储
- LevelDB持久化性能高于KahaDB
- 官方建议使用以及再也不支持
- Memory 消息存储
- persistent=”false”,表示不设置持久化存储,直接存储到内存中
- JDBC Message store with ActiveMQ Journal
- 这种方式克服了JDBC Store的不足,JDBC每次消息过来,都须要去写库和读库。
- 当消费者的消费速度可以及时跟上生产者消息的生产速度时,journal文件可以大大减小须要写入到DB中的消息。
- 若是消费者的消费速度很慢,这个时候journal文件可使消息以批量方式写到DB。
消费端消费消息的原理日志
prefetch limit 规定了一次能够向消费者Push(推送)多少条消息。中间件
- 有两种方法能够接收消息,
- 一种是使用同步阻塞的MessageConsumer#receive方法。
- 另外一种是使用消息监听器MessageListener。
- 同一个session下,这二者不能同时工做,
- 也就是说不能针对不一样消息采用不一样的接收方式。不然会抛出异常。
- 为何这么作,最大的缘由仍是在事务性会话中,

- sendPullCommand
- 发送pull命令从broker上获取消息,前提是prefetchSize=0而且unconsumedMessages为空。
- unconsumedMessage表示未消费的消息,这里面预读取的消息大小为prefetchSize的值
- clearDeliveredList
- 在上面的sendPullCommand方法中,会先调用clearDeliveredList方法,
- 主要用来清理已经分发的消息链表deliveredMessages
- dequeue
- 为何要存在这样一个消息分发通道
- 你们能够想象一下,若是消费者每次去消费完一个消息之后再去broker拿一个消息,效率是比较低的。
- 因此经过这样的设计能够容许session可以一次性将多条消息分发给一个消费者。
- 默认状况下对于queue来讲,prefetchSize的值是1000
- beforeMessageIsConsumed
- afterMessageIsConsumed
- 主要做用是执行应答操做,这里面作如下几个操做 Ø 若是消息过时,则返回消息过时的ack Ø 若是是事务类型的会话,则不作任何处理 Ø 若是是AUTOACK或者(DUPS_OK_ACK且是队列),而且是优化ack操做,则走批量确认ack Ø 若是是DUPS_OK_ACK,则走ackLater逻辑 Ø 若是是CLIENT_ACK,则执行ackLater