这里的批次,就好比是分页。
测试:
import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class NonPersistenceSender { public static void main(String[] args) throws JMSException { ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.128:61616"); Connection connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createTopic("MyTopic"); MessageProducer producer = session.createProducer(destination); for (int i = 0; i < 3; i++) { TextMessage message = session.createTextMessage("message111--" + i); producer.send(message); } session.commit(); session.close(); connection.close(); } }
import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class NoPerReceiver2 { public static void main(String[] args) throws JMSException { ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.25.128:61616"); Connection connection = factory.createConnection(); connection.start(); Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createTopic("MyTopic"); for (int i = 0; i < 2; i++) { final MessageConsumer consumer = session.createConsumer(destination); consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { TextMessage txtMsg = (TextMessage) message; try { System.out.println(consumer + "收到消息:" + txtMsg.getText()); session.commit(); } catch (JMSException e) { e.printStackTrace(); } } }); } } }
先启动消费者,后启动生产者。
可以看出,多个消费者消费同一个生产者的消息时,是按照顺序消费的。
修改一下消费者
import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class NoPerReceiver2 { public static void main(String[] args) throws JMSException { ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.25.128:61616"); for (int i = 0; i < 2; i++) { Thread t = new MyT(factory); t.start(); } } } class MyT extends Thread { private ConnectionFactory connectionFactory = null; public MyT(ConnectionFactory connectionFactory) { this.connectionFactory = connectionFactory; } @Override public void run() { try { Connection connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createTopic("MyTopic"); for (int i = 0; i < 2; i++) { final MessageConsumer consumer = session.createConsumer(destination); consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { TextMessage txtMsg = (TextMessage) message; try { System.out.println(consumer + "收到消息:" + txtMsg.getText()); session.commit(); } catch (JMSException e) { e.printStackTrace(); } } }); } } catch (JMSException e) { e.printStackTrace(); } } }
先运行消费者,后运行生产者
这就演示了不同的topic consumer可能会以不同的顺序接收来自不同producer的消息。