ActiveMQ


JMS是J2EE体系标准的一部分,规定了应用之间同步、异步进行消息发送、接受的相关规范。ActiveMQ是实现了JMS标准的消息服务器。html


ActiveMQ的几个重要概念:
java

1、client:mysql

    ActiveMQ的client包括producer和consumer两类,从名字可看出,producer是产生消息的消息生产者,consumer是接收消息的消息消费者,web

现实中producer和consumer是两个应用程序,它们之间经过ActiveMQ进行通讯。sql

消息生产者:消息生产者是由会话建立的一个对象,用于把消息发送到一个目的地。
消息消费者:消息消费者是由会话建立的一个对象,它用于接收发送到目的地的消息。
数据库

消息的消费能够采用如下两种方法之一:
同步消费。经过调用消费者的receive方法从目的地中显式提取消息。receive方法能够一直阻塞到消息到达。
异步消费。客户能够为消费者注册一个消息监听器,以定义在消息到达时所采起的动做。
apache


JMS消息只有在被确认以后,才认为已经被成功地消费了。服务器

消息的成功消费一般包含三个阶段:客户接收消息、客户处理消息和消息被确认。
在事务性会话中,当一个事务被提交的时候,确认自动发生。
oracle


在非事务性会话中,消息什么时候被确认取决于建立会话时的应答模式(acknowledgement mode)。异步

该参数有如下三个可选值:
Session.AUTO_ACKNOWLEDGE。当客户成功的从receive方法返回的时候,或者从MessageListener.onMessage方法成功返回的时候,会话自动确认客户收到的消息。
Session.CLIENT_ACKNOWLEDGE。客户经过消息的acknowledge方法确认消息。须要注意的是,在这种模式中,确认是在会话层上进行:

确认一个被消费的消息将自动确认全部已被会话消费的消息。例如,若是一个消息消费者消费了10个消息,而后确认第5个消息,那么全部10个消息都被确认。
Session.DUPS_ACKNOWLEDGE。JMS provider失败,那么可能会致使一些重复的消息。

若是是重复的消息,那么JMS provider必须把消息头的JMSRedelivered字段设置为true。


2、destination:发送消息的目标,接收消息的来源。

消息分为Queue和Topic两种:

Queue是点对点消费,发送者发送一条消息,只有一个且惟一的一个消费者能对其进行消费。

Topic是订阅式消费,一个消息能够被不少的订阅者消费,其中定阅者又分为持久化订阅和非持久化订阅。

持久化订阅是指即便订阅者当前不在线,其订阅以后,发送方发到Broker的消息,也会在持久化订阅者再次上线的时候完成消费,不会丢失消息。

而非持久化订阅者,只有订阅者在线时才会消费,不在线时,即便Broker收到新的消息,当其再次上线时,也不会收到错过的消息。


3、持久化:

    ActiveMQ支持持久化,能够将接收到的消息保存到数据库中,就算ActiveMQ重启,也照样能将还没有派发的消息发送出去。

    ActiveMQ的持久化机制,对于Queue类型的消息,将存储在Broker,可是一旦其中一个消费者完成消费,则当即删除这条消息。

对于Topic类型的消息,即便全部的订阅者都完成了消费,Broker也不必定会立刻删除无用消息,而是保留推送历史,以后会异步清除无用消息。

而每一个订阅者消费到了哪条消息的offset会记录在Broker,以避免下次重复消费。由于消息是顺序消费,先进先出,因此只须要记录上次消息消费到哪里就能够了。

配置持久化的方式,都是修改%ACTIVEMQ_HOME%conf/acticvemq.xml文件。

ActiveMQ持久化消息的三种方式:
介绍三种方式,分别是持久化为文件,MYSql,Oracle。
A:持久化为文件
这个你装ActiveMQ时默认就是这种,只要你设置消息为持久化就能够了。涉及到的配置和代码有

<persistenceAdapter>
    <kahaDB directory="$ {activemq.base}/data/kahadb"/>
</persistenceAdapter>
 
producer.Send(request, MsgDeliveryMode.Persistent, level, TimeSpan.MinValue);

B:持久化为MySql
    你首先须要把MySql的驱动放到ActiveMQ的Lib目录下,如mysql-connector-java-5.0.4-bin.jar, 若是用到其余的三方包,像c3p0(开源的JDBC链接池,它实现了数据源和JNDI绑定),须要把对应的包放进来。
接下来你修改配置文件 

<bean>
   <persistenceAdapter>
       <jdbcPersistenceAdapter dataDirectory="${activemq.base}/data" dataSource="#derby-ds"/>
  </persistenceAdapter>
 
   <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
   <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
   <property name="username" value="activemq"/>
   <property name="password" value="activemq"/>
   <property name="maxActive" value="200"/>
   <property name="poolPreparedStatements" value="true"/>
