本人博客开始迁移,博客整个架构本身搭建及编码 http://www.cookqq.com/listBlog.action java
ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个彻底支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已是好久的事情了,可是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。下面详细的解释经常使用类的做用 apache
ConnectionFactory 接口(链接工厂) 用户用来建立到JMS提供者的链接的被管对象。JMS客户经过可移植的接口访问链接,这样当下层的实现改变时,代码不须要进行修改。 管理员在JNDI名字空间中配置链接工厂,这样,JMS客户才可以查找到它们。根据消息类型的不一样,用户将使用队列链接工厂,或者主题链接工厂。
Connection 接口(链接) 链接表明了应用程序和消息服务器之间的通讯链路。在得到了链接工厂后,就能够建立一个与JMS提供者的链接。根据不一样的链接类型,链接容许用户建立会话,以发送和接收队列和主题到目标。
Destination 接口(目标) 目标是一个包装了消息目标标识符的被管对象,消息目标是指消息发布和接收的地点,或者是队列,或者是主题。JMS管理员建立这些对象,而后用户经过JNDI发现它们。和链接工厂同样,管理员能够建立两种类型的目标,点对点模型的队列,以及发布者/订阅者模型的主题。
MessageConsumer 接口(消息消费者) 由会话建立的对象,用于接收发送到目标的消息。消费者能够同步地(阻塞模式),或异步(非阻塞)接收队列和主题类型的消息。
MessageProducer 接口(消息生产者) 由会话建立的对象,用于发送消息到目标。用户能够建立某个目标的发送者,也能够建立一个通用的发送者,在发送消息时指定目标。
Message 接口(消息) 是在消费者和生产者之间传送的对象,也就是说从一个应用程序传送到另外一个应用程序。一个消息有三个主要部分: 消息头(必须):包含用于识别和为消息寻找路由的操做设置。 一组消息属性(可选):包含额外的属性,支持其余提供者和用户的兼容。能够建立定制的字段和过滤器(消息选择器)。 一个消息体(可选):容许用户建立五种类型的消息(文本消息,映射消息,字节消息,流消息和对象消息)。 消息接口很是灵活,并提供了许多方式来定制消息的内容。
Session 接口(会话) 表示一个单线程的上下文,用于发送和接收消息。因为会话是单线程的,因此消息是连续的,就是说消息是按照发送的顺序一个一个接收的。会话的好处是它支持事务。若是用户选择了事务支持,会话上下文将保存一组消息,直到事务被提交才发送这些消息。在提交事务以前,用户能够使用回滚操做取消这些消息。一个会话容许用户建立消息生产者来发送消息,建立消息消费者来接收消息。
JMS的消息模式有1.点对点的消息模式(Point to Point Messaging) 服务器
2.发布订阅模式(publish – subscribe Mode) session
这里基于点对点的消息模式进行ActiveMQ发消息和收消息过程的分析,请看模型图: 架构
点对点的消息发送方式主要创建在 Message Queue,Sender,reciever上,Message Queue 存贮消息,Sneder(客户端A) 发送消息,receive(客户端B)接收消息。具体点就是客户端A发送Message Queue ,而 客户端B从Queue中接收消息和"发送消息已接受"到Quere,确认消息接收。消息发送客户端与接收客户端没有时间上的依赖,发送客户端能够在 任什么时候刻发送信息到Queue,而不须要知道接收客户端是否是在运行 。异步
请看下面发消息和收消息的例子jsp
package com.activemq.queue; import java.util.Date; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class ActiveMqTest { private static String queueName = "activemq_queue_"; public static void main(String[] args) { Receiver receiver=new Receiver(); Sender sender =new Sender(); try { sender.send(); receiver.receive(); } catch (Exception e) { e.printStackTrace(); } } static class Receiver { public static void receive() throws Exception { ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); Connection connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue(queueName); MessageConsumer consumer = session.createConsumer(destination); //第一种状况 int i = 0; while (i < 3) { i++; TextMessage message = (TextMessage) consumer.receive(); session.commit(); // TODO something.... System.out .println("收到消息:" +message.getText()); } session.close(); connection.close(); //----------------第一种状况结束---------------------- //第二种方式 // consumer.setMessageListener(new MessageListener() { // public void onMessage(Message arg0) { // if(arg0 instanceof TextMessage){ // try { // System.out.println("arg0="+((TextMessage)arg0).getText()); // } catch (JMSException e) { // e.printStackTrace(); // } // } // } // }); //第三种状况 // while (true) { // Message msg = consumer.receive(1000); // TextMessage message = (TextMessage) msg; // if (null != message) { // System.out.println("收到消息:" + message.getText()); // } // } } } static class Sender { public static void send() throws Exception { ConnectionFactory connectionFactory = null; connectionFactory = new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, //null ActiveMQConnection.DEFAULT_PASSWORD, //null "tcp://localhost:61616"); Connection connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue(queueName); MessageProducer producer = session.createProducer(destination); for (int i = 0; i < 3; i++) { TextMessage message = session.createTextMessage("count"+new Date().getTime()); Thread.sleep(1000); // 经过消息生产者发出消息 System.out.println("发送消息"+i+new Date()); producer.send(message); } session.commit(); session.close(); connection.close(); } } }
Sender主要的做用是发送消息,Receiver主要的做用是接受消息,而且显示一下接收消息的内容,这里详细的解释接受消息的方法:tcp
(1)第一种方法使用consumer.receive() 或 consumer.receive(int timeout)接受消息,消息的接收者会一直等待下去,直到有消息到达,或者超时。 ide
其实第一种方法和第三种方法接受原理同样,区别是第一种知道要接受消息的条数,接受完消息,手动关系链接。而第三种不知道要接受多少条数据,因此使用while (true) 死循环直接在接受消息。 编码
(2)第二种方法:消息消费者注册一个MessageListener,当有消息到达的时候,会回调它的onMessage()方法。
这里须要注意的是,你注册完成MessageListener,千万不要关闭链接session.close();和connection.close();由于你刚刚注册完成监听器,就把链接关闭,就不会受到消息,因此监听器中也不会有处理。(这个问题可把我整哭了,搞了半天,才弄明白)
请看ActiveMQ 页面上显示队列的信息
name是队列名称
Number Of Pending Messages 是队列中有多少个消息等待出队列
Number Of Consumers 是队列中有多少个消费者
Messages Enqueued 是队列共有多少个信息
Messages Dequeued 是队列中已经出列多少个消息
开发中遇到的异常:
(1)javax.jms.JMSException: Could not connect to broker URL: tcp://localhost:61616. Reason: java.net.ConnectException: Connection refused: connect
拒绝链接,缘由是activemq服务器没有开启。
解决办方法:开启activemq服务器,请参照《activemq跑起来》