ActiveMQ入门之四--ActiveMQ持久化方式

  消息持久性对于可靠消息传递来讲应该是一种比较好的方法,有了消息持久化,即便发送者和接受者不是同时在线或者消息中心在发送者发送消息后宕机了,在消息中心从新启动后仍然能够将消息发送出去,若是把这种持久化和ReliableMessaging结合起来应该是很好的保证了消息的可靠传送。html

  消息持久性的原理很简单,就是在发送者将消息发送出去后,消息中心首先将消息存储到本地数据文件、内存数据库或者远程数据库等,而后试图将消息发送给接收者,发送成功则将消息从存储中删除,失败则继续尝试。消息中心启动之后首先要检查制定的存储位置,若是有未发送成功的消息,则须要把消息发送出去。java

ActiveMQ持久化方式:AMQ、KahaDB、JDBC、LevelDB。mysql

一、AMQsql

AMQ是一种文件存储形式,它具备写入速度快和容易恢复的特色。消息存储在一个个文件中,文件的默认大小为32M,若是一条消息的大小超过了 32M,那么这个值必须设置大一点。当一个存储文件中的消息已经所有被消费,那么这个文件将被标识为可删除,在下一个清除阶段,这个文件被删除。AMQ适用于ActiveMQ5.3以前的版本。默认配置以下:数据库

<persistenceAdapter>
   <amqPersistenceAdapter directory="activemq-data" maxFileLength="32mb"/>
</persistenceAdapter>

属性以下:apache

属性名称缓存

默认值服务器

描述session

directory并发

activemq-data

消息文件和日志的存储目录

useNIO

true

使用NIO协议存储消息

syncOnWrite

false

同步写到磁盘,这个选项对性能影响很是大

maxFileLength

32Mb

一个消息文件的大小

persistentIndex

true

消息索引的持久化,若是为false,那么索引保存在内存中

maxCheckpointMessageAddSize

4kb

一个事务容许的最大消息量

cleanupInterval

30000

清除操做周期,单位ms

indexBinSize

1024

索引文件缓存页面数,缺省为1024,当amq扩充或者缩减存储时,会锁定整个broker,致使必定时间的阻塞,因此这个值应该调整到比较大,可是代码中实现会动态伸缩,调整效果并不理想。

indexKeySize

96

索引key的大小,key是消息ID

indexPageSize

16kb

索引的页大小

directoryArchive

archive

存储被归档的消息文件目录

archiveDataLogs

false

当为true时,归档的消息文件被移到directoryArchive,而不是直接删除                    

二、KahaDB

KahaDB是基于文件的本地数据库储存形式,虽然没有AMQ的速度快,可是它具备强扩展性,恢复的时间比AMQ短,从5.4版本以后KahaDB作为默认的持久化方式。默认配置以下:

KahaDB的属性以下:

属性名称

默认值

描述

directory

activemq-data

消息文件和日志的存储目录

indexWriteBatchSize

1000

一批索引的大小,当要更新的索引量到达这个值时,更新到消息文件中

indexCacheSize

10000

内存中,索引的页大小

enableIndexWriteAsync

false

索引是否异步写到消息文件中

journalMaxFileLength

32mb

一个消息文件的大小

enableJournalDiskSyncs

true

是否讲非事务的消息同步写入到磁盘

cleanupInterval

30000

清除操做周期,单位ms

checkpointInterval

5000

索引写入到消息文件的周期,单位ms

ignoreMissingJournalfiles

false

忽略丢失的消息文件,false,当丢失了消息文件,启动异常

checkForCorruptJournalFiles

false

检查消息文件是否损坏,true,检查发现损坏会尝试修复

checksumJournalFiles

false

产生一个checksum,以便可以检测journal文件是否损坏。

5.4版本以后有效的属性:

   

archiveDataLogs

false

当为true时,归档的消息文件被移到directoryArchive,而不是直接删除

directoryArchive

null

存储被归档的消息文件目录

