消息中间件和JMS介绍

 

在一个公司创立初期,他可能只有几个应用,系统之间的关联也不是那么大,A系统调用B系统就直接调用B提供的API接口;后来这个公司作大了,他一步步发展有了几十个系统,这时候A系统要调用B系统的接口,可是B系统前几天刚改了一下接口A并不知情。因此A发现调不通因而给B系统管理员打电话,小王啊,改了接口咋不告诉我呢。我还觉得咱们系统出错了呢。弄得小王一顿尴尬,我这本身改个东西还的通知这个通知那个的。java

1 中间件介绍

咱们看到上面的故事中的小王他真的是很累啊。本身修改一个接口还的给全部调用接口的系统管理员打电话告知API发生变化。说到这个问题啊,仍是的说咱们系统之间的耦合。对于一个小公司来讲是无所谓,可是对于一个大公司这种状况简直是致命的。因而最近几年这些愈来愈大的互联网公司在这种挑战下提出了中间件这个概念:中间件在操做系统软件,网络和数据库之上,应用软件之下,总的做用是为处于本身上层的软件提供灵活的开发环境。于是中间件是指一类软件,是基于分布式处理的软件,最突出的特色是其网络通讯功能。也可认为中间件是位于平台和应用之间的通用服务,这些服务具备标准的程序接口和协议。针对不一样的操做系统和硬件平台,能够有符合接口和协议的多种实现。数据库

1.1 中间件分类

中间件能够分为六类:编程

1) 终端仿真/屏幕转换缓存

2) 数据访问中间件(UDA)tomcat

3) 远程过程调用中间件(RPC)安全

4) 消息中间件(MOM)服务器

5) 交易中间件(TPM)markdown

6) 对象中间件网络

然而在实际应用中,通常将中间件分为两大类:数据结构

一类是底层中间件,用于支撑单个应用系统或解决一类问题,包括交易中间件、应用服务器、消息中间件、数据访问中间件等;

另外一类是高层中间件,更多的用于系统整合,包括企业应用集成中间件、工做流中间件、门户中间件等,他们一般会与多个应用系统打交道,在系统中层次较高,并大多基于前一类的底层中间件运行。

终端仿真/屏幕转换

此类中间件用于实现客户机图形用户接口与已有的字符接口方式的服务器应用程序之间的互操做,应用与早期的大型机系统,如今已不多使用。

数据访问中间件

此类中间件是为了创建数据应用资源互操做的模式,对异构环境下的数据库或文件系统实现联接。

远程过程调用中间件

此类中间件可使开发人员在须要时调用位于远端服务器上的过程,屏蔽了在调用过程当中的通讯细节。一个应用程序使用RPC来远程执行一个位于不一样地址空间里的过程,在效果上看和执行本地调用相同。

交易中间件

此类中间件是专门针对联机交易系统而设计的。联机交易系统须要处理大量并发进程,处理并发涉及到操做系统,文件系统,编程语言,数据通讯,数据库系统,系统管理,应用软件等。而交易中间件根据分布式交易处理的标准及参考模型,对资源管理,交易管理和应用进行了实现,从而使得基于交易中间件开发应用程序更为简单。交易中间件基本上只适用于联机交易系统,是一种较为专用的中间件。

消息中间件

此类中间件是指利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通讯来进行分布式系统的集成。经过提供消息传递和消息排队模型,它能够在分布式环境下扩展进程间的通讯。

消息中间件能够即支持同步方式,又支持异步方式。异步中间件比同步中间件具备更强的容错性,在系统故障时能够保证消息的正常传输。异步中间件技术又分为两类:广播方式和发布/订阅方式。因为发布/订阅方式能够指定哪一种类型的用户能够接受哪一种类型的消息,更加有针对性,事实上已成为异步中间件的非正式标准。目前主流的消息中间件产品有IBM的MQSeries,BEA的MessageQ和Sun的JMS等[1]。

对象中间件

