ActiveMQ基础学习

参考资料

ActiveMQ官网java

ActiveMQ简介

ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个彻底支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已是好久的事情了,可是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。apache

JMS关键接口

ConnectionFactory:用于建立链接到消息中间件的链接工厂。
Connection:表明了应用程序和服务之间的链接通路。
Destination:指消息发布的地点,包括队列模式和主题模式。
Session:表示一个单线程的上下文,用于发送和接受消息。
MessageConsumer:由会话建立,用于接受发送到目的的消息。
MessageProducer:由会话建立,用于发送消息。
Message:是在消费者和生产者之间传递的对象,消息头,一组消息属性,和一个消息体。session

ActivMQ依赖

<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.9.0</version>
</dependency>

队列消息

发送队列模型消息tcp

import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;

public class MessageProducer {
    //定义ActivMQ的链接地址
    private static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";
    //定义发送消息的队列名称
    private static final String QUEUE_NAME = "MyMessage";

    public static void main(String[] args) throws JMSException {
        //建立链接工厂
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
       //建立链接
        Connection connection = activeMQConnectionFactory.createConnection();
        //打开链接
        connection.start();
        //建立会话
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //建立队列目标
        Destination destination = session.createQueue(QUEUE_NAME);
        //建立一个生产者
        javax.jms.MessageProducer producer = session.createProducer(destination);
        //建立模拟100个消息
        for (int i = 1 ; i <= 100 ; i++){
            TextMessage message = session.createTextMessage("我发送message:" + i);
            //发送消息
            producer.send(message);
            //在本地打印消息
            System.out.println("我如今发的消息是:" + message.getText());
        }
        //关闭链接
        connection.close();
    }
}

接收队列模型消息ide

import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;

public class MessageConsumer {
    //定义ActivMQ的链接地址
    private static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";
    //定义发送消息的队列名称
    private static final String QUEUE_NAME = "MyMessage";
    public static void main(String[] args) throws JMSException {
        //建立链接工厂
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //建立链接
        Connection connection = activeMQConnectionFactory.createConnection();
        //打开链接
        connection.start();
        //建立会话
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //建立队列目标
        Destination destination = session.createQueue(QUEUE_NAME);
        //建立消费者
        javax.jms.MessageConsumer consumer = session.createConsumer(destination);
        //建立消费的监听
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println("获取消息:" + textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}

注:队列模型的消息,只会被一个消费者所消费,而不会被共享。线程

主题消息

发送主题模型消息code

import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;

public class MessageTopicProducer {
    //定义ActivMQ的链接地址
    private static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";
    //定义发送消息的主题名称
    private static final String TOPIC_NAME = "MyTopicMessage";

    public static void main(String[] args) throws JMSException {
        //建立链接工厂
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //建立链接
        Connection connection = activeMQConnectionFactory.createConnection();
        //打开链接
        connection.start();
        //建立会话
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //建立队列目标
        Destination destination = session.createTopic(TOPIC_NAME);
        //建立一个生产者
        javax.jms.MessageProducer producer = session.createProducer(destination);
        //建立模拟100个消息
        for (int i = 1; i <= 100; i++) {
            TextMessage message = session.createTextMessage("当前message是(主题模型):" + i);
            //发送消息
            producer.send(message);
            //在本地打印消息
            System.out.println("我如今发的消息是:" + message.getText());
        }
        //关闭链接
        connection.close();
    }
}

接收主题模型消息component

import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;

public class MessageTopicConsumer {
    //定义ActivMQ的链接地址
    private static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";
    //定义发送消息的队列名称
    private static final String TOPIC_NAME = "MyTopicMessage";
    public static void main(String[] args) throws JMSException {
        //建立链接工厂
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //建立链接
        Connection connection = activeMQConnectionFactory.createConnection();
        //打开链接
        connection.start();
        //建立会话
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //建立队列目标
        Destination destination = session.createTopic(TOPIC_NAME);
        //建立消费者
        javax.jms.MessageConsumer consumer = session.createConsumer(destination);
        //建立消费的监听
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println("获取消息:" + textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}

注:消费者要先订阅主题,这样生产者发送的主题模型消息才能被消费者消费。xml

相关文章
相关标签/搜索