</bean>

从配置中能够看出数据库的名称是activemq,你须要手动在MySql中增长这个库。
 
而后从新启动消息队列,你会发现多了3张表
 
1:activemq_acks用于存储订阅关系。若是是持久化Topic,订阅者和服务器的订阅关系在这个表保存。

主要的数据库字段以下:

CONTAINER:消息的Destination

SUB_DEST:若是是使用Static集群,这个字段会有集群其余系统的信息

CLIENT_ID:每一个订阅者都必须有一个惟一的客户端ID用以区分

SUB_NAME:订阅者名称

SELECTOR:选择器,能够选择只消费知足条件的消息。条件能够用自定义属性实现,可支持多属性AND和OR操做

LAST_ACKED_ID:记录消费过的消息的ID。
2: activemq_lock在集群环境中才有用,只有一个Broker能够得到消息,称为Master Broker,其余的只能做为备份等待Master Broker不可用,才可能成为下一个Master Broker。这个表用于记录哪一个Broker是当前的Master Broker。

配置以下:

<beans>
    <broker brokerName="test-broker" persistent="true" xmlns="http://activemq.apache.org/schema/core">
        <persistenceAdapter>
            <jdbcPersistenceAdapter dataSource="#mysql-ds"/> 
        </persistenceAdapter>
    </broker>
    <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
        <property name="driverClassName" value="com.mysql.jdbc.Driver"/> 
        <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/> 
        <property name="username" value="activemq"/> 
        <property name="password" value="activemq"/> 
        <property name="maxActive" value="200"/> 
        <property name="poolPreparedStatements" value="true"/> 
    </bean>
</beans>

首先定义一个mysql-ds的MySQL数据源,而后在persistenceAdapter节点中配置jdbcPersistenceAdapter而且引用刚才定义的数据源。

3:activemq_msgs用于存储消息,Queue和Topic都存储在这个表中。

主要的数据库字段以下:

ID:自增的数据库主键

CONTAINER:消息的Destination

MSGID_PROD:消息发送者客户端的主键

MSG_SEQ:是发送消息的顺序,MSGID_PROD+MSG_SEQ能够组成JMS的MessageID

EXPIRATION:消息的过时时间,存储的是从1970-01-01到如今的毫秒数

MSG:消息本体的Java序列化对象的二进制数据

PRIORITY:优先级,从0-9,数值越大优先级越高

activemq_acks用于存储订阅关系。若是是持久化Topic,订阅者和服务器的订阅关系在这个表保存。
C:持久化为Oracle, 和持久化为MySql同样。需注意两点

1,在ActiveMQ安装文件夹里的Lib文件夹中增长Oracle的JDBC驱动。驱动文件位于Oracle客户端安装文件中的product\11.1.0\client_1\jdbc\lib文件夹下。
2,
<span style="font-size:12px;"><bean id="derby-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
   <property name="driverClassName" value="oracle.jdbc.driver.OracleDriver"/>
   <property name="url" value="jdbc:oracle:thin:@10.53.132.47:1521:cmfudv1"/>
   <property name="username" value="qdcommu"/>
   <property name="password" value="qdcommu"/>
   <property name="maxActive" value="200"/>
   <property name="poolPreparedStatements" value="true"/>
</bean></span>
这里的jdbc:oracle:thin:@10.53.132.47:1521:cmfudv1按照本身实际状况设置一下就能够了,特别注意的是cmfudv1是SID即服务名称而不是TNS中配置的节点名。只须要替换IP,端口和这个SID就能够了。

4、异步:

    ActiveMQ最大的特色就是异步,这也是和webservice最大的差异,发送者只需将消息发送给ActiveMQ,剩下的事交给ActiveMQ就行,发送者不用关心。

   

本地事务
      在一个JMS客户端,可使用本地事务来组合消息的发送和接收。JMS Session接口提供了commit和rollback方法。事务提交意味着生产的全部消息被发送,消费的全部消息被确认;事务回滚意味着生产的全部消息被销毁,消费的全部消息被恢复并从新提交,除非它们已通过期。
事务性的会话老是牵涉到事务处理中,commit或rollback方法一旦被调用,一个事务就结束了,而另外一个事务被开始。

关闭事务性会话将回滚其中的事务。
须要注意的是:

1. 若是使用请求/回复机制,即发送一个消息,同时但愿在同一个事务中等待接收该消息的回复,那么程序将被挂起,由于知道事务提交,发送操做才会真正执行。 2. 须要注意的还有一个,消息的生产和消费不能包含在同一个事务中。