传统的对象技术经过封装、继承及多态提供了良好的代码重用功能。但这些对象只存在与一个程序中,外界并不知道它们的存在,也没法访问它们。对象中间件提供了一个标准的构建框架,能使不一样厂家的软件经过不一样的地址空间,网络和操做系统实现交互访问。对象中间件的目标是为软件用户及开发者提供一种应用级的即插即用的互操做性。目前主流的对象中间件有OMG的CORBA,Microsoft 的COM以及IBM的SOM,Sun的RMI等。

中间件的特色

通常来说,中间件具备如下一些特色:知足大量应用的需求,运行于多种硬件和操做系统平台,支持分布式计算,支持标准接口和协议。开发人员经过调用中间件提供的大量API,实现异构环境的通讯,从而屏蔽异构系统中复杂的操做系统和网络协议。

因为标准接口对于可移植性和标准协议对于互操做性的重要性,中间件已成为许多标准化工做的主要部分。分布式应用软件借助中间件能够在不一样的技术之间共享资源。

总的来讲,中间件屏蔽了底层操做系统的复杂性,使程序开发人员面对一个简单而统一的开发环境,减小了程序设计的复杂性,将注意力集中与本身的业务上,没必要再为程序在不一样软件系统上的移植而重复工做,从而大大减小了技术上的负担。

2 消息中间件

面向消息的中间件(MOM),提供了以松散耦合的灵活方式集成应用程序的一种机制。它们提供了基于存储和转发的应用程序之间的异步数据发送,即应用程序彼此不直接通讯,而是与做为中介的MOM通讯。MOM提供了有保证的消息发送(至少是在尽量地作到这一点),应用程序开发人员无需了解远程过程调用(RPC)和网络/通讯协议的细节。

消息队列技术是分布式应用间交换信息的一种技术。消息队列可驻留在内存或磁盘上,队列存储消息直到它们被用程序读走。经过消息队列,应用程序可独立地执行–它们不须要知道彼此的位置、或在继续执行前不须要等待接收程序接收此消息。在分布式计算环境中,为了集成分布式应用,开发者须要对异构网络环境下的分布式应用提供有效的通讯手段。为了管理须要共享的信息,对应用提供公共的信息交换机制是重要的。设计分布式应用的方法主要有:远程过程调用(RPC)–分布式计算环境(DCE)的基础标准成分之一;对象事务监控(OTM)–基于CORBA的面向对象工业标准与事务处理(TP)监控技术的组合;消息队列(MessageQueue)–构造分布式应用的松耦合方法。

MOM将消息路由给应用程B,这样消息就能够存在于彻底不一样的计算机上,MOM负责处理网络通讯。若是网络链接不可用,MOM会存储消息,直到链接变得可用时,再将消息转发给应用程序B。

灵活性的另外一方面体如今,当应用程序A发送其消息时,应用程序B甚至能够不处于执行状态。MOM将保留这个消息,直到应用程序B开始执行并试着检索消息为止。这还防止了应用程序A由于等待应用程序B检索消息而出现阻塞。这种异步通讯要求应用程序的设计与如今大多数应用程序不一样,不过,对于时间无关或并行处理,它多是一个极其有用的方法。

2.1 消息中间件的传递模式

消息中间件通常有两种传递模式:点对点模式(P2P)和发布-订阅模式(Pub/Sub)。

点对点模式

Point-to-Point(P2P)咱们很容易理解,即生产者和消费者之间的消息往来。 

每一个消息都被发送到特定的消息队列,接收者从队列中获取消息。队列保留着消息,直到他们被消费或超时。

P2P的特色:

  1. 每一个消息只有一个消费者(Consumer)(即一旦被消费,消息就再也不在消息队列中);
  2. 发送者和接收者之间在时间上没有依赖性,也就是说当发送者发送了消息以后,无论接收者有没有正在运行,它不会影响到消息被发送到队列;
  3. 接收者在成功接收消息以后需向队列应答成功。

发布-订阅模式(Pub/Sub)

咱们能够联想到卖报纸的过程:印刷厂把当天的报纸印好而后送到邮递员手里,邮递员风雨兼程的把报纸送到每一位订阅者手里。由此咱们能够看到发布-订阅模式的一些特色:

  1. 每一个消息能够有多个消费者;
  2. 发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,它必须建立一个订阅者以后,才能消费发布者的消息,并且为了消费消息,订阅者必须保持运行的状态;

