activemq

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * 1.Destination  目的地
 *  Queue 队列
 *    队列中的消息,默认只能由惟一的一个消费者处理,一旦处理消息后消息就被删除
 *
 * 支持存在多个消费者,多个生产者,因此消费者不可能消费到已经被消费的消息
 * 当消费者不存在时,消息会一直保存,直到有消费者消费
 *
 *  Topic 目的地
 *    消息会发送给全部的消费者同时处理,只有在消息能够重复处理的业务场景中可以使用
 *
 *
 */
public class TestProducer {


    public void sendTextMessage(String mess){

        ConnectionFactory connectionFactory = null;

        Connection connection = null;

        Destination destination = null;

        Session session = null;

        MessageProducer messageProducer =null;

        Message message = null;

        try {

            connectionFactory = new ActiveMQConnectionFactory();

            connection = connectionFactory.createConnection();
            //发送者默认是启动的,消费者默认是不启动的,全部客户端时必须启动
            //若是有特殊配置,配置后再启动
            connection.start();
            //1 第一个参数  是否支持事务,不推荐事务,批量时建议使用  若是true  第二个参数无效(只是提供者)
            // 客户端不支持事务
            //2  消息确认机制
            /**
             * 1 auto_acknowledge  自动确认消息,消息的消费者处理消息后,自动确认,经常使用,商业开发不推荐
             * 2 client_acknowledge 客户端手动确认,消息的消费者处理后,必须手工确认
             * 3 dups_ok_acknowledge 有副本的客户端手动确认 一个消息能够屡次处理,能够下降session的消耗,在
             *
             * 能够容忍重复消息时使用(不推荐使用)
//设置该消息的超时时间
            //messageProducer.setTimeToLive(1000);
            /**
             * 过时的、处理失败的消息,将会被ActiveMQ置入“ActiveMQ.DLQ”这个队列中。这个队列是由ActiveMQ自动建立的。
             *
             * 若是须要查看这些未被处理的消息,能够进入这个队列中查看
               Destination destination = session.createQueue("ActiveMQ.DLQ");
             */ *
*/ session = connection.createSession(false,Session.CLIENT_ACKNOWLEDGE); destination = session.createQueue("first-mq"); messageProducer = session.createProducer(destination); message = session.createTextMessage(mess); messageProducer.send(message);
//session.commit(); //启用事务时记得提交事务,否则消费端接收不到消息 System.out.println(
"消息已发送"); }catch (Exception e){ e.printStackTrace(); }finally { if (messageProducer!=null){ try { messageProducer.close(); } catch (JMSException e) { e.printStackTrace(); } } if (session!=null){ try { session.close(); } catch (JMSException e) { e.printStackTrace(); } } if (connection!=null){ try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } public static void main(String[] args) { TestProducer t = new TestProducer(); t.sendTextMessage("发送一条消息"); } }

 

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * Created by Administrator on 2019/6/27.
 */
public class TestConsumer {


    public void rTextMessage() {

        ConnectionFactory connectionFactory = null;

        Connection connection = null;

        Destination destination = null;

        Session session = null;


        MessageConsumer messageConsumer = null;

        Message message = null;

        try {

            connectionFactory = new ActiveMQConnectionFactory();

            connection = connectionFactory.createConnection();
            //发送者默认是启动的,消费者默认是不启动的,全部客户端时必须启动
            //若是有特殊配置,配置后再启动
            connection.start();
            //1 第一个参数  是否支持事务,不推荐事务,批量时建议使用  若是true  第二个参数无效(只是提供者)
            // 客户端不支持事务
            //2  消息确认机制
            /**
             * 1 auto_acknowledge  自动确认消息,消息的消费者处理消息后,自动确认,经常使用,商业开发不推荐
             * 2 client_acknowledge 客户端手动确认,消息的消费者处理后,必须手工确认
             * 3 dups_ok_acknowledge 有副本的客户端手动确认 一个消息能够屡次处理,能够下降session的消耗,在
             *
             * 能够容忍重复消息时使用(不推荐使用)
             *
             */
            session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            destination = session.createQueue("first-mq");
            //messageProducer = session.createProducer(destination);
            messageConsumer = session.createConsumer(destination);

            /**
             * 主动活动消息,执行一次,拉去一个消息,开发少用
             * 多个消费者要用监听
             */

            message =messageConsumer.receive();

            String mess = ((TextMessage)message).getText();

            System.out.println("接收到的消息----------"+mess);

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (messageConsumer != null) {
                try {
                    messageConsumer.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (session != null) {
                try {
                    session.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }

            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }


    public static void main(String[] args) {
        TestConsumer t = new TestConsumer();
        t.rTextMessage();
    }


}

 

/**
     * 监听队列消息
     */
    public void receiveMessageListener() {
        ConnectionFactory connectionFactory = null;
        Connection connection = null;
        Session session = null;
        Destination destination = null;
        MessageConsumer consumer = null;
        try {
            connectionFactory = new ActiveMQConnectionFactory(brokerURL);
            connection = connectionFactory.createConnection();
            connection.start();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            destination = session.createQueue("first-quere");
            consumer = session.createConsumer(destination);
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println("接收消息:"+textMessage.getText());
                        //textMessage.acknowledge();消息确认机制
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                    
                }
            });
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
        }
    }

 

1java