databaseLockedWaitDelay

10000

在使用负载时,等待得到文件锁的延迟时间,单位ms

maxAsyncJobs

10000

同个生产者产生等待写入的异步消息最大量

concurrentStoreAndDispatchTopics

false

当写入消息的时候,是否转发主题消息

concurrentStoreAndDispatchQueues

true

当写入消息的时候,是否转发队列消息

5.6版本以后有效的属性:

   

archiveCorruptedIndex

false

是否归档错误的索引

每一个KahaDB的实例均可以配置单独的适配器,若是没有目标队列提交给filteredKahaDB,那么意味着对全部的队列有效。若是一个队列没有对应的适配器,那么将会抛出一个异常。配置以下:

<persistenceAdapter>
  <mKahaDBdirectory="${activemq.base}/data/kahadb">
    <filteredPersistenceAdapters>
      <!-- match all queues -->
      <filteredKahaDBqueue=">">
        <persistenceAdapter>
          <kahaDBjournalMaxFileLength="32mb"/>
        </persistenceAdapter>
      </filteredKahaDB>
       
      <!-- match all destinations -->
      <filteredKahaDB>
        <persistenceAdapter>
          <kahaDBenableJournalDiskSyncs="false"/>
        </persistenceAdapter>
      </filteredKahaDB>
    </filteredPersistenceAdapters>
  </mKahaDB>
</persistenceAdapter>

若是filteredKahaDB的perDestination属性设置为true,那么匹配的目标队列将会获得本身对应的KahaDB实例。配置以下:

<persistenceAdapter>
  <mKahaDBdirectory="${activemq.base}/data/kahadb">
    <filteredPersistenceAdapters>
      <!-- kahaDB per destinations -->
      <filteredKahaDB perDestination="true">
        <persistenceAdapter>
          <kahaDBjournalMaxFileLength="32mb" />
        </persistenceAdapter>
      </filteredKahaDB>
    </filteredPersistenceAdapters>
  </mKahaDB>
</persistenceAdapter>

三、JDBC

能够将消息存储到数据库中,例如:Mysql、SQL Server、Oracle、DB2。

配置JDBC适配器:

<persistenceAdapter>
    <jdbcPersistenceAdapterdataSource="#mysql-ds" createTablesOnStartup="false" />
</persistenceAdapter>

dataSource指定持久化数据库的bean,createTablesOnStartup是否在启动的时候建立数据表,默认值是true,这样每次启动都会去建立数据表了,通常是第一次启动的时候设置为true,以后改为false。

Mysql持久化bean:
<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
    <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
    <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
    <property name="username" value="activemq"/>
    <property name="password" value="activemq"/>
    <property name="poolPreparedStatements" value="true"/>
</bean>
SQL Server持久化bean:
<bean id="mssql-ds" class="net.sourceforge.jtds.jdbcx.JtdsDataSource" destroy-method="close">
   <property name="serverName" value="SERVERNAME"/>
   <property name="portNumber" value="PORTNUMBER"/>
   <property name="databaseName" value="DATABASENAME"/>
   <property name="user" value="USER"/>
   <property name="password" value="PASSWORD"/>
</bean>
Oracle持久化bean:
<bean id="oracle-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
    <property name="driverClassName" value="oracle.jdbc.driver.OracleDriver"/>
    <property name="url" value="jdbc:oracle:thin:@10.53.132.47:1521:activemq"/>
    <property name="username" value="activemq"/>
    <property name="password" value="activemq"/>
    <property name="maxActive" value="200"/>
    <property name="poolPreparedStatements" value="true"/>
</bean>
DB2持久化bean:
<bean id="db2-ds" class="org.apache.commons.dbcp.BasicDataSource"  destroy-method="close">
      <property name="driverClassName" value="com.ibm.db2.jcc.DB2Driver"/>
      <property name="url" value="jdbc:db2://hndb02.bf.ctc.com:50002/activemq"/>
      <property name="username" value="activemq"/>
      <property name="password" value="activemq"/>
      <property name="maxActive" value="200"/>
      <property name="poolPreparedStatements" value="true"/>
  </bean>

