以前咱们经过两篇文章(架构设计:系统间通讯(19)——MQ:消息协议(上)、架构设计:系统间通讯(20)——MQ:消息协议(下))从理论层面上为你们介绍了消息协议的基本定义,并花了较大篇幅向读者介绍了三种典型的消息协议:XMPP协议、Stomp协议和AMQP协议。本小节开始,咱们基于以前的知识点讲解这些协议在具体的“消息队列中间件”中是如何被咱们操做的。因为本人在实际工做中常用ActiveMQ和RabbitMQ,因此就选取这两个“消息队列中间件”进行讲解。若是读者能够补充其余“消息队列中间件”的使用,那固然是再好不过了。html
ActiveMQ是Apache软件基金会的开源产品,支持AMQP协议、MQTT协议(和XMPP协议做用相似)、Openwire协议和Stomp协议等多种消息协议。而且ActiveMQ完整支持JMS API接口规范(固然Apache也提供多种其余语言的客户端,例如:C、C++、C#、Ruby、Perl)。java
在本文发布之时,ActiveMQ最新的版本号是5.13.2(版本号升级很快,不过并不推荐使用最新的版本)。由ActiveMQ的安装是很简单,因此这个过程并不值得咱们花很大篇幅进行讨论。具体的过程就是:下载->解压->配置环境变量->运行:linux
您能够Apache ActiveMQ的官网下载安装包:https://activemq.apache.org/download-archives.html。这里咱们示例在CentOS下的安装过程,因此下载Linux下的压缩包便可(http://www.apache.org/dyn/closer.cgi?path=/activemq/5.13.2/apache-activemq-5.13.2-bin.tar.gz)。web
将下载的安装包放置在root用户的home目录内,解压便可(固然您能够根据本身的须要加压到不一样的文件路径下)。以下所示:数据库
[root@localhost ~]# tar -zxvf ./apache-activemq-5.13.2-bin.tar.gz
以上解压使用的是root用户,这是为了演示方便。正式环境中仍是建议禁用root用户,为activeMQ的运行专门建立一个用户和用户组。apache
若是您只是在测试环境使用Apache ActiveMQ,以便熟悉消息中间件自己的特性和使用方式。那么您无需对解压后的软件进行任何配置,全部可运行的命令都在软件安装目录的./bin目录下。为了使用方便,最好配置一下环境变量,以下所示(注意,根据您本身的软件安装位置,环境变量的设置是不同的,请不要盲目粘贴复制):浏览器
设置该次会话的环境变量:
[root@localhost ~]# export PATH=/usr/apache-activemq-5.13.1/bin/linux-x86-64:$PATH;
永久设置环境变量:
[root@localhost ~]# echo "export PATH=/usr/apache-activemq-5.13.1/bin/linux-x86-64:$PATH;" >> /etc/profile
在ActiveMQ Version 5.9+的版本中,Apache ActiveMQ 针对操做系统进行了更深刻的优化,因此您能够看到./bin目录下,有一个针对32位Linux运行命令的./linux-x86-32目录,和针对64位Linux运行命令的./linux-x86-64目录。请按照您本身的状况进行环境变量设置和命令运行。ruby
如今您能够在任何目录,运行activemq命令了。注意activemq命令一共有6个参数(console | start | stop | restart | status | dump),启动Apache ActiveMQ使用的命令是activemq start:性能优化
[root@localhost ~]# activemq start
若是启动成功,就能够在浏览器上访问服务节点在8161端口的管理页面了(例如http://localhost:8161):服务器
点击‘manage ActiveMQ broker’链接,能够进入管理主界面(默认的用户和密码都是admin)。以上就是Apache ActiveMQ消息中间件最简的安装和运行方式。在后续的文章中,咱们会陆续讨论ActiveMQ的集群和高性能优化,那时会介绍对应的ActiveMQ的配置问题。
如同上文讲到的,activemq命令除了start参数用于启动activemq程序之外,还有另外5个参数可使用:console | stop | restart | status | dump。他们表明的使用意义是:
stop:中止当前ActiveMQ节点的运行。
restart:从新启动当前ActiveMQ节点。
status:查看当前ActiveMQ节点的运行状态。若是当前ActiveMQ节点没有运行,那么将返回“ActiveMQ Broker is not running”的提示信息。注意,status命令只能告诉开发人员当前节点时中止的仍是运行的,除此以外不能从status命令获取更多的信息。例如,ActiveMQ为何建立Queue失败?当前ActiveMQ使用了多少内存?而要获取这些信息,须要使用如下参数启动ActiveMQ节点。
console:使用控制台模式启动ActiveMQ节点;在这种模式下,开发人员能够调试、监控当前ActivieMQ节点的实时状况,并获取实时状态。
dump:若是您采用console模式运行ActiveMQ,那么就可使用dump参数,在console控制台上获取当前ActiveMQ节点的线程状态快照。
好吧,既然咱们已经讨论过如何安装和运行ActiveMQ,也讨论了Stomp协议的组织结构,为何咱们不当即动手试一试操做ActiveMQ承载Stomp协议的消息呢?
下面咱们使用ActiveMQ提供的JAVA 客户端(实际上就是ActiveMQ对JMS规范的实现),向ActiveMQ中的Queue(示例代码中将这个Queue命名为’test’)发送一条Stomp协议消息,而后再使用JAVA语言的客户端,从ActiveMQ上接受这条消息:
package mq.test.stomp;
import java.net.Socket;
import java.util.Date;
import org.apache.activemq.transport.stomp.StompConnection;
// 消息生产者
public class TestProducer {
public static void main(String[] args) {
try {
// 创建Stomp协议的链接
StompConnection con = new StompConnection();
Socket so = new Socket("192.168.61.138", 61613);
con.open(so);
// 注意,协议版本能够是1.2,也能够是1.1
con.setVersion("1.2");
// 用户名和密码,这个没必要多说了
con.connect("admin", "admin");
// 如下发送一条信息(您也可使用“事务”方式)
con.send("/test", "234543" + new Date().getTime());
} catch(Exception e) {
e.printStackTrace(System.out);
}
}
}
package mq.test.stomp;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.Map;
import org.apache.activemq.transport.stomp.StompConnection;
import org.apache.activemq.transport.stomp.StompFrame;
public class TestConsumer {
public static void main(String[] args) throws Exception {
// 创建链接
StompConnection con = new StompConnection();
Socket so = new Socket("192.168.61.138", 61613);
con.open(so);
con.setVersion("1.2");
con.connect("admin", "admin");
String ack = "client";
con.subscribe("/test", "client");
// 接受消息(使用循环进行)
for(;;) {
StompFrame frame = null;
try {
// 注意,若是没有接收到消息,
// 这个消费者线程会停在这里,直到本次等待超时
frame = con.receive();
} catch(SocketTimeoutException e) {
continue;
}
// 打印本次接收到的消息
System.out.println("frame.getAction() = " + frame.getAction());
Map<String, String> headers = frame.getHeaders();
String meesage_id = headers.get("message-id");
System.out.println("frame.getBody() = " + frame.getBody());
System.out.println("frame.getCommandId() = " + frame.getCommandId());
// 在ack是client标记的状况下,确认消息
if("client".equals(ack)) {
con.ack(meesage_id);
}
}
}
}
以上分别是使用Activie提供的Stomp协议的消息生产端和Stomp协议的消息消费端的代码(若是您不清楚Stomp协议的细节,能够参考我另外一篇文章:《架构设计:系统间通讯(19)——MQ:消息协议(上)》)。请注意在代码片断中,并无出现任何一个带有jms名称的包或者类——这是由于ActiveMQ为Stomp协议提供的JAVA API在内部进行了JMS规范的封装。
您能够查看activemq-stomp中关于协议转换部分的源代码:org.apache.activemq.transport.stomp.JmsFrameTranslator和其父级接口:org.apache.activemq.transport.stomp.FrameTranslator来验证这件事情(关于ActiveMQ对JMS规范的实现设计,若是后续有时间再回头进行讲解)。
如下是Stomp协议的消费者端的运行效果(在生产者端已经向ActiveMQ插入了一条消息以后):
frame.getAction() = MESSAGE
frame.getBody() = 2345431458460073204
frame.getCommandId() = 0
注意,因为消息体中插入了一个时间戳,因此您复制粘贴代码后运行效果并不会和个人演示程序彻底一致。
若是您细心的话,在ActiveMQ提供的管理页面上已经看到有两个功能页面:Queue和Topic。Queue和Topic是JMS为开发人员提供的两种不一样工做机制的消息队列。 在ActiveMQ官方的解释是:
In JMS a Topic implements publish and subscribe semantics. When you publish a message it goes to all the subscribers who are interested - so zero to many subscribers will receive a copy of the message. Only subscribers who had an active subscription at the time the broker receives the message will get a copy of the message.
中文的能够译作:JMS-Topic 队列基于“订阅-发布”模式,当操做者发布一条消息后,全部对这条消息感兴趣的订阅者均可以收到它——也就是说这条消息会被拷贝成多份,进行分发。只有当前“活动的”订阅者可以收到消息(换句话说,若是当前JMS-Topic队列中没有订阅者,这条消息将被丢弃)。
A JMS Queue implements load balancer semantics. A single message will be received by exactly one consumer. If there are no consumers available at the time the message is sent it will be kept until a consumer is available that can process the message. If a consumer receives a message and does not acknowledge it before closing then the message will be redelivered to another consumer. A queue can have many consumers with messages load balanced across the available consumers.
So Queues implement a reliable load balancer in JMS.
中文的能够译作:JMS-Queue是一种“负载均衡模式”的实现。一个消息能且只能被一个消费者接受。若是当前JMS-Queue中没有任何的消费者,那么这条消息将会被Queue存储起来(实际应用中能够存储在磁盘上,也能够存储在数据库中,看软件的配置),直到有一个消费者链接上。另外,若是消费者在接受到消息后,在他断开与JMS-Queue链接以前,没有发送ack信息(能够是客户端手动发送,也能够是自动发送),那么这条消息将被发送给其余消费者。
如下表格摘自互联网上的资料,基本上把Queue和Topic这两种队列的不一样特性说清楚了:
比较项目 | Topic 模式队列 | Queue 模式队列 |
---|---|---|
工做模式 | “订阅-发布”模式,若是当前没有订阅者,消息将会被丢弃。若是有多个订阅者,那么这些订阅者都会收到消息 | “负载均衡”模式,若是当前没有消费者,消息也不会丢弃;若是有多个消费者,那么一条消息也只会发送给其中一个消费者,而且要求消费者ack信息。 |
有无状态 | 无状态 | Queue数据默认会在mq服务器上以文件形式保存,好比Active MQ通常保存在$AMQ_HOME\data\kr-store\data下面。也能够配置成DB存储。 |
传递完整性 | 若是没有订阅者,消息会被丢弃 | 消息不会丢弃 |
处理效率 | 因为消息要按照订阅者的数量进行复制,因此处理性能会随着订阅者的增长而明显下降,而且还要结合不一样消息协议自身的性能差别 | 因为一条消息只发送给一个消费者,因此就算消费者再多,性能也不会有明显下降。固然不一样消息协议的具体性能也是有差别的 |
上文已经说到,JMS这套面向消息通讯的 JAVA API 是一个和厂商无关的规范。经过JMS,咱们能实现不一样消息中间件厂商、不一样协议间的转换和交互。这一小节咱们就来讨论一下这个问题。若是用一张图来表示JMS在消息中间件中的做用话,那么就能够这么来画:
首先您使用的MQ消息中间件须要实现了JMS规范;那么经过JMS规范,开发人员能够忽略各类消息协议的细节,只要消息在同一队列中,就可以保证各类消息协议间实现互相转换。下面咱们首先来看一个使用JMS API在ActiveMQ中操做openwire协议消息的简单示例,而后再给出一个经过JMS,实现Stomp消息协议和Openwire消息协议间的互转示例。
package jms;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
/** * 测试使用JMS API链接ActiveMQ * @author yinwenjie */
public class JMSProducer {
/** * 因为是测试代码,这里忽略了异常处理。 * 正是代码可不能这样作 * @param args * @throws RuntimeException */
public static void main (String[] args) throws Exception {
// 定义JMS-ActiveMQ链接信息(默认为Openwire协议)
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.61.138:61616");
Session session = null;
Destination sendQueue;
Connection connection = null;
//进行链接
connection = connectionFactory.createQueueConnection();
connection.start();
//创建会话(设置一个带有事务特性的会话)
session = connection.createSession(true, Session.SESSION_TRANSACTED);
//创建queue(固然若是有了就不会重复创建)
sendQueue = session.createQueue("/test");
//创建消息发送者对象
MessageProducer sender = session.createProducer(sendQueue);
TextMessage outMessage = session.createTextMessage();
outMessage.setText("这是发送的消息内容");
//发送(JMS是支持事务的)
sender.send(outMessage);
session.commit();
//关闭
sender.close();
connection.close();
}
}
当以上代码运行到“start”的位置时,咱们能够经过观察ActiveMQ管理界面中connection列表中的链接信息,发现消息生产者已经创建了一个Openwire协议的链接:
从而肯定咱们经过JMS API创建了一个openwire协议的通信链接。接着咱们使用如下代码,创建一个基于openwire协议的“消费者”。注意:消息生产者和消息消费者,映射的队列必须一致。(在示例代码中,它们都映射名称为test的JMS-Queue)
package jms;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
/** * 测试使用JMS API链接ActiveMQ * @author yinwenjie */
public class JMSConsumer {
/** * 因为是测试代码,这里忽略了异常处理。 * 正是代码可不能这样作 * @param args * @throws RuntimeException */
public static void main (String[] args) throws Exception {
// 定义JMS-ActiveMQ链接信息
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.61.138:61616");
Session session = null;
Destination sendQueue;
Connection connection = null;
//进行链接
connection = connectionFactory.createQueueConnection();
connection.start();
//创建会话(设置为自动ack)
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//创建Queue(固然若是有了就不会重复创建)
sendQueue = session.createQueue("/test");
//创建消息发送者对象
MessageConsumer consumer = session.createConsumer(sendQueue);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message arg0) {
// 接收到消息后,不须要再发送ack了。
System.out.println("Message = " + arg0);
}
});
synchronized (JMSConsumer.class) {
JMSConsumer.class.wait();
}
//关闭
consumer.close();
connection.close();
}
}
当以上“消费者”代码运行到start的位置时,咱们经过ActiveMQ提供的管理界面能够看到,基于Openwire协议的链接增长到了两条:
注意,您在运行以上测试代码时,不用和个人运行顺序一致。因为Queue模式的队列是要进行消息状态保存的,因此不管您是先运行“消费者”端,仍是先运行“生产者”端,最后“消费者”都会收到一条消息。相似以下的效果:
Message = ActiveMQTextMessage {commandId = 6, responseRequired = false, messageId = ID:yinwenjie-240-60482-1458616972423-1:1:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:yinwenjie-240-60482-1458616972423-1:1:1:1, destination = queue:///test, transactionId = TX:ID:yinwenjie-240-60482-1458616972423-1:1:1, expiration = 0, timestamp = 1458617840154, arrival = 0, brokerInTime = 1458617840166, brokerOutTime = 1458617840187, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@66968df8, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = 这是发送的消息内容}
下面咱们将Openwire协议的消息经过JMS送入Queue队列,而且让基于Stomp协议的消费者接收到这条消息。为了节约篇幅,基于Openwire协议的生产者的代码请参考上一小节2-5-1中“生产者”的代码片断。这里只列出Stomp消息的接受者代码(实际上这段代码在上文中也能够找到):
package mq.test.stomp;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.Map;
import org.apache.activemq.transport.stomp.StompConnection;
import org.apache.activemq.transport.stomp.StompFrame;
public class TestConsumer {
public static void main(String[] args) throws Exception {
// 创建链接(注意,Stomp协议的链接端口是61613)
StompConnection con = new StompConnection();
Socket so = new Socket("192.168.61.138", 61613);
con.open(so);
con.setVersion("1.2");
con.connect("admin", "admin");
String ack = "client";
con.subscribe("/test", "client");
// 接受消息(使用循环进行)
for(;;) {
StompFrame frame = null;
try {
// 注意,若是没有接收到消息,
// 这个消费者线程会停在这里,直到本次等待超时
frame = con.receive();
} catch(SocketTimeoutException e) {
continue;
}
// 打印本次接收到的消息
System.out.println("frame.getAction() = " + frame.getAction());
Map<String, String> headers = frame.getHeaders();
String meesage_id = headers.get("message-id");
System.out.println("frame.getBody() = " + frame.getBody());
System.out.println("frame.getCommandId() = " + frame.getCommandId());
// 在ack是client模式的状况下,确认消息
if("client".equals(ack)) {
con.ack(meesage_id);
}
}
}
}
当您同时运行Openwire消息发送者和Stomp消息接收者时,您能够在ActiveMQ的管理界面看到这两种协议的链接信息:
如下是Stomp协议消费者接收到的消息内容(通过转换的openwire协议消息):
frame.getAction() = MESSAGE
frame.getBody() = 这是发送的消息内容
frame.getCommandId() = 0
接下文