ActiveMQ(三十二)--Message高级特性2

一、Blob Messages

测试:

该关闭的配置都先关掉

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.BlobMessage;

import javax.jms.*;
import java.io.File;

/**
 * Demo producer: sends the local file {@code pom.xml} to the queue {@code my-queue}
 * as an ActiveMQ {@link BlobMessage}.
 *
 * <p>The broker URL carries {@code jms.blobTransferPolicy.uploadUrl}, which points at the
 * broker's {@code fileserver} web application; the blob content is transferred out of band
 * through that URL rather than inside the JMS message itself.
 */
public class BlogMsgSend {
    /**
     * Connects to the broker, sends one blob message inside a transaction and commits it.
     *
     * @param args unused
     * @throws Exception if the file is missing or any JMS / transfer operation fails
     */
    public static void main(String[] args) throws Exception {
        // Fail fast with a clear message before opening any broker resources.
        File payload = new File("pom.xml");
        if (!payload.isFile()) {
            throw new IllegalStateException("File to send not found: " + payload.getAbsolutePath());
        }

        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                "tcp://192.168.25.128:61616?jms.blobTransferPolicy.uploadUrl=http://192.168.25.128:8161/fileserver/");
        // Alternative policy option, kept for reference:
        //                "tcp://192.168.25.128:61616?jms.blobTransferPolicy.defaultUploadUrl=http://192.168.25.128:8161/fileserver/");
        Connection connection = connectionFactory.createConnection();
        try {
            connection.start();

            // For a transacted session the acknowledge mode argument is ignored;
            // SESSION_TRANSACTED states the intent explicitly.
            ActiveMQSession session =
                    (ActiveMQSession) connection.createSession(true, Session.SESSION_TRANSACTED);
            Destination destination = session.createQueue("my-queue");
            MessageProducer producer = session.createProducer(destination);

            BlobMessage blobMsg = session.createBlobMessage(payload);
            producer.send(blobMsg);

            // Nothing is delivered to consumers until the transaction is committed.
            session.commit();
        } finally {
            // Closing the connection also closes its sessions and producers,
            // and runs even when sending fails.
            connection.close();
        }
    }
}
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.BlobMessage;

import javax.jms.*;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

/**
 * Demo consumer: listens on the queue {@code my-queue} and prints the content of every
 * {@link BlobMessage} it receives.
 *
 * <p>The session is transacted, so each message is committed after it has been handled;
 * without the commit the broker never sees an acknowledgement and redelivers the message
 * on the next run.
 */
public class BlogMsgReceiver {
    /**
     * Connects to the broker and registers an asynchronous listener.
     *
     * <p>The connection is intentionally left open: the listener is invoked on the
     * connection's own threads, so closing it here would stop message delivery.
     *
     * @param args unused
     * @throws Exception if the connection, session or consumer cannot be created
     */
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.128:61616");
        Connection connection = connectionFactory.createConnection();
        connection.start();
        // For a transacted session the acknowledge mode argument is ignored;
        // SESSION_TRANSACTED states the intent explicitly.
        final Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
        Destination destination = session.createQueue("my-queue");
        MessageConsumer consumer = session.createConsumer(destination);

        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message msg) {
                try {
                    if (msg instanceof BlobMessage) {
                        String content = readContent((BlobMessage) msg);
                        if (content == null) {
                            System.err.println("BlobMessage carries no downloadable content");
                        } else {
                            System.out.println("content==" + content);
                        }
                    }
                    // Acknowledge the message (blob or not) so it is not redelivered.
                    session.commit();
                } catch (Exception e) {
                    e.printStackTrace();
                    try {
                        // Hand the message back to the broker for redelivery.
                        session.rollback();
                    } catch (JMSException rollbackFailure) {
                        rollbackFailure.printStackTrace();
                    }
                }
            }
        });
    }

    /**
     * Reads the complete blob content and decodes it as UTF-8 text.
     *
     * <p>The stream is drained in a loop: {@code available()} is only an estimate of the
     * bytes readable without blocking, and a single {@code read} may return fewer bytes
     * than requested, so neither can be used to size or fill the buffer.
     *
     * @param message the blob message to read
     * @return the decoded content, or {@code null} when the message exposes no stream
     * @throws IOException  if reading the stream fails
     * @throws JMSException if the provider cannot open the stream
     */
    private static String readContent(BlobMessage message) throws IOException, JMSException {
        InputStream in = message.getInputStream();
        // NOTE(review): getInputStream() appears to return null when no download URL is
        // available - confirm against the ActiveMQ version in use.
        if (in == null) {
            return null;
        }
        try {
            ByteArrayOutputStream collected = new ByteArrayOutputStream();
            byte[] chunk = new byte[8192];
            int read;
            while ((read = in.read(chunk)) != -1) {
                collected.write(chunk, 0, read);
            }
            return new String(collected.toByteArray(), StandardCharsets.UTF_8);
        } finally {
            in.close();
        }
    }
}

