源码下载:http://git.oschina.net/zhengweishan/JMS_Study_Demohtml
我使用的是ActiveMQ 5.13.3 Release的Windows版,官网最新版是ActiveMQ 5.13.4 Release,你们能够自行下载,下载地址。java
须要注意的是,开发时候,要将apache-activemq-5.13.3-bin.zip解压缩后里面的activemq-all-5.13.3.jar包加入到classpath下面,这个包包含了全部jms接口api的实现。git
项目截图:apache
JMS 公共 ----------点对点域 ----------发布/订阅域api
ConnectionFactory ---------- QueueConnectionFactory ---------- TopicConnectionFactory服务器
Connection ---------- QueueConnection ---------- TopicConnectionsession
Destination ---------- Queue ---------- Topiceclipse
Session ---------- QueueSession ---------- TopicSessionspa
MessageProducer ---------- QueueSender ---------- TopicPublisher.net
MessageConsumer ---------- QueueReceiver ---------- TopicSubscriber
(1)、点对点方式(point-to-point)
点对点的消息发送方式主要创建在 Message Queue,Sender,reciever上,Message Queue 存贮消息,Sneder 发送消息,receive接收消息.具体点就是Sender Client发送Message Queue ,而 receiver Cliernt从Queue中接收消息和"发送消息已接受"到Quere,确认消息接收。消息发送客户端与接收客户端没有时间上的依赖,发送客户端能够在任什么时候刻发送信息到Queue,而不须要知道接收客户端是否是在运行
(2)、发布/订阅 方式(publish/subscriber Messaging)
发布/订阅方式用于多接收客户端的方式.做为发布订阅的方式,可能存在多个接收客户端,而且接收端客户端与发送客户端存在时间上的依赖。一个接收端只能接收他建立之后发送客户端发送的信息。做为subscriber ,在接收消息时有两种方法,destination的receive方法,和实现message listener 接口的onMessage 方法。
发送消息的基本步骤:
(1)、建立链接使用的工厂类JMS ConnectionFactory
(2)、使用管理对象JMS ConnectionFactory创建链接Connection,并启动
(3)、使用链接Connection 创建会话Session
(4)、使用会话Session和管理对象Destination建立消息生产者MessageSender
(5)、使用消息生产者MessageSender发送消息
消息接收者从JMS接受消息的步骤
(1)、建立链接使用的工厂类JMS ConnectionFactory
(2)、使用管理对象JMS ConnectionFactory创建链接Connection,并启动
(3)、使用链接Connection 创建会话Session
(4)、使用会话Session和管理对象Destination建立消息接收者MessageReceiver
(5)、使用消息接收者MessageReceiver接受消息,须要用setMessageListener将MessageListener接口绑定到MessageReceiver消息接收者必须实现了MessageListener接口,须要定义onMessage事件方法。
package com.active.mq.demo; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.JMSException; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class MQConnectionFactory { private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//默认链接用户名 private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默认链接密码 private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;//默认链接地址 private static ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);//链接工厂 /** * 经过链接工厂获取链接 * [@return](http://my.oschina.net/u/556800) */ public static Connection getConnection(){ Connection connection = null; try { connection = connectionFactory.createConnection(); } catch (JMSException e) { e.printStackTrace(); } return connection; } } package com.active.mq.demo; import javax.jms.Connection; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.TextMessage; public class JMSConsumer { public static void main(String[] args) { Connection connection = null;//链接 Session session = null;//会话 接受或者发送消息的线程 Destination destination;//消息的目的地 MessageConsumer messageConsumer;//消息的消费者 try { //经过链接工厂获取链接 connection = MQConnectionFactory.getConnection(); //启动链接 connection.start(); //建立session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //建立一个链接HelloWorld的消息队列 destination = session.createQueue("HelloWorld"); //建立消息消费者 messageConsumer = session.createConsumer(destination); while (true) { TextMessage textMessage = (TextMessage) messageConsumer.receive(100000); if(textMessage != null){ System.out.println("收到的消息:" + textMessage.getText()); }else { break; } } //提交回话 session.commit(); } catch (JMSException e) { e.printStackTrace(); }finally{ if(connection != null){ try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } if(session !=null){ try { session.close(); } catch (JMSException e) { e.printStackTrace(); } } } } } package com.active.mq.demo; import javax.jms.Connection; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; public class JMSProducer { //发送的消息数量 private static final int SENDNUM = 10; public static void main(String[] args) { //链接 Connection connection = null; //会话 接受或者发送消息的线程 Session session = null; //消息的目的地 Destination destination; //消息生产者 MessageProducer messageProducer; try { //经过链接工厂获取链接 connection = MQConnectionFactory.getConnection(); //启动链接 connection.start(); //建立session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); //建立一个名称为HelloWorld的消息队列 destination = session.createQueue("HelloWorld"); //建立消息生产者 messageProducer = session.createProducer(destination); //发送消息 sendMessage(session, messageProducer); //提交回话 session.commit(); } catch (Exception e) { e.printStackTrace(); }finally{ if(connection != null){ try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } if(session !=null){ try { session.close(); } catch (JMSException e) { e.printStackTrace(); } } } } /** * 发送消息 * [@param](http://my.oschina.net/u/2303379) session * [@param](http://my.oschina.net/u/2303379) messageProducer 消息生产者 * [@throws](http://my.oschina.net/throws) Exception */ public static void sendMessage(Session session,MessageProducer messageProducer) throws Exception{ for (int i = 0; i < JMSProducer.SENDNUM; i++) { //建立一条文本消息 TextMessage message = session.createTextMessage("发送JMS消息第" + (i + 1) + "条"); System.out.println("发送消息:Activemq 发送JMS消息" + (i + 1)); //经过消息生产者发出消息 messageProducer.send(message); } } }
在获取工厂类中加入以下代码:
private static QueueConnectionFactory queueConnectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL); /** * 经过链接工厂获取链接(Queue方式) * [@return](http://my.oschina.net/u/556800) */ public static QueueConnection getQueueConnection(){ QueueConnection connection = null; try { connection = queueConnectionFactory.createQueueConnection(); } catch (JMSException e) { e.printStackTrace(); } return connection; } //消息生产者 package com.active.mq.demo; import javax.jms.DeliveryMode; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.Queue; import javax.jms.QueueConnection; import javax.jms.QueueSender; import javax.jms.QueueSession; import javax.jms.Session; public class QueueProducer { private static final int SEND_NUM = 10; public static void main(String[] args) { QueueConnection queueConnection = null; QueueSession queueSession = null; try { // 经过工厂建立一个链接 queueConnection = MQConnectionFactory.getQueueConnection(); // 启动链接 queueConnection.start(); // 建立一个session会话 queueSession = queueConnection.createQueueSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 建立一个消息队列 Queue queue = queueSession.createQueue("QueueMsgDemo"); // 建立消息发送者 QueueSender sender = queueSession.createSender(queue); // 设置持久化模式 sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT); sendMessage(queueSession, sender); // 提交会话 queueSession.commit(); } catch (Exception e) { e.printStackTrace(); }finally { // 关闭释放资源 if (queueSession != null) { try { queueSession.close(); } catch (JMSException e) { e.printStackTrace(); } } if (queueConnection != null) { try { queueConnection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } public static void sendMessage(QueueSession session, QueueSender sender) throws Exception { for (int i = 0; i < SEND_NUM; i++) { String message = "发送queue消息第" + (i + 1) + "条"; //建立一个Map集合信息 MapMessage map = session.createMapMessage(); map.setString("text", message); map.setLong("time", System.currentTimeMillis()); System.out.println("ActiveMQ 发送queue消息:"+(i + 1)); sender.send(map); } } } //消费者 package com.active.mq.demo; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.Queue; import javax.jms.QueueConnection; import javax.jms.QueueReceiver; import javax.jms.QueueSession; import javax.jms.Session; public class QueueConsumer { public static void main(String[] args) { QueueConnection queueConnection = null; QueueSession queueSession = null; try { // 经过工厂建立一个链接 queueConnection = MQConnectionFactory.getQueueConnection(); // 启动链接 queueConnection.start(); // 建立一个session会话 queueSession = queueConnection.createQueueSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 建立一个消息队列 Queue queue = queueSession.createQueue("QueueMsgDemo"); // 建立消息接收者 QueueReceiver receiver = queueSession.createReceiver(queue); receiver.setMessageListener(new MessageListener() { public void onMessage(Message msg) { if (msg != null) { MapMessage map = (MapMessage) msg; try { System.out.println(map.getLong("time") + "接收到消息#" + map.getString("text")); } catch (JMSException e) { e.printStackTrace(); } } } }); // 休眠100ms再关闭 Thread.sleep(1000 * 100); // 提交会话 queueSession.commit(); } catch (Exception e) { e.printStackTrace(); } finally { // 关闭释放资源 if (queueSession != null) { try { queueSession.close(); } catch (JMSException e) { e.printStackTrace(); } } if (queueConnection != null) { try { queueConnection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } }
在获取工厂类中加入以下代码:
private static TopicConnectionFactory topicConnectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL); /** * 经过链接工厂获取链接(Topic方式) * @return */ public static TopicConnection getTopicConnection(){ TopicConnection topicConnection = null; try { topicConnection = topicConnectionFactory.createTopicConnection(); } catch (JMSException e) { e.printStackTrace(); } return topicConnection; } //生产者 package com.active.mq.demo; import javax.jms.DeliveryMode; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.Session; import javax.jms.Topic; import javax.jms.TopicConnection; import javax.jms.TopicPublisher; import javax.jms.TopicSession; public class TopicProducer { private static final int SEND_NUM = 10; public static void main(String[] args) { TopicConnection connection = null; TopicSession session = null; try { // 经过工厂建立一个链接 connection = MQConnectionFactory.getTopicConnection(); // 启动链接 connection.start(); // 建立一个session会话 session = connection.createTopicSession(true, Session.AUTO_ACKNOWLEDGE); // 建立一个消息队列 Topic topic = session.createTopic("TopicDemo"); // 建立消息发送者 TopicPublisher publisher = session.createPublisher(topic); // 设置持久化模式 publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT); sendMessage(session, publisher); // 提交会话 session.commit(); } catch (Exception e) { e.printStackTrace(); } finally { // 关闭释放资源 if (session != null) { try { session.close(); } catch (JMSException e) { e.printStackTrace(); } } if (connection != null) { try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } public static void sendMessage(TopicSession session, TopicPublisher publisher) throws Exception { for (int i = 0; i < SEND_NUM; i++) { String message = "发送Topic消息第" + (i + 1) + "条"; MapMessage map = session.createMapMessage(); map.setString("text", message); map.setLong("time", System.currentTimeMillis()); System.out.println("ActiveMQ 发送Topic消息:"+(i + 1)); publisher.send(map); } } } //消费者 package com.active.mq.demo; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.Topic; import javax.jms.TopicConnection; import javax.jms.TopicSession; import javax.jms.TopicSubscriber; public class TopicConsumer { public static void main(String[] args) { TopicConnection connection = null; TopicSession session = null; try { // 经过工厂建立一个链接 connection = MQConnectionFactory.getTopicConnection(); // 启动链接 connection.start(); // 建立一个session会话 session = connection.createTopicSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 建立一个消息队列 Topic topic = session.createTopic("TopicDemo"); // 建立消息消费者 TopicSubscriber subscriber = session.createSubscriber(topic); subscriber.setMessageListener(new MessageListener() { public void onMessage(Message msg) { if (msg != null) { MapMessage map = (MapMessage) msg; try { System.out.println(map.getLong("time") + "Topic接收消息#" + map.getString("text")); } catch (JMSException e) { e.printStackTrace(); } } } }); // 休眠100ms再关闭 Thread.sleep(1000 * 100); // 提交会话 session.commit(); } catch (Exception e) { e.printStackTrace(); } finally { // 关闭释放资源 if (session != null) { try { session.close(); } catch (JMSException e) { e.printStackTrace(); } } if (connection != null) { try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } }
以使用JMS方式发送接收消息为例说明 一、首先,启动ActiveMQ 二、运行发送者,eclipse控制台输出,以下图:
三、查看ActiveMQ服务器,Queues内容以下:
咱们能够看到建立了一个名称为HelloWorld的消息队列,队列中有10条消息未被消费,咱们也能够经过Browse查看是哪些消息,若是这些队列中的消息,被删除,消费者则没法消费。
四、运行一下消费者,eclipse控制台打印消息,以下:
五、咱们在查看一下ActiveMQ服务器,Queues内容以下:
咱们能够看到HelloWorld的消息队列发生变化,多一个消息者,队列中的10条消息被消费了,点击Browse查看,已经为空了。 点击Active Consumers,咱们能够看到这个消费者的详细信息。
实例到此就结束了,你们能够本身多看点ActiveMQ服务器的内容,进一步熟悉ActiveMQ。