四、LevelDB

这种文件系统是从ActiveMQ5.8以后引进的,它和KahaDB很是类似,也是基于文件的本地数据库储存形式,可是它提供比KahaDB更快的持久性。与KahaDB不一样的是,它不是使用传统的B-树来实现对日志数据的提早写,而是使用基于索引的LevelDB。

默认配置以下:

< persistenceAdapter >
       < levelDBdirectory = "activemq-data" />
</ persistenceAdapter >

 

属性名称

默认值

描述

directory

"LevelDB"

数据文件的存储目录

readThreads

10

系统容许的并发读线程数量

sync

true

同步写到磁盘

logSize

104857600 (100 MB)

日志文件大小的最大值

logWriteBufferSize

4194304 (4 MB)

日志数据写入文件系统的最大缓存值

verifyChecksums

false

是否对从文件系统中读取的数据进行校验

paranoidChecks

false

尽快对系统内部发生的存储错误进行标记

indexFactory

org.fusesource.leveldbjni.JniDBFactory, org.iq80.leveldb.impl.Iq80DBFactory

在建立LevelDB索引时使用

indexMaxOpenFiles

1000

可供索引使用的打开文件的数量

indexBlockRestartInterval

16

Number keys between restart points for delta encoding of keys.

indexWriteBufferSize

6291456 (6 MB)

内存中索引数据的最大值

indexBlockSize

4096 (4 K)

每一个数据块的索引数据大小

indexCacheSize

268435456 (256 MB)

使用缓存索引块容许的最大内存

indexCompression

snappy

适用于索引块的压缩类型

logCompression

none

适用于日志记录的压缩类型

属性以下:

五、  下面详细介绍一下如何将消息持久化到Mysql数据库中

Ø        须要将mysql的驱动包放置到ActiveMQ的lib目录下

Ø        修改activeMQ的配置文件:

<persistenceAdapter>
            <!--<kahaDB directory="${activemq.base}/data/kahadb"/>-->
            <jdbcPersistenceAdapter dataDirectory="${activemq.base}/data" dataSource="#mysql-ds" useDatabaseLock="false" createTablesOnStartup="false"/>
        </persistenceAdapter>

在配置文件中的broker节点外增长:

<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method ="close">
       <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
       <property name="url" value="jdbc:mysql://192.168.9.65:3306/activemq?relaxAutoCommit=true"/>
       <property name="username" value="root"/>
       <property name="password" value="12345678"/>
       <property name="maxActive" value="20"/>
       <property name="poolPreparedStatements" value="true"/>
    </bean>

从配置中能够看出数据库的名称是activemq,你须要手动在MySql中增长这个库。而后从新启动activeMQ,会发现activemq多了三张表:从配置中能够看出数据库的名称是activemq,须要手动在MySql中创建这个数据库。

1:activemq_acks

2:activemq_lock

3:activemq_msgs

Ø        点到点类型

Sender类:

package com.dxz.activemq.ex2;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Sender {
    private static final int SEND_NUMBER = 2000;

    public static void main(String[] args) {
        // ConnectionFactory :链接工厂,JMS用它建立链接
        ConnectionFactory connectionFactory;
        // Connection :JMS客户端到JMS Provider的链接
        Connection connection = null;
        // Session:一个发送或接收消息的线程
        Session session;
        // Destination :消息的目的地;消息发送给谁.
        Destination destination;
        // MessageProducer:消息发送者
        MessageProducer producer;
        // TextMessage message;
        // 构造ConnectionFactory实例对象,此处采用ActiveMq的实现
        connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");
        try {
            // 构造从工厂获得链接对象
            connection = connectionFactory.createConnection();
            // 启动
            connection.start();
            // 获取操做链接
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 获取session,FirstQueue是一个服务器的queue
            destination = session.createQueue("FirstQueue");
            // 获得消息生成者【发送者】
            producer = session.createProducer(destination);
            // 设置不持久化
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            // 构造消息
            sendMessage(session, producer);
            // session.commit();
            connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (null != connection) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    // TODO Auto-generatedcatch block
                    e.printStackTrace();
                }
            }
        }
    }

    public static void sendMessage(Session session, MessageProducer producer) throws Exception {
        for (int i = 1; i <= SEND_NUMBER; i++) {
            TextMessage message = session.createTextMessage("ActiveMQ发送消息" + i);
            System.out.println("发送消息:ActiveMQ发送的消息" + i);
            producer.send(message);
        }
    }
}
View Code

