ActiveMQ支持的虚拟Destinations分为有两种,分别是
1.虚拟主题(Virtual Topics)
2.组合 Destinations(CompositeDestinations)java
这两种虚拟Destinations能够看作对简单的topic和queue用法的补充,基于它们能够实现一些简单有用的EIP功能,虚拟主题相似于1对多的分支功能+消费端的cluster+failover,组合Destinations相似于简单的destinations直接的路由功能。web
虚拟主题(Virtual Topics)
ActiveMQ中,topic只有在持久订阅(durablesubscription)下是持久化的。存在持久订阅时,每一个持久订阅者,都至关于一个持久化的queue的客户端,它会收取全部消息。这种状况下存在两个问题:
1.同一应用内consumer端负载均衡的问题:同一个应用上的一个持久订阅不能使用多个consumer来共同承担消息处理功能。由于每一个都会获取全部消息。queue模式能够解决这个问题,broker端又不能将消息发送到多个应用端。因此,既要发布订阅,又要让消费者分组,这个功能jms规范自己是没有的。
2.同一应用内consumer端failover的问题:因为只能使用单个的持久订阅者,若是这个订阅者出错,则应用就没法处理消息了,系统的健壮性不高。
为了解决这两个问题,ActiveMQ中实现了虚拟Topic的功能。使用起来很是简单。
对于消息发布者来讲,就是一个正常的Topic,名称以VirtualTopic.开头。例如VirtualTopic.TEST。
对于消息接收端来讲,是个队列,不一样应用里使用不一样的前缀做为队列的名称,便可代表本身的身份便可实现消费端应用分组。例如Consumer.A.VirtualTopic.TEST,说明它是名称为A的消费端,同理Consumer.B.VirtualTopic.TEST说明是一个名称为B的客户端。能够在同一个应用里使用多个consumer消费此queue,则能够实现上面两个功能。又由于不一样应用使用的queue名称不一样(前缀不一样),因此不一样的应用中均可以接收到所有的消息。每一个客户端至关于一个持久订阅者,并且这个客户端可使用多个消费者共同来承担消费任务。apache
生产者:
服务器
import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import org.apache.activemq.ActiveMQConnectionFactory; public class Producer { public static void main(String[] args) throws JMSException { // 链接到ActiveMQ服务器 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.18.67:61616"); Connection connection = factory.createConnection(); connection.start(); Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 建立主题 Topic topic = session.createTopic("VirtualTopic.TEST"); MessageProducer producer = session.createProducer(topic); // NON_PERSISTENT 非持久化 PERSISTENT 持久化,发送消息时用使用持久模式 producer.setDeliveryMode(DeliveryMode.PERSISTENT); TextMessage message = session.createTextMessage(); message.setText("topic 消息。"); message.setStringProperty("property", "消息Property"); // 发布主题消息 producer.send(message); System.out.println("Sent message: " + message.getText()); session.close(); connection.close(); } }
消费者:session
import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class Consumer { public static void main(String[] args) throws JMSException, InterruptedException { // 链接到ActiveMQ服务器 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.18.67:61616"); Connection connection = factory.createConnection(); connection.start(); Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 建立主题 Queue topicA = session.createQueue("Consumer.A.VirtualTopic.TEST"); Queue topicB = session.createQueue("Consumer.B.VirtualTopic.TEST"); // 消费者A组建立订阅 MessageConsumer consumerA1 = session.createConsumer(topicA); consumerA1.setMessageListener(new MessageListener() { // 订阅接收方法 public void onMessage(Message message) { TextMessage tm = (TextMessage) message; try { System.out.println("Received message A1: " + tm.getText()+":"+tm.getStringProperty("property")); } catch (JMSException e) { e.printStackTrace(); } } }); MessageConsumer consumerA2 = session.createConsumer(topicA); consumerA2.setMessageListener(new MessageListener() { // 订阅接收方法 public void onMessage(Message message) { TextMessage tm = (TextMessage) message; try { System.out.println("Received message A2: " + tm.getText()+":"+tm.getStringProperty("property")); } catch (JMSException e) { e.printStackTrace(); } } }); //消费者B组建立订阅 MessageConsumer consumerB1 = session.createConsumer(topicB); consumerB1.setMessageListener(new MessageListener() { // 订阅接收方法 public void onMessage(Message message) { TextMessage tm = (TextMessage) message; try { System.out.println("Received message B1: " + tm.getText()+":"+tm.getStringProperty("property")); } catch (JMSException e) { e.printStackTrace(); } } }); MessageConsumer consumerB2 = session.createConsumer(topicB); consumerB2.setMessageListener(new MessageListener() { // 订阅接收方法 public void onMessage(Message message) { TextMessage tm = (TextMessage) message; try { System.out.println("Received message B2: " + tm.getText()+":"+tm.getStringProperty("property")); } catch (JMSException e) { e.printStackTrace(); } } }); session.close(); connection.close(); } }
组合列队Composite Destinations负载均衡
组合队列容许用一个虚拟的destination表明多个destinations。这样就能够经过composite destinations在一个操做中同时向多个queue发送消息。异步
客户端实现的方式tcp
在composite destinations中,多个destination之间采用“,”分割。例如:fetch
Queue queue = new ActiveMQQueue("FOO.A,FOO.B,FOO.C"); 或 Destination destination = session.createQueue("my-queue,my-queue2");
若是你但愿使用不一样类型的destination,那么须要加上前缀如queue:// 或topic://,例如:spa
Queue queue = new ActiveMQQueue("FOO.A,topic://NOTIFY.FOO.A");
在conf/activemq.xml中的broker下配置实现
<destinationInterceptors> <virtualDestinationInterceptor> <virtualDestinations> <compositeQueue name="MY.QUEUE"> <forwardTo> <queue physicalName="my-queue" /> <queue physicalName="my-queue2" /> </forwardTo> </compositeQueue> </virtualDestinations> </virtualDestinationInterceptor> </destinationInterceptors>
再java代码发送的时候,队列的的名字就用MY.QUEUQ
Configure Startup Destinations
若是须要在ActiveMQ启动的时候,建立Destination的话,能够以下配置conf/activemq.xml的broker下:
<destinations> <queue physicalName="FOO.BAR" /> <topic physicalName="SOME.TOPIC" /> </destinations>
Delete Inactive Destinations
通常状况下,ActiveMQ的queue在不使用以后,能够经过web控制台或是JMX方式来删除掉。固然,也能够经过配置,使得broker能够自动探测到无用
的队列(必定时间内为空的队列)并删除掉,回收响应资源。能够以下配置conf/activemq.xml:
<broker schedulePeriodForDestinationPurge="10000"> <destinationPolicy> <policyMap> <policyEntries> <policyEntry queue=">" gcInactiveDestinations="true" inactiveTimoutBeforeGC="30000"/> </policyEntries> </policyMap> </destinationPolicy> </broker>
说明:
schedulePeriodForDestinationPurge:设置多长时间检查一次,这里是10秒,默认为0
inactiveTimoutBeforeGC:设置当Destination为空后,多长时间被删除,这里是30秒,默认为60
gcInactiveDestinations: 设置删除掉不活动队列,默认为false
Destination Options
队列选项是给consumer在JMS规范以外添加的功能特性,经过在队列名称后面使用相似URL的语法添加多个选项。包括:
1:consumer.prefetchSize,consumer持有的未确认最大消息数量,默认值 variable
2:consumer.maximumPendingMessageLimit:用来控制非持久化的topic在存在慢消费者的状况下,丢弃的数量,默认0
3:consumer.noLocal :默认false
4:consumer.dispatchAsync :是否异步分发 ,默认true
5:consumer.retroactive:是否为回溯消费者 ,默认false
6:consumer.selector:Jms的Selector,默认null
7:consumer.exclusive:是否为独占消费者 ,默认false
8:consumer.priority:设置消费者的优先级,默认0
使用示例:
queue = new ActiveMQQueue("TEST.QUEUE?consumer.dispatchAsync=false&consumer.prefetchSize=10"); consumer = session.createConsumer(queue);