由上介绍咱们能够看出这两种模式各有千秋,若是你须要点对点的发送消息那么使用P2P更专一,若是你是群发消息,显然pub/sub模式更适合。

3 基于多种协议的消息传递机制

目前市场上对于网络消息传递的协议版本不少,不一样的协议有不一样的规范,咱们在使用时要比对实现不一样协议的产品。下面咱们看一下目前主流的消息传递协议:

3.1 AMQP协议

AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。AMQP协议是一种二进制协议,提供客户端应用与消息中间件之间异步、安全、高效地交互。

AMQP是一个应用层的异步消息传递协议,为面向消息的中间件而设计。其目的是经过协议使应用模块之间或应用程序与中间件等进行充分解耦。而在设计初期,AMQP的原始用途只是为金融界提供一个能够彼此协做的消息协议。如今已经有至关一部分遵循AMQP的服务器和客户端供使用。其中RabbitMQ是AMQP的一款开源标准实现。

支持全部消息中间件的功能:消息交换、文件传输、流传输、远程进程调用等。

AMQP的服务器(Broker)主要由交换器、消息、队列组成。Broker的主要功能是消息的路由和缓存。对于须要保障可靠性的消息,RabbitMQ能够将消息、队列和交换器的数据写入本地硬盘。而对于响应时间敏感的消息,RabbitMQ能够不配置持久化机制。

解决的问题:

1)信息的发送者和接收者如何维持这个链接,若是一方的链接中断,这期间的数据如何防止丢失?

2)如何下降发送者和接收者的耦合度?

3)如何让Priority高的接收者先接到数据?

4)如何作到load balance?有效均衡接收者的负载?

5)如何有效的将数据发送到相关的接收者?也就是说将接收者subscribe 不一样的数据,如何作有效的filter。

6)如何作到可扩展,甚至将这个通讯模块发到cluster上?

7)如何保证接收者接收到了完整,正确的数据?

AMQP协议解决了以上的问题,而RabbitMQ实现了AMQP。

3.2 STOMP协议

STOMP即Simple (or Streaming) Text Orientated Messaging Protocol,简单(流)文本定向消息协议。

它提供了一个可互操做的链接格式,容许STOMP客户端与任意STOMP消息代理(Broker)进行交互。STOMP协议因为设计简单,易于开发客户端,所以在多种语言和多种平台上获得普遍地应用。

STOMP协议的前身是TTMP协议(一个简单的基于文本的协议),专为消息中间件设计。

STOMP是一个很是简单和容易实现的协议,其设计灵感源自于HTTP的简单性。尽管STOMP协议在服务器端的实现可能有必定的难度,但客户端的实现却很容易。例如,可使用Telnet登陆到任何的STOMP代理,并与STOMP代理进行交互。

STOMP是除AMQP开放消息协议以外地另一个选择, 实现了被用在JMS brokers中特定的有线协议,好比OpenWire。它仅仅是实现通用消息操做中的一部分,并不是想要覆盖全面的消息API。

STOMP server就好像是一系列的目的地, 消息会被发送到这里。STOMP协议把目的地看成不透明的字符串,其语法是服务端具体的实现。 此外STOMP没有定义目的地的交付语义是什么,语义的目的地能够从服务器到服务器,甚至从目的地到目的地。这使得服务器有可创造性的语义,去支持STOMP。

STOMP client的用户代理能够充当两个角色(可能同时):

  1. 做为生产者,经过SENDframe发送消息到server
  2. 做为消费者,发送SUBSCRIBEframe到目的地而且经过MESSAGEframe从server获取消息。

STOMP协议工做于TCP协议之上,使用了下列命令:

  • SEND 发送

  • SUBSCRIBE 订阅

  • UNSUBSCRIBE 退订

  • BEGIN 开始

  • COMMIT 提交

  • ABORT 取消

  • ACK 确认

  • DISCONNECT 断开

