ActiveMQ--(pub/sub)模型

发布订阅模式是一对多的方式。就是消息提供者提供一个Topic,可以由多个消息消费者订阅。

下图是发布订阅模式的示意图:

        订阅模型可以分为非持久订阅和持久订阅。当所有的消息必须接受的时候,则需要用到持久订阅,反之,则用非持久订阅 

非持久订阅:

        消费者只能消费提供者在订阅操作之后时提供的消息。也就是说消费者启动后,提供者再提供消息,消费者能接受此时发送的消息,但之前提供的消息不能订阅。

消息消费者及提供者设置:

Destination destination = session.createTopic("first-topic");

 以下案例:

设置两个消费者:JmsTopicReceiver JmsTopicReceiver2

设置一个提供者:JmsTopicSender

先启动两个消费者,此时处于阻塞状态,再启动消息提供者,两消费者接受消息。

消息提供者(在消费者启动后再运行)

public class JmsTopicSender {
    public static void main(String[] args) {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.70.66:61616");
        Connection connection=null;
        try {
            connection = connectionFactory.createConnection();
            connection.start();
            Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createTopic("first-topic");
            MessageProducer producer = session.createProducer(destination);
            TextMessage textMessage = session.createTextMessage("今天天气很好啊");
            producer.send(textMessage);
            session.commit();
            session.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }finally {
            if (connection!=null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }

}

 

消息消费者1:

 

 

public class JmsTopicReceiver {
    public static void main(String[] args) {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.70.66:61616");
        Connection connection=null;
        try {
            connection=connectionFactory.createConnection();
            connection.start();
            Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
             Destination destination= session.createTopic("first-topic");
            MessageConsumer consumer = session.createConsumer(destination);
            TextMessage textMessage=   (TextMessage)consumer.receive();
            System.out.println("消费者1:"+textMessage.getText());
            session.commit();
            session.close();

        } catch (JMSException e) {
            e.printStackTrace();
        }finally {
            if (connection!=null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }

    }
}

 消息消费者2

public class JmsTopicReceiver2 {
    public static void main(String[] args) {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.70.66:61616");
        Connection connection=null;
        try {
            connection=connectionFactory.createConnection();
            connection.start();
            Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
             Destination destination= session.createTopic("first-topic");
            MessageConsumer consumer = session.createConsumer(destination);
            TextMessage textMessage=   (TextMessage)consumer.receive();
            System.out.println("消费者2:"+textMessage.getText());
            session.commit();
            session.close();

        } catch (JMSException e) {
            e.printStackTrace();
        }finally {
            if (connection!=null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }

    }
}

运行结果为:消费者1:今天天气很好啊       消费者1:今天天气很好啊

 

持久订阅:

当所有消息都需接受时,采用永久订阅。

运行流程:

(1)先运行消费者,注册clientId,然后关闭

(2)运行消息提供方,提供消息

(3)最后运行消费者,此时能订阅消息

说明此时消费者能订阅之前的消息

主要在客户端配置:

设置消费端id

connection.setClientID("DUBBO-ORDER");

生成topic

Topic topic = session.createTopic("first-topic");

消费者设置为永久订阅  (注意参数值,与上面两值一致)

MessageConsumer consumer = session.createDurableSubscriber(topic, "DUBBO-ORDER");
public class JmsTopicPersistenteReceiver {
    public static void main(String[] args) {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.70.66:61616");
        Connection connection=null;
        try {
            connection= connectionFactory.createConnection();
            //持久订阅设置ID
            connection.setClientID("DUBBO-ORDER");
            connection.start();
            Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            Topic topic = session.createTopic("first-topic");
            //MessageConsumer consumer = session.createConsumer(destination);
            //创建持久订阅,只能接受具体的topic
            MessageConsumer consumer = session.createDurableSubscriber(topic, "DUBBO-ORDER");
            TextMessage textMessage= (TextMessage)consumer.receive();
            System.out.println(textMessage);
            session.commit();
            session.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }finally {
            if (connection!=null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}