ActiveMQ使用示例之Topic

 非持久的Topic消息示例
服务器

对于非持久的Topic消息的发送
基本跟前面发送队列信息是同样的,只是把建立Destination的地方,由建立队列替换成建立Topic,例如:session

Destination destination = session.createTopic("MyTopic");

对于非持久的Topic消息的接收
1:必需要接收方在线,而后客户端再发送信息,接收方才能接收到消息
2:一样把建立Destination的地方,由建立队列替换成建立Topic,例如:tcp

Destination destination = session.createTopic("MyTopic");

3:因为不知道客户端发送多少信息,所以改为while循环的方式了,例如:spa

Message message = consumer.receive(); while(message!=null) {   TextMessage txtMsg = (TextMessage)message;   System.out.println("收到消 息:" + txtMsg.getText());   message = consumer.receive(1000L); }

消息的生产者:线程

public class NoPersistenceSender {

    //默认链接用户名
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    //默认链接密码
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    //默认链接地址
    private static final String BROKEURL = "tcp://192.168.0.129:61616";
    //发送的消息数量
    private static final int SENDNUM = 10;

    public static void main(String[] args) {
        //链接工厂
        ConnectionFactory connectionFactory;
        //链接
        Connection connection = null;
        //会话 接受或者发送消息的线程
        Session session;
        //消息的目的地
        Destination destination;
        //消息生产者
        MessageProducer messageProducer;
        //实例化链接工厂(链接到ActiveMQ服务器)
        connectionFactory = new ActiveMQConnectionFactory(NoPersistenceSender.USERNAME,
                NoPersistenceSender.PASSWORD, NoPersistenceSender.BROKEURL);

        try {
            //经过链接工厂获取链接
            connection = connectionFactory.createConnection();
            //启动链接
            connection.start();
            //建立session
            session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            //建立一个名称为MyTopic的消息队列(生产者生成的消息放在哪)
            destination = session.createTopic("MyTopic");
            //建立消息生产者
            messageProducer = session.createProducer(destination);
            //发送消息
            sendMessage(session, messageProducer);

            session.commit();

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }

    }

    /**
     * 发送消息
     *
     * @param session
     * @param messageProducer 消息生产者
     * @throws Exception
     */
    public static void sendMessage(Session session, MessageProducer messageProducer) throws Exception {
        for (int i = 0; i < NoPersistenceSender.SENDNUM; i++) {
            //建立一条文本消息
            TextMessage message = session.createTextMessage("ActiveMQ 发送消息" + i);
            System.out.println("发送消息:Activemq 发送消息" + i);
            //经过消息生产者发出消息
            messageProducer.send(message);
        }

    }
}

消息的消费者:code

public class NoPersistenceReceiver {

    //默认链接用户名
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    //默认链接密码
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    //默认链接地址
    private static final String BROKEURL = "tcp://192.168.0.129:61616";

    public static void main(String[] args) {
        ConnectionFactory connectionFactory;//链接工厂
        Connection connection = null;//链接

        Session session;//会话 接受或者发送消息的线程
        Destination destination;//消息的目的地

        MessageConsumer messageConsumer;//消息的消费者

        //实例化链接工厂(链接到ActiveMQ服务器)
        connectionFactory = new ActiveMQConnectionFactory(NoPersistenceReceiver.USERNAME,
                NoPersistenceReceiver.PASSWORD, NoPersistenceReceiver.BROKEURL);

        try {
            //经过链接工厂获取链接
            connection = connectionFactory.createConnection();
            //启动链接
            connection.start();
            //建立session
            session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            //生产者将消息发送到MyTopic,因此消费者要到MyTopic去取
            destination = session.createTopic("MyTopic");
            //建立消息消费者
            messageConsumer = session.createConsumer(destination);

            //Message message = messageConsumer.receive();
            //while (message != null) {
                //TextMessage txtMsg = (TextMessage) message;
                //System.out.println("收到消息:" + txtMsg.getText());
                //message = messageConsumer.receive(1000L);
                //session.commit();
            //}

            Message message = messageConsumer.receive();
            while (message != null) {
                TextMessage txtMsg = (TextMessage) message;
                System.out.println("收到消 息:" + txtMsg.getText());
                //没这句有错
                message = messageConsumer.receive(1000L);
            }
            session.commit();
            session.close();
            connection.close();

        } catch (JMSException e) {
            e.printStackTrace();
        }

    }
}

首先运行运行生产者(消费者处于费运行状态),而后运行消费者:中间件

 此时再次运行一下生产者(消费者处于开启状态)blog

 结论:必需要接收方在线,而后客户端再发送信息,接收方才能接收到消息
队列

 持久的Topic消息示例
get

 对于持久的Topic消息的发送

1:要用持久化订阅,发送消息者要用 DeliveryMode.PERSISTENT 模式发现,在链接以前设定
2:必定要设置完成后,再start 这个 connection
 

