一:JMS基本概念html
JMS即Java消息服务(Java Message Service)应用程序接口是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通讯(用于解决两个或者多个程序之间的耦合)。它便于消息系统中的 Java 应用程序进行消息交换,而且经过提供标准的产生、发送、接收消息的接口简化企业应用的开发。java
也就是说它定义看一系列规范,而后你们按照这种规范来开发本身消息服务,固然,如今有好多开源的来供你们使用了, 好比说Apache ActiveMQ、RabbitMQ、Redis、Jafka/Kafka 等等这些web
1. JMS的目标apache
为企业级的应用提供一种智能的消息系统,JMS定义了一整套的企业级的消息概念与工具,尽量最小化的Java语言概念去构建最大化企业消息应用。统一已经存在的企业级消息系统功能。windows
2. JMS应用程序, 服务器
一个完整的JMS应用应该实现如下功能:session
l JMS 客户端 – Java语言开发的接受与发送消息的程序并发
l 非JMS客户端 – 基于消息系统的本地API实现而不是JMSapp
l 消息 – 应用程序用来相互交流信息的载体webapp
l 被管理对象–预先配置的JMS对象,JMS管理员建立,被客户端运用。如连接工厂,主题等
l JMS提供者–完成JMS功能与管理功能的消息系统
3. JMS体系结构
描述以下:
l JMS提供者(JMS的实现者,好比activemq jbossmq等)
l JMS客户(使用提供者发送消息的程序或对象,例如在12306中,负责发送一条购票消息处处理队列中,用来解决购票高峰问题,那么,发送消息到队列的程序和从队列获取消息的程序都叫作客户)
l JMS生产者,JMS消费者(生产者及负责建立并发送消息的客户,消费者是负责接收并处理消息的客户)
l JMS消息(在JMS客户之间传递数据的对象)
l JMS队列(一个容纳那些被发送的等待阅读的消息的区域)
l JMS主题(一种支持发送消息给多个订阅者的机制)
4. JMS对象模型
l 链接工厂(connectionfactory)客户端使用链接工厂建立一个JMS链接(connection)。
l JMS链接 表示JMS客户端和服务器端之间的一个活动的链接,是由客户端经过调用链接工厂的方法创建的。
l JMS会话 session 标识JMS客户端和服务端的会话状态。会话创建在JMS链接上,标识客户与服务器之间的一个会话进程。
二:JMS的消息模式
1. 点对点的消息模式(Point to Point Messaging)
点对点消息模型:经过一个服务器消息队列实现,消息的发送者向队列写入消息,消息的接收者从队列取出消息。
下面的JMS对象在点对点消息模式中是必须的:
a. 队列(Queue) – 一个提供者命名的队列对象,客户端将会使用这个命名的队列对象
b. 队列连接工厂(QueueConnectionFactory) – 客户端使用队列连接工厂建立连接队列
ConnectionQueue来取得与JMS点对点消息提供者的连接。
c. 连接队列(ConnectionQueue) – 一个活动的连接队列存在在客户端与点对点消息提供者之间,客户用它建立一个或者多个JMS队列会话(QueueSession)
d. 队列会话(QueueSession) – 用来建立队列消息的发送者与接受者(QueueSenderand
QueueReceiver)
e. 消息发送者(QueueSender 或者MessageProducer)– 发送消息到已经声明的队列
f. 消息接受者(QueueReceiver或者MessageConsumer) – 接受已经被发送到指定队列的消息
2. 发布订阅模式(publish – subscribe Mode)
发布-订阅模式:把消息发送到给一个主题(Topic),消息服务器将消息发布给订阅器该主题的每个订阅者。举个通俗的例子,就比如如一家杂志社(至关于消息发送者)把一堆杂志(至关于消息)寄到了邮政(至关于主题),再由邮政将杂志发给每个有订阅这本杂志的读者(至关于消息接收者)
必须的消息对象:
a. 主题Topic(Destination) – 一个提供者命名的主题对象,客户端将会使用这个命名的主题对象
b. 主题连接工厂(TopciConnectionFactory) – 客户端使用主题连接工厂建立连接主题
ConnectionTopic来取得与JMS消息Pub/Sub提供者的连接。
c. 连接主题(ConnectionTopic) – 一个活动的连接主题存在发布者与订阅者之间
d. 会话(TopicSession) – 用来建立主题消息的发布者与订阅者 (TopicPublisher and
TopicSubscribers)
e. 消息发送者MessageProducer) – 发送消息到已经声明的主题
f. 消息接受者(MessageConsumer) – 接受已经被发送到指定主题的消息
3. 区别:
点对点模型每个消息只有一个接收者。
发布-订阅消息模式的每个消息能够有多个接收者。
三:介绍ActiveMQ
ActiveMQ 是 Apache 出品,最流行的、能力强劲的开源消息总线。ActiveMQ 是一个彻底支持 JMS1.1 和 J2EE 1.4 规范的 JMS Provider 实现,能够很容易内嵌到使用Spring的系统里面去,因此咱们选择它。
ActiveMQ拥有如下优势
1.支持多种语言和协议编写客户端。语言: Java, C, C++, C#, Ruby, Perl, Python, PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
2.彻底支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)
3.对Spring的支持,ActiveMQ能够很容易内嵌到使用Spring的系统里面
4.彻底支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)
5.经过了常见J2EE服务器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的测试,其中经过JCA 1.5 resource adaptors的配置,可让ActiveMQ能够自动的部署到任何兼容J2EE 1.4 商业服务器上
6.支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
7.从设计上保证了高性能的集群,客户端-服务器,点对点
8.支持Ajax
9.支持与Axis的整合
10.能够很容易得调用内嵌JMS provider,进行测试
安装:
ActiveMQ(本文简称MQ)要求JDK1.5以上。
下载地址:
http://activemq.apache.org/download.html
解压:
activemq-all-5.5.0.jar:全部MQ JAR包的集合,用于用户系统调用
bin:其中包含MQ的启动脚本
conf:包含MQ的全部配置文件
data:日志文件及持久性消息数据
example:MQ的示例
lib:MQ运行所需的全部Lib
webapps:MQ的Web控制台及一些相关的DEMO
启动MQ:
Linux ./active start | stop
windows: 双击bin目录下的activemq.bat文件便可启动MQ
登陆地址:
http://IP:8161 (http://172.16.0.15:8161)
四:基于ActiveMQ的(Point to Point)模式Demo程序
消息对象
public class MqBean implements Serializable {
private Integer age;
private String name;
public Integer getAge() {
return age;
}
public void setAge(Integer age) {
this.age = age;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
4.1 队列消息的发送:
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.transport.TransportListener;
import javax.jms.*;
import java.io.IOException;
/**
* Created with IntelliJ IDEA.
* Project Name : ActiveMQ
* User: Jelynn
* Date: 2017/4/10
* Time: 10:07
* Describe:
* Version:1.0
*/
public class Sender {
public static void main(String[] args) {
send();
}
//队列消息的发送
public static void send() {
Connection connection;
Session session;
Destination destination;
MessageProducer producer;
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "failover:tcp://172.16.0.15:61616");
connectionFactory.setTrustAllPackages(true);
try {
connection = connectionFactory.createConnection();
connection.start();
//第一个参数是是不是事务型消息,设置为true,第二个参数无效
//第二个参数是
//Session.AUTO_ACKNOWLEDGE为自动确认,客户端发送和接收消息不须要作额外的工做。异常也会确认消息,应该是在执行以前确认的
//Session.CLIENT_ACKNOWLEDGE为客户端确认。客户端接收到消息后,必须调用javax.jms.Message的acknowledge方法。jms服务器才会删除消息。能够在失败的
//时候不确认消息,不确认的话不会移出队列,一直存在,下次启动继续接受。接收消息的链接不断开,其余的消费者也不会接受(正常状况下队列模式不存在其余消费者)
//DUPS_OK_ACKNOWLEDGE容许副本的确认模式。一旦接收方应用程序的方法调用从处理消息处返回,会话对象就会确认消息的接收;并且容许重复确认。在须要考虑资源使用时,这种模式很是有效。
//待测试
session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
destination = session.createQueue("jelynn-queue"); //点对点模型
producer = session.createProducer(destination);
//NON_PERSISTENT
//PERSISTENT
producer.setDeliveryMode(DeliveryMode.PERSISTENT); //消息持久化,默认就是持久的(未消费的消息会持久化)
//ObjectMessage
MqBean mqBean = new MqBean();
mqBean.setAge(20);
int i = 0;
String str;
while (true){
i++;
// str = "小黄" + i;
// producer.send(session.createTextMessage(str));
mqBean.setName("小黄" + i);
producer.send(session.createObjectMessage(mqBean));
Thread.sleep(1000);
}
// producer.close();
} catch (JMSException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
4.2 队列消息的接收
package com.jelynn.activemq.p2p;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* Created with IntelliJ IDEA.
* Project Name : ActiveMQ
* User: Jelynn
* Date: 2017/4/10
* Time: 10:07
* Describe:
* Version:1.0
*/
public class Receiver {
public static void main(String[] args) {
receive();
}
//消息队列接收
public static void receive(){
Connection connection;
Session session;
Destination destination;
MessageConsumer consumer;
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin","admin","failover:tcp://172.16.0.15:61616");
connectionFactory.setTrustAllPackages(true);
try {
// 构造从工厂获得链接对象
connection = connectionFactory.createConnection();
// 启动
connection.start();
// 获取操做链接
//这个最好仍是有事务
session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue("jelynn-queue");
consumer = session.createConsumer(destination);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
if(null != message){
MqBean mqBean = (MqBean) ((ObjectMessage)message).getObject();
System.out.println("接收到消息"+mqBean.getName());
}
// if(null != message){
// String str = ((TextMessage)message).getText();
// System.out.println("接收到消息 : "+str);
// }
} catch (JMSException e) {
e.printStackTrace();
}
}
});
} catch (JMSException e) {
e.printStackTrace();
}
}
}
若是针对一个queue,定义有多个Receiver,则一条message只能被一个Receiver消费,其余的没法接收到该消息。
注意:若是传输的消息为ObjectMessage,须要进行以下配置:
在${ACTIVEMQ_HOME}/bin/env 的ACTIVEMQ_OPTS参数中添加:
-Dorg.apache.activemq.SERIALIZABLE_PACKAGES=* (*表示全部,也能够添加具体的包)
5.12.4和5.13.0之后的版本,能够在客户端须要信任的包:
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); factory.setTrustedPackages(new ArrayList(Arrays.asList("org.apache.activemq.test,org.apache.camel.test".split(",")))); |
或者信任全部:
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); factory.setTrustAllPackages(true); |
参考:(http://activemq.apache.org/objectmessage.html)
五:基于ActiveMQ的Publish/subscribe模式Demo程序
5.1订阅消息的发送
package com.jelynn.activemq.publishsubscribe;
import com.jelynn.activemq.p2p.MqBean;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* Created by Jelynn on 2017/4/10.
* 订阅消息的发送
*/
public class Publisher {
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("failover:tcp://172.16.0.15:61616");
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("myTopic.messages");
MessageProducer producer = session.createProducer(topic);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
while(true) {
TextMessage message = session.createTextMessage();
message.setText("message_" + System.currentTimeMillis());
producer.send(message);
System.out.println("Sent message: " + message.getText());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// session.close();
// connection.stop();
// connection.close();
}
}
5.2订阅消息的接收
package com.jelynn.activemq.publishsubscribe;
import com.jelynn.activemq.p2p.MqBean;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* Created by Jelynn on 2017/4/10.
* 订阅消息的接收
* <p/>
* Number Of Pending Messages 等待消费的消息这个是当前未出队列的数量。能够理解为总接收数-总出队列数
* Messages Enqueued 进入队列的消息进入队列的总数量,包括出队列的。这个数量只增不减
* Messages Dequeued 出了队列的消息能够理解为是消费这消费掉的数量
*/
public class Subscriber {
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("failover:tcp://172.16.0.15:61616");
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("myTopic.messages");
MessageConsumer consumer = session.createConsumer(topic);
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage tm = (TextMessage) message;
try {
System.out.println("Received message: " + tm.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
// session.close();
// connection.stop();
// connection.close();
}
}
能够定义多个Subscriber,进行订阅消息的接收,每一个Subscriber都能接收到订阅消息