目前最流行的STOMP消息代理是Apache ActiveMQ。

3.3 JMS协议

JMS是Java Message Service的缩写,即Java消息服务。

在大型互联网中,咱们采用消息中间件能够进行应用之间的解耦以及操做的异步,这是消息中间件两个最基础的特色,也正是咱们所须要的。在此基础上,咱们着重思考的是消息的顺序保证、扩展性、可靠性、业务操做与消息发送一致性,以及多集群订阅者等方面的问题。固然,这些咱们要思考的东西,JMS都已经想到了,先看下JMS能帮开发者作什么:

一、定义一组消息公用概念和实用工具

全部Java应用程序均可以使用JMS中定义的API去完成消息的建立、接收与发送,任何实现了JMS标准的MOM均可以做为消息的中介,完成消息的存储转发

二、最大化消息应用程序的可移植性

MOM提供了有保证的消息发送,应用程序开发人员无需了解远程过程调用(RPC)和网络/通讯协议的细节,提供了程序的可移植性

三、最大化下降应用程序与应用程序之间的耦合度

因为MOM的存在,各个应用程序只关心和MOM之间如何进行消息的接收与发送,而无须关注MOM的另外一边,其余程序是如何接收和发送的

JMS定义了一套通用的接口和相关语义,提供了诸如持久、验证和事务的消息服务,它最主要的目的是容许Java应用程序访问现有的消息中间件。JMS规范没有指定在消息节点间所使用的通信底层协议,来保证应用开发人员不用与其细节打交道,一个特定的JMS实现可能提供基于TCP/IP、HTTP、UDP或者其它的协议。

因为没有统一的规范和标准,基于消息中间件的应用不可移植,不一样的消息中间件也不能互操做,这大大阻碍了消息中间件的发展。 Java Message Service(JMS, Java消息服务)是SUN及其伙伴公司提出的旨在统一各类消息中间件系统接口的规范。

目前许多厂商采用并实现了JMS API,如今,JMS产品可以为企业提供一套完整的消息传递功能,目前咱们看到的比较流行的JMS商业软件和开源产品:WebLogic、SonicMQ、ActiveMQ、OpenJMS都是基于JMS规范的实现。

4 JMS介绍

在 JMS 以前,每一家 MOM 厂商都用专有 API 为应用程序提供对其产品的访问,一般可用于许多种语言,其中包括 Java 语言。JMS 经过 MOM 产品为 Java 程序提供了一个发送和接收消息的标准的、便利的方法。用 JMS 编写的程序能够在任何实现 JMS 标准的 MOM 上运行。

JMS 可移植性的关键在于:JMS API 是由 Sun 做为一组接口而提供的。提供了 JMS 功能的产品是经过提供一个实现这些接口的提供者来作到这一点的。开发人员能够经过定义一组消息和一组交换这些消息的客户机应用程序创建 JMS 应用程序。

JMS 支持两种消息类型P2P 和Pub/Sub,在JMS消息模型中,根据点对点模式和发布/订阅模式,这些要素由扩展出了各自的内容:

JMS标准 点对点模式 发布/订阅模式
ConnectionFactory QueueConnectionFactory TopicConnectionFactory
Connection QueueConnection TopicConnection
Destination Queue Topic
Session QueueSession TopicSession
MessageProducer QueueSender TopicPublisher
MessageConsumer QueueReceiver TopicSubscriber

JMS为发开者提供了不少的要素,看一下比较重要的几个:

要 素 做 用
Destination 表示消息所走通道的目标定义,用来定义消息从发送端发出后要走的通道,而不是接收方。Destination属于管理类对象
ConnectionFactory 顾名思义,用于建立链接对象,ConnectionFactory属于管理类的对象
Connection 链接接口,所负责的重要工做时建立Session
Session 会话接口,这是一个很是重要的对象,消息发送者、消息接收者以及消息对象自己,都是经过这个会话对象建立的
MessageConsumer 消息的消费者,也就是订阅消息并处理消息的对象
MessageProducer 消息的生产者,也就是用来发送消息的对象
XXXMessage 指各类类型的消息对象,包括ByteMesage、ObjectMessage、StreamMessage和TextMessage这5种