对于持久的Topic消息的接收

1:须要在链接上设置消费者id,用来识别消费者
2:须要建立TopicSubscriber来订阅
3:要设置好了事后再start 这个 connection
4:必定要先运行一次,等于向消息服务中间件注册这个消费者,而后再运行客户端发送信息,这个时候,
不管消费者是否在线,都会接收到,不在线的话,下次链接的时候,会把没有收过的消息都接收下来

生产者:

public class PersistenceSender {

    //默认链接用户名
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    //默认链接密码
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    //默认链接地址
    private static final String BROKEURL = "tcp://192.168.0.129:61616";
    //发送的消息数量
    private static final int SENDNUM = 10;

    public static void main(String[] args) {
        //链接工厂
        ConnectionFactory connectionFactory;
        //链接
        Connection connection = null;
        //会话 接受或者发送消息的线程
        Session session;
        //消息的目的地
        Destination destination;
        //消息生产者
        MessageProducer messageProducer;
        //实例化链接工厂(链接到ActiveMQ服务器)
        connectionFactory = new ActiveMQConnectionFactory(PersistenceSender.USERNAME,
                PersistenceSender.PASSWORD, PersistenceSender.BROKEURL);

        try {
            //经过链接工厂获取链接
            connection = connectionFactory.createConnection();
            //建立session
            session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            //建立一个名称为MyTopic的消息队列(生产者生成的消息放在哪)
            destination = session.createTopic("MyTopic1");
            //建立消息生产者
            messageProducer = session.createProducer(destination);
            messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
            //启动链接
            connection.start();
            //发送消息
            sendMessage(session, messageProducer);

            session.commit();

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * 发送消息
     *
     * @param session
     * @param messageProducer 消息生产者
     * @throws Exception
     */
    public static void sendMessage(Session session, MessageProducer messageProducer) throws Exception {
        for (int i = 0; i < PersistenceSender.SENDNUM; i++) {
            //建立一条文本消息
            TextMessage message = session.createTextMessage("ActiveMQ 发送消息" + i);
            System.out.println("发送消息:Activemq 发送消息" + i);
            //经过消息生产者发出消息
            messageProducer.send(message);
        }
    }
}

消费者:

public class PersistenceReceiver {

    //默认链接用户名
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    //默认链接密码
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    //默认链接地址
    private static final String BROKEURL = "tcp://192.168.0.129:61616";

    public static void main(String[] args) {
        ConnectionFactory connectionFactory;//链接工厂
        Connection connection = null;//链接

        Session session;//会话 接受或者发送消息的线程
        Topic topic;//消息的目的地

        //实例化链接工厂(链接到ActiveMQ服务器)
        connectionFactory = new ActiveMQConnectionFactory(PersistenceReceiver.USERNAME,
                PersistenceReceiver.PASSWORD, PersistenceReceiver.BROKEURL);

        try {
            //经过链接工厂获取链接
            connection = connectionFactory.createConnection();
            connection.setClientID("winner_0715");
            //建立session
            session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            //生产者将消息发送到MyTopic,因此消费者要到MyTopic去取
            topic = session.createTopic("MyTopic1");
            //建立消息消费者
            TopicSubscriber consumer = session.createDurableSubscriber(topic, "t1");

            //启动链接
            connection.start();

            Message message = consumer.receive();
            while (message != null) {
                TextMessage txtMsg = (TextMessage) message;
                System.out.println("收到消 息:" + txtMsg.getText());
                //没这句有错
                message = consumer.receive(1000L);
            }
            session.commit();
            session.close();
            connection.close();

        } catch (JMSException e) {
            e.printStackTrace();
        }

    }

}

消费者须要先运行一次,注册~

 

由于是持久消息,因此还会有别的订阅者,因此是0

关于持久化和非持久化消息

持久化消息
这是 ActiveMQ 的默认传送模式,此模式保证这些消息只被传送一次和成 功使用一次。对于这些消息,可靠性是优先考虑的因素。可靠性的另外一个重要方面是确保持久性消
息传送至目标后,消息服务在向消费者传送它们以前不会丢失这些消息。这意味着在持久性消息传送至目标时,消息服务将其放入持久性数据存储。若是消息服务因为某种缘由致使失败,它能够恢复此消息并将此消息传送至相应的消费者。虽然这样增长了消息传送的开销,但却增长了可靠性。

非持久化消息
保证这些消息最多被传送一次。对于这些消息,可靠性并不是主要的考虑因素。 此模式并不要求持久性的数据存储,也不保证消息服务因为某种缘由致使失败后消息不会丢失。 有两种方法指定传送模式:
1.使用setDeliveryMode 方法,这样全部的消息都采用此传送模式; 如:
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
2.使用send 方法为每一条消息设置传送模式

相关文章
相关标签/搜索