在前面一篇文章里讨论过几种应用系统集成的方式,发现实际上面向消息队列的集成方案算是一个整体比较合理的选择。这里,咱们先针对具体的一个消息队列Activemq的基本通讯方式进行探讨。activemq是JMS消息通讯规范的一个实现。总的来讲,消息规范里面定义最多见的几种消息通讯模式主要有发布-订阅、点对点这两种。另外,经过结合这些模式的具体应用,咱们在处理某些应用场景的时候也衍生出来了一种请求应答的模式。下面,咱们针对这几种方式一一讨论一下。java
在讨论具体方式的时候,咱们先看看使用activemq须要启动服务的主要过程。apache
按照JMS的规范,咱们首先须要得到一个JMS connection factory.,经过这个connection factory来建立connection.session
在这个基础之上咱们再建立session, destination, producer和consumer。所以主要的几个步骤以下:tcp
1. 得到JMS connection factory. 经过咱们提供特定环境的链接信息来构造factory。ide
2. 利用factory构造JMS connection函数
3. 启动connection优化
4. 经过connection建立JMS session.spa
5. 指定JMS destination.线程
6. 建立JMS producer或者建立JMS message并提供destination.code
7. 建立JMS consumer或注册JMS message listener.
8. 发送和接收JMS message.
9. 关闭全部JMS资源,包括connection, session, producer, consumer等。
发布订阅模式有点相似于咱们平常生活中订阅报纸。每一年到年尾的时候,邮局就会发一本报纸集合让咱们来选择订阅哪个。
在这个表里头列了全部出版发行的报纸,那么对于咱们每个订阅者来讲,咱们能够选择一份或者多份报纸。好比北京日报、
潇湘晨报等。那么这些个咱们订阅的报纸,就至关于发布订阅模式里的topic。有不少我的订阅报纸,也有人可能和我订阅了相同的
报纸。那么,在这里,至关于咱们在同一个topic里注册了。对于一份报纸发行方来讲,它和全部的订阅者就构成了一个1对多的关系。
这种关系以下图所示:
如今,假定咱们用前面讨论的场景来写一个简单的示例。咱们首先须要定义的是publisher.
publisher
publisher是属于发布信息的一方,它经过定义一个或者多个topic,而后给这些topic发送消息。
publisher的构造函数以下:
public class JMSPublisher { private static final String USER=ActiveMQConnection.DEFAULT_USER; private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD; private static final String BROKEURL=ActiveMQConnection.DEFAULT_BROKER_URL; private static final int SENDNUM=10; public static void main(String[] args) throws JMSException { ConnectionFactory factory; Connection conn; Session session; Destination des; MessageProducer producer; //一、获取连接工厂 factory = new org.apache.activemq.ActiveMQConnectionFactory(JMSPublisher.USER, JMSPublisher.PASSWORD, JMSPublisher.BROKEURL); //二、获取连接 conn = factory.createConnection(); //三、开启连接 conn.start(); //四、获取会话 session=conn.createSession(true, Session.AUTO_ACKNOWLEDGE); //五、建立队列 des=session.createTopic("topic1"); //六、建立消息生产者 producer=session.createProducer(des); //七、发送消息 for(int i=0;i<JMSPublisher.SENDNUM;i++){ TextMessage message=session.createTextMessage("生产者生产消息: 第"+i+"条=========="); System.out.println("生产者生产消息: 第"+i+"条=========="); producer.send(message); } //八、提交 session.commit(); } }
Consumer1
Consumer的代码也很相似,具体的步骤无非就是1.初始化资源。 2. 接收消息。 3. 必要的时候关闭资源。
public class JMSConsumer { private static final String USER=ActiveMQConnection.DEFAULT_USER; private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD; private static final String BROKEURL=ActiveMQConnection.DEFAULT_BROKER_URL; private static final int SENDNUM=10; public static void main(String[] args) throws JMSException { ConnectionFactory factory; Connection conn; Session session; Destination des; MessageConsumer consumer; //一、实例化链接工厂 factory=new ActiveMQConnectionFactory(JMSConsumer.USER, JMSConsumer.PASSWORD, JMSConsumer.BROKEURL); //二、获取连接 conn=factory.createConnection(); //三、开启连接 conn.start(); //四、建立会话 session=conn.createSession(true, Session.AUTO_ACKNOWLEDGE); //五、消息目的地 des=session.createTopic("topic1"); //六、建立消息消费者 consumer = session.createConsumer(des); //七、获取消息 consumer.setMessageListener(new MyTopicListener1()); } }
Consumer2
public class JMSConsumer2 { private static final String USER=ActiveMQConnection.DEFAULT_USER; private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD; private static final String BROKEURL=ActiveMQConnection.DEFAULT_BROKER_URL; public static void main(String[] args) throws JMSException { ConnectionFactory connFactory; Connection conn; Session session; Destination des; MessageConsumer consumer; //一、链接工厂 connFactory = new ActiveMQConnectionFactory(JMSConsumer2.USER, JMSConsumer2.PASSWORD, JMSConsumer2.BROKEURL); //二、链接 conn = connFactory.createConnection(); //三、开启链接 conn.start(); //四、会话 session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE); //五、目的 des = session.createTopic("topic_2"); //六、消费者 consumer = session.createConsumer(des); //七、监听 consumer.setMessageListener(new MyMessageListener2()); } }
在发布订阅模式中,消费者要主动监听生产者的生产消息
MessageListener1:
Listener对象的职责很简单,主要就是处理接收到的消息:
public class MyMessageListener implements MessageListener{ public void onMessage(Message message) { if(message!=null){ try { System.out.println("111111111111消费者 接收消息 : "+((TextMessage)message).getText()+"<<<<============"); } catch (JMSException e) { e.printStackTrace(); } } } }
MessageListener2:
public class MyMessageListener2 implements MessageListener{ public void onMessage(Message message) { if(message!=null){ try { System.out.println("22222222222消费者 接收消息 : "+((TextMessage)message).getText()+"<<<<============"); } catch (JMSException e) { e.printStackTrace(); } } } }
结果:
它实现了MessageListener接口,里面的onMessage方法就是在接收到消息以后会被调用的方法。
如今,经过实现前面的publisher和consumer咱们已经实现了pub-sub模式的一个实例。仔细回想它的步骤的话,
主要就是要二者设定一个共同的topic,有了这个topic以后他们能够实现一方发消息另一方接收。另外,为了链接到具体的
message server,这里是使用了链接tcp://localhost:16161做为定义ActiveMQConnectionFactory的路径。在publisher端经过
session建立producer,根据指定的参数建立destination,而后将消息和destination做为producer.send()方法的参数发消息。
在consumer端也要建立相似的connection,session。经过session获得destination,再经过session.createConsumer(destination)
来获得一个MessageConsumer对象。有了这个MessageConsumer咱们就能够自行选择是直接同步的receive消息仍是注册listener了。
p2p的过程则理解起来更加简单。它比如是两我的打电话,这两我的是独享这一条通讯链路的。一方发送消息,另一方接收,
就这么简单。在实际应用中由于有多个用户对使用p2p的链路,它的通讯场景以下图所示:
咱们再来看看一个p2p的示例:
在p2p的场景里,相互通讯的双方是经过一个相似于队列的方式来进行交流。和前面pub-sub的区别在于一个topic有一个发送者
和多个接收者,而在p2p里一个queue只有一个发送者和一个接收者。
Producer:
public class JMSProducer { private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//默认链接用户名 private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默认链接用户名 private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;//默认链接地址 private static final int SENDNUM = 10;//发送消息数量 public static void main(String[] args) throws Exception { ConnectionFactory connFactory;//链接工厂 Connection conn;//链接 Session session;//会话 接受或者发送消息的线程 Destination destination;//消息目的地 MessageProducer producer;//消息生产者 //一、实例化链接工厂 connFactory = new ActiveMQConnectionFactory(JMSProducer.USERNAME, JMSProducer.PASSWORD, JMSProducer.BROKEURL); //二、获取链接 conn = connFactory.createConnection(); //三、启动链接 conn.start(); //四、建立会话--是否开启事务--消息确认方式 session = conn.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE ); //五、建立消息队列 destination = session.createQueue("FirstQueue1"); //六、建立消息生产者 producer = session.createProducer(destination); //七、发送消息 for(int i=0;i<JMSProducer.SENDNUM;i++){ TextMessage textMessage = session.createTextMessage("ActiveMQ 发送消息:"+i); System.out.println("生产者生产消息:"+textMessage.getText()); producer.send(textMessage); } session.commit(); if(conn!=null){ conn.close(); } } }
Consumer:
public class JMSConsumer { private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//默认链接用户名 private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默认链接用户名 private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;//默认链接地址 public static void main(String[] args) throws Exception { ConnectionFactory connFactory;//链接工厂 Connection conn;//链接 Session session;//会话 接受或者发送消息的线程 Destination destination;//消息目的地 MessageConsumer consumer;//消息生产者 //一、实例化链接工厂 connFactory = new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD, JMSConsumer.BROKEURL); //二、获取链接 conn = connFactory.createConnection(); //三、启动链接 conn.start(); //四、建立会话--是否开启事务--消息确认方式 session = conn.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE ); //五、建立消息队列 destination = session.createQueue("FirstQueue1"); //六、建立消费者 consumer = session.createConsumer(destination); //七、注册消息监听 consumer.setMessageListener(new MyMessageListener()); } }
Listener:
public class MyMessageListener implements javax.jms.MessageListener { public void onMessage(Message message) { if(message!=null){ try { System.out.println("消费者监听消息:---->"+((TextMessage)message).getText()); } catch (JMSException e) { e.printStackTrace(); } } } }
这里代码和前面pub-sub的具体实现代码很是类似,就再也不赘述。
如今若是咱们比较一下pub-sub和p2p模式的具体实现步骤的话,咱们会发现他们基本的处理流程都是相似的,
除了在pub-sub中要经过createTopic来设置topic,而在p2p中要经过createQueue来建立通讯队列。他们之间存在着不少的重复之处,
在具体的开发过程当中,咱们是否能够进行一些工程上的优化呢?别急,后面咱们会讨论到的。
回顾前面三种基本的通讯方式,咱们会发现,他们都存在着必定的共同点,好比说都要初始化ConnectionFactory, Connection,
Session等。在使用完以后都要将这些资源关闭。若是每个实现它的通讯端都这么写一通的话,实际上是一种简单的重复。
从工程的角度来看是彻底没有必要的。那么,咱们有什么办法能够减小这种重复呢?
一种简单的方式就是经过工厂方法封装这些对象的建立和销毁,而后简单的经过调用工厂方法的方式获得他们。另外,
既然基本的流程都是在开头建立资源在结尾销毁,咱们也能够采用Template Method模式的思路。经过继承一个抽象类,
在抽象类里提供了资源的封装。全部继承的类只要实现怎么去使用这些资源的方法就能够了。Spring中间的JMSTemplate就提供了
这种相似思想的封装。
在上诉的代码中几个常量须要写一下:
一、
ActiveMQConnection.DEFAULT_USER 表示默认链接用户名
ActiveMQConnection.DEFAULT_PASSWORD;表示默认链接用户名
ActiveMQConnection.DEFAULT_BROKER_URL;表示默认地址
二、
Session.AUTO_ACKNOWLEDGE
当客户端从 receive 或 onMessage成功返回时,Session 自动签收客户端的这条消息的收条。
Session.CLIENT_ACKNOWLEDGE
客户端经过调用消息的 acknowledge 方法签收消息。message.acknowledge();
客户经过消息的acknowledge 方法确认消
息。须要注意的是,在这种模式中,确认是在会话层上进行:确认一个被
消费的消息将自动确认全部已被会话消费的消息。例如,若是一个消息消
费者消费了10 个消息,而后确认第5 个消息,那么全部10 个消息都被确
认。
Session.DUPS_ACKNOWLEDGE。
该选择只是会话迟钝的确认消息的提交。如
果JMS provider 失败,那么可能会致使一些重复的消息。若是是重复的
消息,那么JMS provider 必须把消息头的JMSRedelivered 字段设置为
true。