Receiver类:

package com.dxz.activemq.ex2;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Receiver {
    public static void main(String[] args) {
        // ConnectionFactory :链接工厂,JMS用它建立链接
        ConnectionFactory connectionFactory;
        // Connection :JMS客户端到JMS Provider的链接
        Connection connection = null;
        // Session:一个发送或接收消息的线程
        Session session;
        // Destination :消息的目的地;消息发送给谁.
        Destination destination;
        // 消费者,消息接收者
        MessageConsumer consumer;
        connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");
        try {
            // 获得链接对象
            connection = connectionFactory.createConnection();
            // 启动
            connection.start();
            // 获取操做链接
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 建立Queue
            destination = session.createQueue("FirstQueue");
            consumer = session.createConsumer(destination);
            while (true) {
                // 设置接收者接收消息的时间,为了便于测试,这里定为100s
                TextMessage message = (TextMessage) consumer.receive(100000);
                if (null != message) {
                    System.out.println("收到消息" + message.getText());
                } else
                    break;
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (null != connection)
                    connection.close();
            } catch (Throwable ignore) {
            }
        }
    }
}
View Code

测试一:测试:

A、 先运行Sender类,待运行完毕后,运行Receiver类

B、 在此过程当中activemq数据库的activemq_msgs表中没有数据

C、 再次运行Receiver,消费不到任何信息

测试二:

A、  先运行Sender类

B、 重启电脑

C、 运行Receiver类,无任何信息被消费

测试三:

A、   把Sender类中的producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);改成producer.setDeliveryMode(DeliveryMode.PERSISTENT);

B、   先运行Sender类,待运行完毕后,运行Receiver类

C、   在此过程当中activemq数据库的activemq_msgs表中有数据生成,运行完Receiver类后,数据清除

测试四:

A、    把Sender类中的producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);改成producer.setDeliveryMode(DeliveryMode.PERSISTENT);

B、    运行Sender类

C、    重启电脑

D、    运行Receiver类,有消息被消费

结论:   

经过以上测试,能够发现,在P2P类型中当DeliveryMode设置为NON_PERSISTENCE时,消息被保存在内存中,而当 DeliveryMode设置为PERSISTENCE时,消息保存在broker的相应的文件或者数据库中。并且P2P中消息一旦被Consumer消 费就从broker中删除。

Ø        发布/订阅类型

Sender类:

