ActiveMQ(六)——ActiveMQ的消息存储

1、队列和topicjava

  • 概述
    ActiveMQ不只支持persistent和non-persistent两种方式,还支持消息的恢复(recovery)方式
  • PTP
    Queue的存储方式很简单,就是一个FIFO(先进先出)的Queue
    ActiveMQ(六)——ActiveMQ的消息存储
  • PUB/SUB
    对于持久化订阅主题,每个消费者将得到一个消息的复制
    ActiveMQ(六)——ActiveMQ的消息存储
  • 有效的消息存储
      ActiveMQ提供了一个插件式的消息存储,相似于消息的多点传播,主要实现了以下几种:
    1:AMQ消息存储一基于文件的存储方式,是之前的默认消息存储
    2:KahaDB消息存储一提供了容量的提高和恢复能力,是如今的默认存储方式
    3:JDBC消息存储一消息基于JDBC存储的
    4:Memory消息存储一基于内存的消息存储

2、KahaDBmysql

  • KahaDB Message Store概述
        KahaDB是目前默认的存储方式,可用于任何场景,提升了性能和恢复能力。消息存储使用一个事务日志和仅仅用一个索引文件来存储它全部的地址。
        KahaDB是一个专门针对消息持久化的解决方案,它对典型的消息使用模式进行了优化。在Kaha中,数据被追加到datalogs中。当再也不须要log文件中的数的时候,log文件会被丢弃。
  • KahaDB基本配置例子sql

    <persistenceAdapter>
    <kahaDB directory="${actlvemq.data}/kahadb"/>
    </persistenceAdapter>

    可用的属性有:
    1:director:KahaDB存放的路径,默认值activemq-data
    2:indexWriteBatchSize:批量写入磁盘的索引page数量,默认值1000
    3:indexCacheSize:内存中缓存索引page的数量,默认值10000
    4:enableIndexWriteAsync:是否异步写出索引,默认false
    5:journalMaxFi1eLength:设置每一个消息data log的大小,默认是32M
    6:enab1eJournalDiskSyncs:设置是否保证每一个没有事务的内容,被同步写入磁盘,JMS持久化的时候须要,默认为true
    7:cleanupInterval:在检查到再也不使用的消后,在具体删除消息前的时间,默认30000
    8:checkpointInterval:checkpoint的间隔时间,默认5000
    9:ignoreMissingJournalfiles:是否忽略丢失的消息日志文件,默认false
    10:checkForCourruptJournalFiles:在启动的时候,将会验证消息文件是否损坏,默认为false
    11:checksumJournalFiles:是否为每一个消息日志文件提供checksum,默认false
    12:archiveDataLogs:是否移动文件到特定的路径,而不是删除它们,默认false
    13:directoryArchive:定义消息已经被消费后,移动data log到的路径,默认为null
    14:databaseLockedWaitDelay:得到数据库锁的等待时间,默认10000
    15:maxAsyncJobs:设置最大的能够存储的异步消息队列,默认10000,能够和concurrent MessageProducers设置成同样的值
    16:concurrentStoreAndDispatchTransactions:是否分发消息到客户端,同时事务存储消息,默认true
    17:concurrentStoreAndDispatchTopics:是否分发Topic消息到客户端,同时进行存储,默认true
    18:concurrentStoreAndDispatchQueues:是否分发queue消息到客户端,同时进行存储,默认true数据库

  • 在Java中内嵌使用Broker,使用KahaDB的例子apache

    public class EmbeddedBrokerUsingKahaDBStoreExample {
    public BrokerService createEmbeddedBroker()throws Exception{
        BrokerService broker = new BrokerService();
        File dataFileDir = new File("target/amq-in-action/kahadb");
        KahaDBStore kaha = new KahaDBStore();
        kaha.setDirectory(dataFileDir);
        kaha.setJournalMaxFileLength(1024*1000);
        kaha.setIndexWriteBatchSize(100);
        kaha.setEnableIndexWriteAsync(true);
    
        broker.setPersistenceAdapter(kaha);
        broker.addConnector("tcp://localhost:61616");
        broker.start();
        return broker;
    }
    }

