测试:
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQMessageProducer; import javax.jms.*; public class QueueGroupSender { public static void main(String[] args) throws JMSException, InterruptedException { ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.128:61616"); Connection connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue("my-queue"); ActiveMQMessageProducer producer = (ActiveMQMessageProducer) session.createProducer(destination); for (int i = 0; i < 3; i++) { TextMessage message = session.createTextMessage("message---" + i); message.setStringProperty("JMSXGroupID", "GroupA"); producer.send(message); } session.commit(); session.close(); connection.close(); } }
import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class QueueGroupReceiver { public static void main(String[] args) throws JMSException { ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( "tcp://192.168.25.128:61616"); Connection connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue("my-queue"); for (int i = 0; i < 2; i++) { final MessageConsumer consumer = session.createConsumer(destination); consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { TextMessage txtMsg = (TextMessage) message; try { System.out.println(consumer + "收到消息:" + txtMsg.getText()); session.commit(); } catch (JMSException e) { e.printStackTrace(); } } }); } } }
先运行消费者,后运行生产者。
虽然,有两个消费者线程。但是,3条消息都让一个消费者线程消费掉了。
增加一个group
继续运行生产者。
两个消费者线程,别接收各自组的消息。