非持久的Topic消息示例
服务器
对于非持久的Topic消息的发送
基本跟前面发送队列信息是同样的,只是把建立Destination的地方,由建立队列替换成建立Topic,例如:session
Destination destination = session.createTopic("MyTopic");
对于非持久的Topic消息的接收
1:必需要接收方在线,而后客户端再发送信息,接收方才能接收到消息
2:一样把建立Destination的地方,由建立队列替换成建立Topic,例如:tcp
Destination destination = session.createTopic("MyTopic");
3:因为不知道客户端发送多少信息,所以改为while循环的方式了,例如:spa
Message message = consumer.receive(); while(message!=null) { TextMessage txtMsg = (TextMessage)message; System.out.println("收到消 息:" + txtMsg.getText()); message = consumer.receive(1000L); }
消息的生产者:线程
public class NoPersistenceSender { //默认链接用户名 private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; //默认链接密码 private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; //默认链接地址 private static final String BROKEURL = "tcp://192.168.0.129:61616"; //发送的消息数量 private static final int SENDNUM = 10; public static void main(String[] args) { //链接工厂 ConnectionFactory connectionFactory; //链接 Connection connection = null; //会话 接受或者发送消息的线程 Session session; //消息的目的地 Destination destination; //消息生产者 MessageProducer messageProducer; //实例化链接工厂(链接到ActiveMQ服务器) connectionFactory = new ActiveMQConnectionFactory(NoPersistenceSender.USERNAME, NoPersistenceSender.PASSWORD, NoPersistenceSender.BROKEURL); try { //经过链接工厂获取链接 connection = connectionFactory.createConnection(); //启动链接 connection.start(); //建立session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); //建立一个名称为MyTopic的消息队列(生产者生成的消息放在哪) destination = session.createTopic("MyTopic"); //建立消息生产者 messageProducer = session.createProducer(destination); //发送消息 sendMessage(session, messageProducer); session.commit(); } catch (Exception e) { e.printStackTrace(); } finally { if (connection != null) { try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } /** * 发送消息 * * @param session * @param messageProducer 消息生产者 * @throws Exception */ public static void sendMessage(Session session, MessageProducer messageProducer) throws Exception { for (int i = 0; i < NoPersistenceSender.SENDNUM; i++) { //建立一条文本消息 TextMessage message = session.createTextMessage("ActiveMQ 发送消息" + i); System.out.println("发送消息:Activemq 发送消息" + i); //经过消息生产者发出消息 messageProducer.send(message); } } }
消息的消费者:code
public class NoPersistenceReceiver { //默认链接用户名 private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; //默认链接密码 private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; //默认链接地址 private static final String BROKEURL = "tcp://192.168.0.129:61616"; public static void main(String[] args) { ConnectionFactory connectionFactory;//链接工厂 Connection connection = null;//链接 Session session;//会话 接受或者发送消息的线程 Destination destination;//消息的目的地 MessageConsumer messageConsumer;//消息的消费者 //实例化链接工厂(链接到ActiveMQ服务器) connectionFactory = new ActiveMQConnectionFactory(NoPersistenceReceiver.USERNAME, NoPersistenceReceiver.PASSWORD, NoPersistenceReceiver.BROKEURL); try { //经过链接工厂获取链接 connection = connectionFactory.createConnection(); //启动链接 connection.start(); //建立session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); //生产者将消息发送到MyTopic,因此消费者要到MyTopic去取 destination = session.createTopic("MyTopic"); //建立消息消费者 messageConsumer = session.createConsumer(destination); //Message message = messageConsumer.receive(); //while (message != null) { //TextMessage txtMsg = (TextMessage) message; //System.out.println("收到消息:" + txtMsg.getText()); //message = messageConsumer.receive(1000L); //session.commit(); //} Message message = messageConsumer.receive(); while (message != null) { TextMessage txtMsg = (TextMessage) message; System.out.println("收到消 息:" + txtMsg.getText()); //没这句有错 message = messageConsumer.receive(1000L); } session.commit(); session.close(); connection.close(); } catch (JMSException e) { e.printStackTrace(); } } }
首先运行运行生产者(消费者处于费运行状态),而后运行消费者:中间件
此时再次运行一下生产者(消费者处于开启状态)blog
结论:必需要接收方在线,而后客户端再发送信息,接收方才能接收到消息
队列
持久的Topic消息示例
get
对于持久的Topic消息的发送
1:要用持久化订阅,发送消息者要用 DeliveryMode.PERSISTENT 模式发现,在链接以前设定
2:必定要设置完成后,再start 这个 connection
对于持久的Topic消息的接收
1:须要在链接上设置消费者id,用来识别消费者
2:须要建立TopicSubscriber来订阅
3:要设置好了事后再start 这个 connection
4:必定要先运行一次,等于向消息服务中间件注册这个消费者,而后再运行客户端发送信息,这个时候,
不管消费者是否在线,都会接收到,不在线的话,下次链接的时候,会把没有收过的消息都接收下来
生产者:
public class PersistenceSender { //默认链接用户名 private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; //默认链接密码 private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; //默认链接地址 private static final String BROKEURL = "tcp://192.168.0.129:61616"; //发送的消息数量 private static final int SENDNUM = 10; public static void main(String[] args) { //链接工厂 ConnectionFactory connectionFactory; //链接 Connection connection = null; //会话 接受或者发送消息的线程 Session session; //消息的目的地 Destination destination; //消息生产者 MessageProducer messageProducer; //实例化链接工厂(链接到ActiveMQ服务器) connectionFactory = new ActiveMQConnectionFactory(PersistenceSender.USERNAME, PersistenceSender.PASSWORD, PersistenceSender.BROKEURL); try { //经过链接工厂获取链接 connection = connectionFactory.createConnection(); //建立session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); //建立一个名称为MyTopic的消息队列(生产者生成的消息放在哪) destination = session.createTopic("MyTopic1"); //建立消息生产者 messageProducer = session.createProducer(destination); messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT); //启动链接 connection.start(); //发送消息 sendMessage(session, messageProducer); session.commit(); } catch (Exception e) { e.printStackTrace(); } finally { if (connection != null) { try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } /** * 发送消息 * * @param session * @param messageProducer 消息生产者 * @throws Exception */ public static void sendMessage(Session session, MessageProducer messageProducer) throws Exception { for (int i = 0; i < PersistenceSender.SENDNUM; i++) { //建立一条文本消息 TextMessage message = session.createTextMessage("ActiveMQ 发送消息" + i); System.out.println("发送消息:Activemq 发送消息" + i); //经过消息生产者发出消息 messageProducer.send(message); } } }
消费者:
public class PersistenceReceiver { //默认链接用户名 private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; //默认链接密码 private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; //默认链接地址 private static final String BROKEURL = "tcp://192.168.0.129:61616"; public static void main(String[] args) { ConnectionFactory connectionFactory;//链接工厂 Connection connection = null;//链接 Session session;//会话 接受或者发送消息的线程 Topic topic;//消息的目的地 //实例化链接工厂(链接到ActiveMQ服务器) connectionFactory = new ActiveMQConnectionFactory(PersistenceReceiver.USERNAME, PersistenceReceiver.PASSWORD, PersistenceReceiver.BROKEURL); try { //经过链接工厂获取链接 connection = connectionFactory.createConnection(); connection.setClientID("winner_0715"); //建立session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); //生产者将消息发送到MyTopic,因此消费者要到MyTopic去取 topic = session.createTopic("MyTopic1"); //建立消息消费者 TopicSubscriber consumer = session.createDurableSubscriber(topic, "t1"); //启动链接 connection.start(); Message message = consumer.receive(); while (message != null) { TextMessage txtMsg = (TextMessage) message; System.out.println("收到消 息:" + txtMsg.getText()); //没这句有错 message = consumer.receive(1000L); } session.commit(); session.close(); connection.close(); } catch (JMSException e) { e.printStackTrace(); } } }
消费者须要先运行一次,注册~
由于是持久消息,因此还会有别的订阅者,因此是0
关于持久化和非持久化消息
持久化消息
这是 ActiveMQ 的默认传送模式,此模式保证这些消息只被传送一次和成 功使用一次。对于这些消息,可靠性是优先考虑的因素。可靠性的另外一个重要方面是确保持久性消
息传送至目标后,消息服务在向消费者传送它们以前不会丢失这些消息。这意味着在持久性消息传送至目标时,消息服务将其放入持久性数据存储。若是消息服务因为某种缘由致使失败,它能够恢复此消息并将此消息传送至相应的消费者。虽然这样增长了消息传送的开销,但却增长了可靠性。
非持久化消息
保证这些消息最多被传送一次。对于这些消息,可靠性并不是主要的考虑因素。 此模式并不要求持久性的数据存储,也不保证消息服务因为某种缘由致使失败后消息不会丢失。 有两种方法指定传送模式:
1.使用setDeliveryMode 方法,这样全部的消息都采用此传送模式; 如:
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
2.使用send 方法为每一条消息设置传送模式