3、AMQ缓存

  • AMQ Message Store概述
        AMQ Message Store 是ActiveMQ5.0缺省的持久化存储,它是一个基于文件、事务存储设计为快速消息存储的一个结构,该结构是以流的形式来进行消息交互的。
        这种方式中,Messages被保存到data logs中,同时被reference store进行索引以提升存取速度。Data logs由一些单独的data log文件组成,缺省的文件大小是32M,若是某个消息的大小超过了data log文件的大小,那么能够修改配置以增长data log文件的大小。若是某个data log文件中全部的消息都被成功消费了,那么这个data log文件将会被标记,以便在下一轮的清理中被删除或者归档。
  • AMQ Message Store配置示例
    <broker brokerName="broker" persistent="true" useShutdownHook="false">
    <persistenceAdapter>
        <amqPersistenceAdapter driectory="${activemq.base}/data"    maxFileLength="32mb"/>
    </persistenceAdapter>
    </broker>

    4、 JDBC异步

  • 使用JDBC来持久化消息(此步骤不须要手工跑脚本)
    ActiveMQ支持使用JDBC来持久化消息,预约义的表以下:
    1:消息表,缺省代表为ACTIVEMQ_MSGS,quue和topic都存在里面,结构以下:
    ActiveMQ(六)——ActiveMQ的消息存储
    2:ACTIVEMQ_ACKS表存储持久订阅的信息和最后一个持久订阅接收的消息ID,结构以下:
    ActiveMQ(六)——ActiveMQ的消息存储
    3:锁定表,缺省代表为ACTIVEMQ_LOCK,用来确保在某一时刻,只能有一个Act broker实例来访问数据库,结构以下:
    ActiveMQ(六)——ActiveMQ的消息存储
  • 使用JDBC来持久化消息的配置示例
    ActiveMQ(六)——ActiveMQ的消息存储

注意:
(1)数据库须要字符集设置为latin1。
(2)须要把mysql-connector-java.jar包放入lib中。
(3)启动成功以后会出现三张表。tcp

  • 示例
    1、(queue模式):
    ActiveMQ(六)——ActiveMQ的消息存储
    ActiveMQ(六)——ActiveMQ的消息存储
    运行发送者(数据库中有三条未接收的消息):
    ActiveMQ(六)——ActiveMQ的消息存储
    运行接收者(消息成功接收的同时,数据库中的消息也会被删除)

2、persistence模式
消息接收者接收完成以后,数据库中的消息不会被删除。
如图所示:
ActiveMQ(六)——ActiveMQ的消息存储ide

  • JDBC Message Store with ActiveMQ Journal(日志)
    这种方式克服了JDBC Store的不足,使用快速的缓存写入技术,大大提升了性能。配置示例以下:性能

    <beans>
    <broker brokerName="test-broker" xmlns="http://activemq.apache.org/schema/core">
        <persistenceFactory>
            <journalPersistenceAdapterFactory 
                journalLogFiles="4"
                journalLogFileSize="32768"
                useJournal="true"
                useQuickJournal="true"
                dataSource="#derby-ds"
                dataDirectory="activemq-data"/>         
        </persistenceFactory>
    </broker>
    </beans>
  • JDBC Store和JDBC Message Store with ActiveMQ Journal的区别
    1:jdbc with journal的性能优于jdbc
    2:jdbc用于master/slave模式的数据库分享
    3:jdbc with journal不能用于master/slave模式
    4:通常状况下(非集群状态下),推荐使用jdbc with journal

5、 MMS

  • Memory Message Store
        内存消息存储主要是存储全部的持久化的消息在内存中。这里没有动态的缓存存在,因此必需要注意设置broker所在的JVM和内存限制。
  • Memory Message Store 配置示例
    <beans>
    <broker brokerName="test-broker" persistent="false" xmlns="http://activemq.apache.org/schema/core">
        <transportConnectors uri="tcp://localhost:61616"/>
        </transportConnectors>
    </broker>
    </beans>
  • 在Java中内嵌使用Broker,使用Memory的例子
    public void createEmbeddedBroker()throws Exception{
    BrokerService broker = new BrokerService();
    broker.setPersistent(false);
    broker.addConnector("tcp://localhost:61616");
    broker.start();
    }