先运行生产者,后运行消费者。

二、Message Transformation

测试:

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageProducer;
import org.apache.activemq.MessageTransformer;

import javax.jms.*;

/**
 * Demo producer for message transformation: creates three {@link TextMessage}s and lets a
 * {@link MessageTransformer} installed on the producer turn each one into a
 * {@link MapMessage} before it is sent to the queue {@code my-queue}.
 */
public class QueueSender {

    /**
     * Sends three transformed messages in a single transaction.
     *
     * @param args unused
     * @throws JMSException         if any JMS operation fails
     * @throws InterruptedException declared for interface compatibility; not thrown here
     */
    public static void main(String[] args) throws JMSException, InterruptedException {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.128:61616");
        Connection connection = connectionFactory.createConnection();
        try {
            connection.start();

            // For a transacted session the acknowledge mode argument is ignored;
            // SESSION_TRANSACTED states the intent explicitly.
            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
            Destination destination = session.createQueue("my-queue");

            ActiveMQMessageProducer producer = (ActiveMQMessageProducer) session.createProducer(destination);

            // The transformer does not depend on the loop variable, so install it once.
            producer.setTransformer(new MessageTransformer() {
                @Override
                public Message producerTransform(Session txSession, MessageProducer txProducer, Message msg)
                        throws JMSException {
                    if (!(msg instanceof TextMessage)) {
                        // Only text messages are converted; anything else is sent as is.
                        return msg;
                    }
                    // Convert the TextMessage into a MapMessage keyed by the original text.
                    MapMessage mapMessage = txSession.createMapMessage();
                    String value = ((TextMessage) msg).getText();
                    mapMessage.setString(value, "my map message AAA==" + value);
                    mapMessage.setStringProperty("extra", "okok");
                    return mapMessage;
                }

                @Override
                public Message consumerTransform(Session txSession, MessageConsumer consumer, Message message)
                        throws JMSException {
                    // This transformer is only installed on a producer, so the consumer-side
                    // hook is not expected to be invoked.
                    return null;
                }
            });

            for (int i = 0; i < 3; i++) {
                TextMessage message = session.createTextMessage("message---" + i);
                producer.send(message);
            }

            // Nothing is delivered to consumers until the transaction is committed.
            session.commit();
        } finally {
            // Closing the connection also closes its sessions and producers,
            // and runs even when sending fails.
            connection.close();
        }
    }
}
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * Demo consumer for message transformation: synchronously receives three
 * {@link MapMessage}s from the queue {@code my-queue} and prints the entry stored under
 * {@code "message---" + i} together with the {@code extra} string property.
 */
public class QueueReceiver {
    /**
     * Receives messages until three map messages have been printed, committing the
     * transaction after each received message.
     *
     * <p>Messages that are not {@link MapMessage}s (for example leftovers from another demo
     * that uses the same queue) are reported, committed and skipped instead of causing a
     * {@link ClassCastException}.
     *
     * @param args unused
     * @throws JMSException if any JMS operation fails
     */
    public static void main(String[] args) throws JMSException {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.128:61616");
        Connection connection = connectionFactory.createConnection();
        try {
            connection.start();
            // For a transacted session the acknowledge mode argument is ignored;
            // SESSION_TRANSACTED states the intent explicitly.
            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
            Destination destination = session.createQueue("my-queue");
            MessageConsumer consumer = session.createConsumer(destination);

            int i = 0;
            while (i < 3) {
                // Blocks until a message arrives; null means the consumer was closed.
                Message received = consumer.receive();
                if (received == null) {
                    System.err.println("Consumer was closed before all messages were received");
                    break;
                }
                if (received instanceof MapMessage) {
                    MapMessage message = (MapMessage) received;
                    System.out.println("收到的消息:" + message.getString("message---" + i) +
                            ", property==" + message.getStringProperty("extra"));
                    // Only map messages advance the expected key index.
                    i++;
                } else {
                    System.err.println("Skipping unexpected message of type " + received.getClass().getName());
                }
                session.commit();
            }
        } finally {
            // Closing the connection also closes its sessions and consumers,
            // and runs even when receiving fails.
            connection.close();
        }
    }
}