测试:
该关闭的配置都先关掉
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQSession; import org.apache.activemq.BlobMessage; import javax.jms.*; import java.io.File; public class BlogMsgSend { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( "tcp://192.168.25.128:61616?jms.blobTransferPolicy.uploadUrl=http://192.168.25.128:8161/fileserver/"); // "tcp://192.168.25.128:61616?jms.blobTransferPolicy.defaultUploadUrl=http://192.168.25.128:8161/fileserver/"); Connection connection = connectionFactory.createConnection(); connection.start(); ActiveMQSession session = (ActiveMQSession) connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue("my-queue"); MessageProducer producer = session.createProducer(destination); BlobMessage blobMsg = session.createBlobMessage(new File("pom.xml")); producer.send(blobMsg); session.commit(); session.close(); connection.close(); } }
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.BlobMessage; import javax.jms.*; import java.io.InputStream; public class BlogMsgReceiver { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.128:61616"); Connection connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue("my-queue"); MessageConsumer consumer = session.createConsumer(destination); consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message msg) { if (msg instanceof BlobMessage) { BlobMessage message = (BlobMessage) msg; try { InputStream in = message.getInputStream(); byte[] buf = new byte[in.available()]; in.read(buf); in.close(); System.out.println("content==" + new String(buf)); } catch (Exception e) { e.printStackTrace(); } } } }); } }
先运行生产者,后运行消费者。
测试:
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQMessageProducer; import org.apache.activemq.MessageTransformer; import javax.jms.*; public class QueueSender { public static void main(String[] args) throws JMSException, InterruptedException { ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.128:61616"); Connection connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue("my-queue"); ActiveMQMessageProducer producer = (ActiveMQMessageProducer) session.createProducer(destination); for (int i = 0; i < 3; i++) { TextMessage message = session.createTextMessage("message---" + i); producer.setTransformer(new MessageTransformer() { @Override public Message producerTransform(Session session, MessageProducer producer, Message msg) throws JMSException { //把TextMessage转成MapMessage MapMessage mapMessage = session.createMapMessage(); String value = ((TextMessage) msg).getText(); mapMessage.setString(value, "my map message AAA==" + value); mapMessage.setStringProperty("extra", "okok"); return mapMessage; } @Override public Message consumerTransform(Session session, MessageConsumer consumer, Message message) throws JMSException { return null; } }); producer.send(message); } session.commit(); session.close(); connection.close(); } }
import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class QueueReceiver { public static void main(String[] args) throws JMSException { ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.128:61616"); Connection connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue("my-queue"); MessageConsumer consumer = session.createConsumer(destination); int i = 0; while (i < 3) { MapMessage message = (MapMessage) consumer.receive(); System.out.println("收到的消息:" + message.getString("message---" + i) + ", property==" + message.getStringProperty("extra")); i++; session.commit(); } session.close(); connection.close(); } }