ActiveMQ支持JMS规范中的持久化消息与非持久化消息mysql
Queue消息模型在ActiveMQ的存储sql
采用存储采用先进先出(FIFO),一个消息只能被一个消费者消费,当消息被确认消费以后才会被删除。数据库
Topic消息模型(针对持久订阅)
每一个订阅者获取的消息实际是消息的一个副本,只有一个消息副本会被存储,MQ提供了一个指针来指向消息存储而且分发消息副本到订阅者,消息直到全部的持久化订阅者都被接收才能被删除。apache
持久化存储方式:缓存
KahaDB是从ActiveMQ 5.4开始默认的持久化插件。KahaDb恢复时间远远小于其前身AMQ而且使用更少的数据文件,因此能够彻底代替AMQ,kahaDB的持久化机制一样是基于日志文件,索引和缓存。服务器
(一)、KahaDB主要特性:性能
(二)、适用场景:大数据
(三)、配置方式 conf/activemq.xml:url
<persistenceAdapter> <kahaDB directory="${activemq.data}/kahadb"/> </persistenceAdapter>
(四)、KahaDB存储原理:spa
当有活动消费者时,用于临时存储,消息会被发送给消费着,同时被安排将被存储,若是消息及时被确认,就不须要写入到磁盘。写入到磁盘中的数据消息,在后续的消息活动中,若是消息发送成功,变标记为可删除的。系统会周期性的清除或者归档日志文件。
一、KahaDB内部结构
Data logs:消息日志包含了消息日志和一些命令
Cache:当有活动消费者时,用于临时存储,消息会被发送给消费着,同时被安排将被存储,若是消息及时被确认,这不须要写入到磁盘
Btree indexes(消息索引):用于引用消息日志(message id),它存储在内存中,这样能快速定位到。MQ会按期将内存中的消息索引保存到metadata store中,避免大量消息未发送时,消息索引占用过多内存空间。
Redo log用于在非正常关机状况下维护索引完整性。
二、目录结构:
Db log files:用于存储消息(默认大小32M),当log日志满了,会建立一个新的,当log日志中的消息都被删除,该日志文件会被删除或者归档。
Archive directory:当datalog不在被kahadb须要会被归档(经过archiveDataLogs属性控制)。
Db.data:存放Btree indexs。
Db.redo:存放redo file,用于恢复Btree indexs。
写入消息时,会将消息写入日志文件,因为是顺序追加写,性能很高。为了提高性能,建立消息主键索引,而且提供缓存机制,进一步提高性能。每一个日志文件的大小都是有限制的(默认32m,可自行配置)。当超过这个大小,系统会从新创建一个文件。当全部的消息都消费完成,系统会删除这个文件或者归档(取决于配置)。主要的缺点是AMQ Message会为每个Destination建立一个索引,若是使用了大量的Queue,索引文件的大小会占用不少磁盘空间。并且因为索引巨大,一旦Broker崩溃,重建索引的速度会很是慢。
特色:相似KahaDB,也包含了事务日志,每一个destination都包含一个index文件,AMQ适用于高吞吐量的应用场景,可是不适合多个队列的场景。
配置方式conf/activemq.xml:
<!--AMQ directory:数据存储路径 syncOnWrite:是否同步写入 maxFileLength:日志文件大小 --> <persistenceAdapter> <amqPersistenceAdapter directory="${activemq.data}/AMQdb" syncOnWrite="true" maxFileLength="10mb" /> </persistenceAdapter>
一、AMQ内部结构:
Data logs:消息日志包含了消息日志
Cache:用于消息的快速检索
Reference store indexes:用于引用datalogs中的消息,经过message ID 关联
二、目录结构:
Lock:保证同一时间只有一个borker访问文件目录
temp-storag:用于存储非持久化消息(当不在被存储在内存中),如等待慢消费者处理消息
Kr-store:用于存储引用消息日志数据
journal directory:包含了消息文件、消息日志和消息控制信息
Archive:归档的数据日志
支持经过JDBC将消息存储到关系数据库,性能上不如文件存储,能经过关系型数据库查询到消息的信息。
MQ支持的数据库:Apache Derby、MYsql、PostgreSQL、Oracle、SQLServer、Sybase、Informix、MaxDB。
存储表结构:
A、ACTIVEMQ_MSGS:用于存储消息,Queue和Topic都存储在这个表中:
B、ACTIVEMQ_ACKS:用于存储订阅关系。若是是持久化Topic,订阅者和服务器的订阅关系在这个表保存:
C、ACTIVEMQ_LOCK(消息锁,保证同一时间只能有一个broker访问这些表结构):
表activemq_lock在集群环境中才有用,只有一个Broker能够得到消息,称为Master Broker,其余的只能做为备份等待Master Broker不可用,才可能成为下一个Master Broker。这个表用于记录哪一个Broker是当前的Master Broker。
配置方式:
一、配置数据源 conf/acticvemq.xml文件:
<!-- 配置数据源--> <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close"> <property name="driverClassName" value="com.mysql.jdbc.Driver"/> <property name="url" value="jdbc:mysql://localhost:3306/activemq?relaxAutoCommit=true"/> <property name="username" value="root"/> <property name="password" value="111111"/> <property name="maxActive" value="200"/> <property name="poolPreparedStatements" value="true"/> </bean>
二、配置broke中的persistenceAdapter :
dataSource指定持久化数据库的bean,createTablesOnStartup是否在启动的时候建立数据表,默认值是true,这样每次启动都会去建立数据表了,通常是第一次启动的时候设置为true,以后改为false。
<!-- JDBC配置 --> <persistenceAdapter> <jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="false"/> </persistenceAdapter>
ps:数据库activemq 须要手动建立。
内存消息存储,会将全部的持久化消息存储在内存中,必须注意JVM使用状况以及内存限制,适用于一些能快速消费的数据量不大的小消息,当MQ关闭或者宕机,未被消费的内存消息会被清空。
配置方式 设置 broker属性值 persistent="false":
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" persistent="false">