修改 activemq.xml 配置文件java
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="zk_cluster_nodes" dataDirectory="${activemq.data}" schedulerSupport="true">
该配置的 brokerName 必须一致。node
将如下配置spring
<persistenceAdapter> <kahaDB directory="${activemq.base}/data/kahadb" /> </persistenceAdapter>
改成:apache
<persistenceAdapter> <replicatedLevelDB directory="${activemq.data}/leveldb" replicas="3" bind="tcp://0.0.0.0:0" zkAddress="192.168.1.2:2181,192.168.1.3:2181,192.168.1.4:2181" zkPassword="" hostname="192.168.10.2" sync="local_disk" zkPath="/activemq/leveldb-stores" /> </persistenceAdapter>
依次启动3个节点,若出现错误:浏览器
INFO | No IOExceptionHandler registered, ignoring IO exception java.io.IOException: com.google.common.base.Objects.firstNonNull(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object; at org.apache.activemq.util.IOExceptionSupport.create(IOExceptionSupport.java:39)[activemq-client-5.10.0.jar:5.10.0] at org.apache.activemq.leveldb.LevelDBClient.might_fail(LevelDBClient.scala:552)[activemq-leveldb-store-5.10.0.jar:5.10.0] at org.apache.activemq.leveldb.LevelDBClient.replay_init(LevelDBClient.scala:657)[activemq-leveldb-store-5.10.0.jar:5.10.0] at org.apache.activemq.leveldb.LevelDBClient.start(LevelDBClient.scala:558)[activemq-leveldb-store-5.10.0.jar:5.10.0] at org.apache.activemq.leveldb.DBManager.start(DBManager.scala:648)[activemq-leveldb-store-5.10.0.jar:5.10.0] at org.apache.activemq.leveldb.LevelDBStore.doStart(LevelDBStore.scala:235)[activemq-leveldb-store-5.10.0.jar:5.10.0] at org.apache.activemq.leveldb.replicated.MasterLevelDBStore.doStart(MasterLevelDBStore.scala:110)[activemq-leveldb-store-5.10.0.jar:5.10.0] at org.apache.activemq.util.ServiceSupport.start(ServiceSupport.java:55)[activemq-client-5.10.0.jar:5.10.0] at org.apache.activemq.leveldb.replicated.ElectingLevelDBStore$$anonfun$start_master$1.apply$mcV$sp(ElectingLevelDBStore.scala:226)[activemq-leveldb-store-5.10.0.jar:5.10.0] at org.fusesource.hawtdispatch.package$$anon$4.run(hawtdispatch.scala:330)[hawtdispatch-scala-2.11-1.21.jar:1.21] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)[:1.7.0_79] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)[:1.7.0_79] at java.lang.Thread.run(Thread.java:745)[:1.7.0_79] INFO | Stopped LevelDB[/home/local/activemq-5.10.0-3/data/leveldb]
先删除 activemq 中 lib 目录下 pax-url-aether-1.5.2.jar 文件,并将 activemq.xml 中该配置注释服务器
<bean id="logQuery" class="org.fusesource.insight.log.log4j.Log4jLogQuery" lazy-init="false" scope="singleton" init-method="start" destroy-method="stop"> </bean>
3. 配置 springsession
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="failover:(tcp://192.168.1.2:61616,tcp://192.168.1.3:61626,tcp://192.168.1.4:61636)"></property> <property name="redeliveryPolicy"> <bean id="redeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy"> <property name="maximumRedeliveries" value="5"/> <property name="initialRedeliveryDelay" value="100"/> </bean> </property> </bean> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="connectionFactory"/> <property name="pubSubDomain" value="false"/> </bean>
4. 测试发送消息app
ApplicationContext app = new ClassPathXmlApplicationContext("broker.xml"); String queueName = "UUU9"; final String message = "bbbb"; JmsTemplate jmsTemplate = app.getBean(JmsTemplate.class); jmsTemplate.setDeliveryMode(1); for (int i = 0; i < 10; i++) { jmsTemplate.send(queueName, new MessageCreator() { public Message createMessage(Session session) throws JMSException { TextMessage textMessage = session.createTextMessage(message); textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 6000L); return textMessage; } }); Thread.sleep(200L); }
而后,在浏览器地址栏里输入:负载均衡
http://192.168.1.2:8161/admin/queues.jsp
jsp
http://192.168.1.3:8161/admin/queues.jsp
http://192.168.1.4:8161/admin/queues.jsp
由于使用zookeeper作负载均衡,三台只有一台是master,其余两台处于等待状态,因此只有其中一台提供服务,但一旦这台服务器宕机之后,会有另一台顶替上来,因此其余几个ip地址是打不开的,只有一台能打开