JMS消息模型

JMS 消息由如下几部分组成:消息头,属性,消息体。

  1. 消息头(header):JMS消息头包含了许多字段,它们是消息发送后由JMS提供者或消息发送者产生,用来表示消息、设置优先权和失效时间等等,而且为消息肯定路由。
  2. 属性(property):由消息发送者产生,用来添加删除消息头之外的附加信息。
  3. 消息体(body):由消息发送者产生,JMS中定义了5种消息体:ByteMessage、MapMessage、ObjectMessage、StreamMessage和TextMessage。

JMS编程模型

通常来讲咱们在开发基于JMS协议的客户端由一下几部构成:

1) 用JNDI 获得ConnectionFactory对象;

2) 用JNDI 获得目标队列或主题对象,即Destination对象;

3) 用ConnectionFactory建立Connection 对象;

4) 用Connection对象建立一个或多个JMS Session;

5) 用Session 和Destination 建立MessageProducer和MessageConsumer;

6) 通知Connection 开始传递消息。

由于jms须要使用到J2EE服务器,咱们日常用的tomcat属于J2SE类型的服务器,常见的J2EE服务器包括:Geronimo,JBoss 4, GlassFish,WebLogic 。咱们在这里使用glassfish 容器。安装和使用有不少教程,在此就不贴了。首先咱们进去glassfish的控制台,设置一下咱们的发送者和接受者对象:

下面咱们用oracle提供的jms接口来写一个服务端,咱们先来写一个P2P模式的例子:

MySender.java

