在咱们平常的项目开发过程当中,通常各模块或者函数方法之间,都是采用链式调用的方式,为了完成一个总体功能,咱们会将其拆分红多个函数(或者子模块),好比模块A调用模块B,模块B调用模块C,模块C调用模块D。但在大型分布式应用中,系统间的RPC(远程过程调用(Remote Procedure Call)的缩写形式) 交互繁杂,一个功能背后要调用上百个接口并不是不可能,这种架构就有以下几个劣势:前端
根据上述的几个问题,那咱们在设计系统时须要明确一下要达到的目标:java
在如今的系统视线中,MQ消息队列是广泛使用的,能够完美的解决上面提到的问题。下图是使用了MQ的简单架构图,能够看到MQ在最前端对流量进行蓄洪,下游的系统ABC只与MQ打交道,经过事先定义好的消息格式来解析。
引入MQ以后的系统架构、交互方式与最初的链式调用架构很是不一样,虽然能够解决上文提到的问题,但也要充分理解其原理特性来避免其带来的反作用,这里以消息队列如何保证“消息的可靠投递”为切入点,来看看MQ的实现方式。linux
大致上的流程就是如此,可是暂时不展开具体的描述,由于仅有MQ这种实现方式的思想还不够,因为系统模块间存在着异步的交互,因此,咱们不得不引入一个关于异步交互的知识点——JMS。web
JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通讯。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。apache
JMS容许应用程序组件基于JavaEE平台建立、发送、接收和读取消息。它使分布式通讯耦合度更低,消息服务更加可靠以及异步性。windows
提到JMS,就会引出它内部的一些组件或者说对象,那就避免不了有一些术语,作一下解释:后端
那么接下来,就先详细的介绍一下,JMS的相关内容,在介绍完JMS后,咱们再来研究对JMS这个接口具体实现的提供者,好比:Apache ActiveMQ
可能会有同窗存在疑问,为何要介绍JMS呢,其实JMS只是一套Java的标准,也就是接口,它没有具体的实现类,若是咱们想要使用,用到确定不是接口,而是具体实现。可是咱们在使用具体的JMS提供者前,先搞清楚这个提供者到底实现了哪些东西,这样再去学习具体的实现产品,就垂手可得了。服务器
JMS是Java的消息服务,JMS的客户端之间能够经过JMS服务进行异步的消息传输。网络
JMS由如下元素组成。session
JMS提供者
链接面向消息中间件的,JMS接口的一个实现。提供者能够是Java平台的JMS实现,也能够是非Java平台的面向消息中间件的适配器。
JMS客户
生产或消费消息的基于Java的应用程序或对象。
JMS生产者
建立并发送消息的JMS客户。
JMS消费者
接收消息的JMS客户。
JMS消息
包括能够在JMS客户之间传递的数据的对象
JMS队列
一个容纳那些被发送的等待阅读的消息的区域。队列暗示,这些消息将按照顺序发送。一旦一个消息被阅读,该消息将被从队列中移走。
JMS主题
一种支持发送消息给多个订阅者的机制。
Point-to-Point(P2P)
Publish/Subscribe(Pub/Sub)
若是你但愿发送的每一个消息都应该被成功处理的话,那么你须要P2P模式。
客户端将消息发送到主题。多个发布者将消息发送到Topic,系统将这些消息传递给多个订阅者。
若是你但愿发送的消息能够不被作任何处理、或者被一个消息者处理、或者能够被多个消费者处理的话,那么能够采用Pub/Sub模型。
在JMS中,消息的产生和消息是异步的。对于消费来讲,JMS的消息者能够经过两种方式来消费消息。
ConnectionFactory 接口(链接工厂)
建立Connection对象的工厂,根据消息类型的不一样,用户将使用队列链接工厂或者主题链接工厂。
分别有QueueConnectionFactory和TopicConnectionFactory两种。能够经过JNDI来查找ConnectionFactory对象。
Destination 接口(目标)
Destination是一个包装了消息目标标识符的被管对象,消息目标是指消息发布和接收的地点,或者是队列,或者是主题。是消息生产者的消息发送目标或者说消息消费者的消息来源。
对于消息生产者来讲,它的Destination是某个队列(Queue)或某个主题(Topic);
对于消息消费者来讲,它的Destination也是某个队列或主题(即消息来源)。
因此,Destination实际上就是两种类型的对象:Queue、Topic能够经过JNDI来查找Destination。
Connection 接口(链接)
Connection表示在客户端和JMS系统之间创建的连接(对TCP/IP socket的包装)。
Connection能够产生一个或多个Session。跟ConnectionFactory同样,Connection也有两种类型:QueueConnection和TopicConnection。
Session 接口(会话)
Session是咱们操做消息的接口。表示一个单线程的上下文,用于发送和接收消息。
因为会话是单线程的,因此消息是连续的,就是说消息是按照发送的顺序一个一个接收的。
能够经过session建立生产者、消费者、消息等。Session提供了事务的功能。当咱们须要使用session发送/接收多个消息时,能够将这些发送/接收动做放到一个事务中。
一样,也分QueueSession和TopicSession。
MessageProducer 接口(消息的生产者)
消息生产者由Session建立,并用于将消息发送到Destination。消费者能够同步地(阻塞模式),或异步(非阻塞)接收队列和主题类型的消息。
一样,消息生产者分两种类型:QueueSender和TopicPublisher。能够调用消息生产者的方法(send或publish方法)发送消息。
MessageConsumer 接口(消息消费者)
消息消费者由Session建立,用于接收被发送到Destination的消息。两种类型:QueueReceiver和TopicSubscriber。
可分别经过session的createReceiver(Queue)或createSubscriber(Topic)来建立。
固然,也能够session的creatDurableSubscriber方法来建立持久化的订阅者。
Message 接口(消息)
是在消费者和生产者之间传送的对象,也就是说从一个应用程序创送到另外一个应用程序。一个消息有三个主要部分:
一、消息头(必须):包含用于识别和为消息寻找路由的操做设置。
二、一组消息属性(可选):包含额外的属性,支持其余提供者和用户的兼容。能够建立定制的字段和过滤器(消息选择器)。
三、一个消息体(可选):容许用户建立五种类型的消息(文本消息,映射消息,字节消息,流消息和对象消息)。消息接口很是灵活,并提供了许多方式来定制消息的内容。
消息接口很是灵活,并提供了许多方式来定制消息的内容。
MessageListener
消息监听器。若是注册了消息监听器,一旦消息到达,将自动调用监听器的onMessage方法。
EJB中的MDB(Message-Driven Bean)就是一种MessageListener。
经过上面的介绍,若是要使用Java消息服务,咱们就必需要有一个JMS提供者,来管理会话和队列。如今既有开源的提供者也有专有的提供者。
开源的提供者包括:Apache ActiveMQ、Kafka、WebMethods、阿里的RocketMQ等。Kafka和RocketMQ已经贡献给了Apache项目基金会,目前是属于Apache的了。
如今,就来介绍一下,ActiceMQ,它是Apache开源的消息服务器;
官方网站:activemq.apache.org
windows用户在activemq安装目录的bin/win64/activemq.bat
mac\linux:activemq安装目录的bin ./activemq start
访问地址:localhost:8161/admin
用户名/密码:admin/admin
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.15.3</version> </dependency>
以名为Hello的项目为例子,简单用代码实现一下整个过程,主要是生产者发送一条Hello ActiveMQ 消息到消息中间件,而后消费者经过访问消息中间件,得到这条消息。
package com.golden3young.p2p.hello; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class HelloProducer { public static void main(String[] args) throws JMSException { //1.建立链接工厂 ConnectionFactory factory = new ActiveMQConnectionFactory(null,null,"tcp://localhost:61616"); //2.建立Connection 并调用它的start() Connection connection = factory.createConnection(); connection.start(); //3.经过Connection建立session //方法参数(是否开启事务,消息签收方式:自动签收/手动签收) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //4.经过Session建立Destination(Queue/Topic) //方法参数(队列名称) Queue queue = session.createQueue("hello"); //5.经过Session建立生产者 MessageProducer producer = session.createProducer(queue); //6.经过Sessioin建立消息 TextMessage msg = session.createTextMessage("Hello ActiveMQ"); //7.使用生产者对象发送消息 producer.send(msg); //8.把资源关闭 producer.close(); session.close(); connection.close(); } }
package com.golden3young.p2p.hello; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class HelloConsumer { public static void main(String[] args) throws JMSException { //建立ConnectionFactory ConnectionFactory factory = new ActiveMQConnectionFactory(null,null,"tcp://localhost:61616"); //建立Connection 并 调用 start()方法 Connection connection = factory.createConnection(); connection.start(); //建立Session Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //建立Destination - Quene //两个队列的名字必须彻底一致 Queue queue = session.createQueue("hello"); //建立消费者MessageConsumer MessageConsumer consumer = session.createConsumer(queue); //使用消费者接收消息 TextMessage msg = (TextMessage)consumer.receive(); System.out.println(msg); } }
若是是以监听器的方式来获取消息,那么须要引用或者建立。下面是引用的代码:
既然是引用,就首先得建立一个监听器,代码:
package com.etoak.golden3young.hello; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; public class HelloListener implements MessageListener { @Override public void onMessage(Message message) { if(message instanceof TextMessage) { TextMessage text = (TextMessage) message; try { System.out.println(text.getText().toString()); } catch (JMSException e) { e.printStackTrace(); } } } }
建立完成后,在消费者类中,进行引用:
// 设置消息监听器 只要队列中生产者生产了一条消息,当前消费者就会监听到是不是本身想要的类型,若是是,就打印。 consumer.setMessageListener(new HelloListener());
package com.golden3young.topic; import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import org.apache.activemq.ActiveMQConnectionFactory; public class TopicProducer { public static void main(String[] args) throws JMSException { ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(null,null,"tcp://localhost:61616"); Connection connection = factory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //建立Destination Topic topic = session.createTopic("topic"); //事实上应该是 Publisher发布者, 可是API中仍是叫producer MessageProducer producer = session.createProducer(topic); TextMessage msg = session.createTextMessage("Hello I am Topic ActiveMQ!"); producer.send(msg); producer.close(); session.close(); connection.close(); } }
package com.golden3young.topic; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import org.apache.activemq.ActiveMQConnectionFactory; public class TopicConsumer { public static void main(String[] args) throws JMSException { ConnectionFactory factory = new ActiveMQConnectionFactory(null,null,"tcp://localhost:61616"); Connection connection = factory.createConnection(); connection.start(); Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic("topic"); // 事实上应该是 subscriber 订阅者,可是API里仍是叫consumer MessageConsumer consumer = session.createConsumer(topic); //直接在本类中建立了一个监听器,而没再单独引用。 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { TextMessage text = (TextMessage) message; try { System.out.println(text.getText().toString()); } catch (JMSException e) { e.printStackTrace(); } } }); } }