?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
import  javax.jms.Connection;
import  javax.jms.ConnectionFactory;
import  javax.jms.DeliveryMode;
import  javax.jms.Destination;
import  javax.jms.JMSException;
import  javax.jms.MessageProducer;
import  javax.jms.Session;
import  javax.jms.TextMessage;
import  javax.jms.Topic;
import  org.apache.activemq.ActiveMQConnection;
import  org.apache.activemq.ActiveMQConnectionFactory;
public  class  Sender {
      private  static  final  int  SEND_NUMBER =  100  ;
      public  static  void  main(String[] args) {
         // ConnectionFactory :链接工厂,JMS用它建立链接
         ConnectionFactory connectionFactory;
         // Connection :JMS客户端到JMS Provider的链接
         Connection connection =  null  ;
          // Session:一个发送或接收消息的线程
         Session session;
         // MessageProducer:消息发送者
         MessageProducer producer;
          // TextMessage message;
          // 构造ConnectionFactory实例对象,此处采用ActiveMq的实现
         connectionFactory =  new  ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER,
                 ActiveMQConnection.DEFAULT_PASSWORD,
                "tcp://localhost:61616"  );
         try  {
             //获得链接对象
             connection = connectionFactory.createConnection();
             //启动
             connection.start();
             //获取操做链接
             session = connection.createSession(  false  , Session.AUTO_ACKNOWLEDGE);       
             Topic topic = session.createTopic(  "MQ_test"  );      
             // 获得消息生成者【发送者】
             producer = session.createProducer(topic);
             //设置持久化
             producer.setDeliveryMode(DeliveryMode.PERSISTENT);
             //构造消息
             sendMessage(session, producer);
             //session.commit();
             connection.close();
         }
         catch  (Exception e){
             e.printStackTrace();
         }  finally  {
             if  (  null  != connection){
                try  {
                    connection.close();
                catch  (JMSException e) {
                    // TODO Auto-generatedcatch block
                    e.printStackTrace();
                }
             }   
         }
      }
      public  static  void  sendMessage(Session session, MessageProducer producer)  throws  Exception{
         for  (  int  i=  1  ; i<=SEND_NUMBER; i++){
             TextMessage message = session.createTextMessage(  "ActiveMQ发送消息"  +i);
             System.out.println(  "发送消息:ActiveMQ发送的消息"  +i);
             producer.send(message);
         }
      }
}

Receiver类:

?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import  javax.jms.Connection;
import  javax.jms.ConnectionFactory;
import  javax.jms.Destination;
import  javax.jms.MessageConsumer;
import  javax.jms.Session;
import  javax.jms.TextMessage;
import  javax.jms.Topic;
  
import  org.apache.activemq.ActiveMQConnection;
import  org.apache.activemq.ActiveMQConnectionFactory;
public  class  Receiver {
      public  static  void  main(String[] args) {
         // ConnectionFactory :链接工厂,JMS用它建立链接
          ConnectionFactory connectionFactory;
          // Connection :JMS客户端到JMS Provider的链接
          Connection connection =  null  ;
          // Session:一个发送或接收消息的线程
          Session session;
          // 消费者,消息接收者
          MessageConsumer consumer;
          connectionFactory = newActiveMQConnectionFactory(
                 ActiveMQConnection.DEFAULT_USER,
                  ActiveMQConnection.DEFAULT_PASSWORD,
                  "tcp://localhost:61616"  );
          try  {
              // 构造从工厂获得链接对象
              connection =connectionFactory.createConnection();
             
              connection.setClientID(  "clientID001"  );
              // 启动
              connection.start();
              // 获取操做链接
              session = connection.createSession(  false  ,
                      Session.AUTO_ACKNOWLEDGE);
              // 获取session
             Topic topic = session.createTopic(  "MQ_test"  );      
             // 获得消息生成者【发送者】
             consumer = session.createDurableSubscriber(topic,  "MQ_sub"  );
            
              while  (  true  ){
                //设置接收者接收消息的时间,为了便于测试,这里谁定为100s
                TextMessagemessage = (TextMessage)consumer.receive(  100000  );
                if  (  null  != message){
                   System.out.println(  "收到消息"  +message.getText());
                }  else  break  ;
              }
          }  catch  (Exception e){
          e.printStackTrace();
          }  finally  {
              try  {
                  if  (  null  != connection)
                      connection.close();
              catch  (Throwable ignore) {
              }
          }
      }
  
}

测试:

测试一:

A、先启动Sender类

B、再启动Receiver类

C、结果无任何记录被订阅

测试二:

A、先启动Receiver类,让Receiver在相关主题上进行订阅

B、中止Receiver类,再启动Sender类

C、待Sender类运行完成后,再启动Receiver类

D、结果发现相应主题的信息被订阅

相关文章
相关标签/搜索