import java.io.BufferedReader; import java.io.InputStreamReader; import javax.naming.*; import javax.jms.*; public class MySender { public static void main(String[] args) { try { //1)建立一个connection InitialContext ctx=new InitialContext(); QueueConnectionFactory f=(QueueConnectionFactory)ctx.lookup("myQueueConnectionFactory"); QueueConnection con=f.createQueueConnection(); con.start(); //2) 建立一个会话接口 QueueSession ses=con.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); //3) 获取会话接口对象 Queue t=(Queue)ctx.lookup("myQueue"); //4)建立一个发送者对象 QueueSender sender=ses.createSender(t); //5) 建立一个消息对象 TextMessage msg=ses.createTextMessage(); //6) 把咱们的消息写入msg对象中 BufferedReader b=new BufferedReader(new InputStreamReader(System.in)); while(true) { System.out.println("Enter Msg, end to terminate:"); String s=b.readLine(); if (s.equals("end")) break; msg.setText(s); //7) 发送消息 sender.send(msg); System.out.println("Message successfully sent."); } //8) 关闭链接 con.close(); }catch(Exception e){System.out.println(e);} } }

 

MyReceiver.java

import javax.jms.*;
import javax.naming.InitialContext;

public class MyReceiver { public static void main(String[] args) { try{ //1) 建立一个connection InitialContext ctx=new InitialContext(); QueueConnectionFactory f=(QueueConnectionFactory)ctx.lookup("myQueueConnectionFactory"); QueueConnection con=f.createQueueConnection(); con.start(); //2) 建立一个会话接口 QueueSession ses=con.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); //3) 获取会话接口对象 Queue t=(Queue)ctx.lookup("myQueue"); //4)建立一个发送者对象 QueueReceiver receiver=ses.createReceiver(t); //5) 建立一个消监听对象 MyListener listener=new MyListener(); //6) 将监听器注册到receiver,用来监听receiver receiver.setMessageListener(listener); System.out.println("Receiver1 is ready, waiting for messages..."); System.out.println("press Ctrl+c to shutdown..."); while(true){ Thread.sleep(1000); } }catch(Exception e){System.out.println(e);} } }

 

MyListener.java

import javax.jms.*; public class MyListener implements MessageListener { public void onMessage(Message m) { try{ TextMessage msg=(TextMessage)m; System.out.println("following message is received:"+msg.getText()); }catch(JMSException e){System.out.println(e);} } }

Pub/Sub模式:

MySender.java

import javax.jms.*; import javax.naming.InitialContext; import java.io.BufferedReader; import java.io.InputStreamReader; public class MySender { public static void main(String[] args) { try { //1)建立一个connection InitialContext ctx=new InitialContext(); TopicConnectionFactory f=(TopicConnectionFactory)ctx.lookup("myTopicConnectionFactory"); TopicConnection con=f.createTopicConnection(); con.start(); //2) 建立一个会话接口 TopicSession ses=con.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); //3) 获取会话接口对象 Topic t=(Topic)ctx.lookup("myTopic"); //4)建立一个发送者对象 TopicPublisher publisher=ses.createPublisher(t); //5) 建立一个消息对象 TextMessage msg=ses.createTextMessage(); //6) 把咱们的消息写入msg对象中 BufferedReader b=new BufferedReader(new InputStreamReader(System.in)); while(true) { System.out.println("Enter Msg, end to terminate:"); String s=b.readLine(); if (s.equals("end")) break; msg.setText(s); //7) 发送消息 publisher.publish(msg); System.out.println("Message successfully sent."); } //8) 关闭链接 con.close(); }catch(Exception e){System.out.println(e);} } }

MyReceiver.java

import javax.jms.*;
import javax.naming.InitialContext;

public class MyReceiver { public static void main(String[] args) { try{ //1) 建立一个connection InitialContext ctx=new InitialContext(); TopicConnectionFactory f=(TopicConnectionFactory)ctx.lookup("myTopicConnectionFactory"); TopicConnection con=f.createTopicConnection(); //2) 建立一个会话接口 TopicSession ses=con.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); //3) 获取会话接口对象 Topic t=(Topic)ctx.lookup("myTopic"); //4)建立一个发送者对象 TopicSubscriber receiver=ses.createSubscriber(t); //5) 建立一个消监听对象 MyListener listener=new MyListener(); //6) 将监听器注册到receiver,用来监听receiver receiver.setMessageListener(listener); System.out.println("Receiver1 is ready, waiting for messages..."); System.out.println("press Ctrl+c to shutdown..."); while(true){ Thread.sleep(1000); } }catch(Exception e){System.out.println(e);} } }

 

MyListener.java

import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; public class MyListener implements MessageListener { public void onMessage(Message m) { try{ TextMessage msg=(TextMessage)m; System.out.println("following message is received:"+msg.getText()); }catch(JMSException e){System.out.println(e);} } }

上面两个案例咱们运行能够看到消息成功的发送出去了。熟悉了JMS的语法,使用起来仍是很简单。

上面咱们介绍到了JMS,JMS是一个用于提供消息服务的技术规范,它制定了在整个消息服务提供过程当中的全部数据结构和交互流程。JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API。 Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。

下面咱们引入另外一个概念:MQ(Message Queue)。

应用程序经过写和检索出入列队的针对应用程序的数据(消息)来通讯,而无需专用链接来连接它们。消息传递指的是程序之间经过在消息中发送数据进行通讯,而不是经过直接调用彼此来通讯,直接调用一般是用于诸如远程过程调用的技术。排队指的是应用程序经过队列来通讯。队列的使用除去了接收和发送应用程序同时执行的要求。

MQ和JMS相似,但不一样的是JMS是SUN Java消息中间件服务的一个标准和API定义,而MQ则是遵循了AMQP协议的具体实现和产品。JMS是一个用于提供消息服务的技术规范,它制定了在整个消息服务提供过程当中的全部数据结构和交互流程。而MQ则是消息队列服务,是面向消息中间件(MOM)的最终实现,是真正的服务提供者;MQ的实现能够基于JMS,也能够基于其余规范或标准。MQ 有不少产品:IBM的,rabbitmq, activemq 等,rabbitmq 只支持点对点的方式。因此没有彻底实现JMS的标准,因此说它不是一个JMS产品,而rabitmq 和Jobss JMS 它们实现了JMS的各项标准,是开源的JMS产品。目前彻底实现JMS协议的mq是activemq,因此接下来咱们先重点看一下activemq。从activemq入手去探索javaEE的世界。

相关文章
相关标签/搜索