import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * Publishes three text messages to the topic {@code MyTopic} within a single
 * transacted JMS session and commits them together.
 */
public class NonPersistenceSender {

    /**
     * Connects to the broker, sends the messages and commits the transaction.
     *
     * @param args unused
     * @throws JMSException if connecting, sending or committing fails
     */
    public static void main(String[] args) throws JMSException {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.128:61616");
        Connection connection = connectionFactory.createConnection();
        try {
            connection.start();
            // Transacted session: nothing is published until commit() is called.
            Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createTopic("MyTopic");
            MessageProducer producer = session.createProducer(destination);
            for (int i = 0; i < 3; i++) {
                TextMessage message = session.createTextMessage("message111--" + i);
                producer.send(message);
            }
            session.commit();
            session.close();
        } finally {
            // Always release the connection (and any session still open on it),
            // even when sending or committing fails.
            connection.close();
        }
    }
}
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ProducerInfo; import javax.jms.*; import java.io.IOException; public class NoPersistenceReceiver { public static void main(String[] args) throws JMSException { ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.25.128:61616"); Connection connection = factory.createConnection(); connection.start(); Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createTopic("ActiveMQ.Advisory.Producer.Topic.MyTopic"); MessageConsumer consumer = session.createConsumer(destination); Message message = consumer.receive(); while (message != null) { // TextMessage txtMsg = (TextMessage) message; // System.out.println("收到消息:" + txtMsg.getText()); // message = consumer.receive(1000L); if (message instanceof ActiveMQMessage) { try { ActiveMQMessage aMsg = (ActiveMQMessage) message; ProducerInfo prod = (ProducerInfo) aMsg.getDataStructure(); System.out.println("count===" + aMsg.getProperty("producerCount")); System.out.println(" prodd===" + prod.getProducerId()); } catch (IOException e) { e.printStackTrace(); } } } session.commit(); session.close(); connection.close(); } }
测试:
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ScheduledMessage;

import javax.jms.*;

/**
 * Publishes three text messages to the topic {@code MyTopic}, each tagged with
 * ActiveMQ scheduling properties (initial delay, repeat period, repeat count).
 */
public class NonPersistenceSender {

    /**
     * Connects to the broker, sends the scheduled messages and commits the transaction.
     *
     * @param args unused
     * @throws JMSException if connecting, sending or committing fails
     */
    public static void main(String[] args) throws JMSException {
        // Scheduling parameters, identical for every message sent below.
        final long delay = 3 * 1000;   // milliseconds before the first delivery
        final long period = 3 * 1000;  // milliseconds between repeated deliveries
        final int repeat = 5;          // number of repeated deliveries

        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.128:61616");
        Connection connection = connectionFactory.createConnection();
        try {
            connection.start();
            // Transacted session: nothing is published until commit() is called.
            Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createTopic("MyTopic");
            MessageProducer producer = session.createProducer(destination);
            for (int i = 0; i < 3; i++) {
                TextMessage message = session.createTextMessage("message111--" + i);
                // NOTE(review): these properties only take effect when the broker's
                // scheduler support is enabled - confirm the broker configuration.
                message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
                message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period);
                message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat);
                producer.send(message);
            }
            session.commit();
            session.close();
        } finally {
            // Always release the connection (and any session still open on it),
            // even when sending or committing fails.
            connection.close();
        }
    }
}
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * Subscribes to the topic {@code MyTopic} and prints the body of every text
 * message received.
 */
public class NoPersistenceReceiver {

    /**
     * Connects to the broker and prints messages until {@code receive()}
     * returns {@code null}.
     *
     * @param args unused
     * @throws JMSException if connecting or receiving fails
     */
    public static void main(String[] args) throws JMSException {
        ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.25.128:61616");
        Connection connection = factory.createConnection();
        try {
            connection.start();
            Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createTopic("MyTopic");
            MessageConsumer consumer = session.createConsumer(destination);
            // receive() without a timeout blocks until a message arrives, so the
            // consumer stays alive for deliveries that arrive later.
            Message message = consumer.receive();
            while (message != null) {
                if (message instanceof TextMessage) {
                    TextMessage txtMsg = (TextMessage) message;
                    System.out.println("收到消息:" + txtMsg.getText());
                } else {
                    // Do not crash on an unexpected message type; report and continue.
                    System.err.println("Ignoring non-text message: " + message);
                }
                message = consumer.receive();
            }
            session.commit();
            session.close();
        } finally {
            // Release the connection even when receiving fails.
            connection.close();
        }
    }
}
先启动消费者,后启动生产者。
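注意:如果改用 message = consumer.receive(1000L);,当 1 秒内没有新消息到达时,receive 会返回 null,循环随即结束,消费者关闭。由于 period 为 3 秒,消费者通常只收到第一轮投递就退出了,之后按 period 和 repeat 重复投递的消息不会再被接收,看起来就像 period 和 repeat 没有起作用。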