1.1 java消息服务:
不同系统之间的信息交换,是我们开发中比较常见的场景,比如系统A要把数据发送给系统B,这个问题我们应该如何去处理? 1999年,原来的SUN公司领衔提出了一种面向消息的中间件服务--JMS规范(标准);常用的几种信息交互技术(httpClient、hessian、dubbo、jms、webservice 五种).
1.2JMS概述:
JMS即Java消息服务(Java Message Service的简称),是Java EE 的标准/规范之一。这种规范(标准)指出:消息的发送应该是异步的、非阻塞的。也就是说消息的发送者发送完消息后就直接返回了,不需要等待接收者返回后才能返回,发送者和接收者可以说是互不影响。所以这种规范(标准)能够减轻或消除系统瓶颈,实现系统之间去除耦合,提高系统的整体可伸缩性和灵活性。JMS只是Java EE中定义的一组标准API,它自身并不是一个消息服务系统,它是消息传送服务的一个抽象,也就是说它定义了消息传送的接口而并没有具体实现。
1.3ActiveMQ概述:
我们知道JMS只是消息服务的一组规范和接口,并没有具体的实现,而ActiveMQ就是JMS规范的具体实现;它是Apache下的一个项目,采用Java语言开发;是一款非常流行的开源消息服务器.
1.4 ActiveMQ与JMS关系:
我们知道,JMS只是定义了一组有关消息传送的规范和标准,并没有真正实现,也就说JMS只是定义了一组接口而已;就像JDBC抽象了关系数据库访问、JPA抽象了对象与关系数据库映射、JNDI抽象了命名目录服务访问一样,JMS具体的实现由不同的消息中间件厂商提供,比如Apache ActiveMQ就是JMS规范的具体实现,Apache ActiveMQ才是一个消息服务系统,而JMS不是。
二.ActiveMQ的使用
2.1 ActiveMQ环境搭建:
1.ActiveMQ运行需要Java的支持,首先需要配置Java环境变量;
3.切换到解压后的activemq的bin目录下 cd / usr / local / apache - activemq - 5.15.2 去启动
3.切换到bin目录下,启动:./activemq start ;关闭: . / activemq stop
4.启动后有两个端口号,一个是web控制台:8161,一个是消息服务broker连接端口:61616
5.web管理控制台admin URL地址:http : // localhost : 8161 默认登录账号 admin 密码 admin,注意:Linux防火前要关闭 ;通过这个地址可以即时访问交互信息.如下图:
消息服务broker URL地址 : tcp : // localhost : 61616
2.2 Java消息队列JMS整体设计结构
2.2.1 基本要素:1、生产者producer ; 2、消费者consumer ; 3、消息服务broker
2.2.1 交互模型:
2.2.3 JMS两种消息传送模式
点对点( Point-to-Point):专门用于使用队列Queue传送消息;基于队列Queue的点对点消息只能被一个消费者消费,如多个消费者都注册到同一个消息队列上,当生产者发送一条消息后,而只有其中一个消费者会接收到该消息,而不是所有消费者都能接收到该消息。
发布/订阅(Publish/Subscribe):专门用于使用主题Topic传送消息。基于主题的发布与订阅消息能被多个消费者消费,生产者发送的消息,所有订阅了该topic的消费者都能接收到。
2.3 Java消息队列JMS API总体概览:
2.3.1JMS API 概览
JMS API可以分为3个主要部分:
1、公共API:可用于向一个队列或主题发送消息或从其中接收消息;
2、点对点API:专门用于使用队列Queue传送消息;
3、发布/订阅API:专门用于使用主题Topic传送消息。
JMS公共API:
在JMS公共API内部,和发送与接收消息有关的JMS API接口主要是:ConnectionFactory / Connection / Session / Message / Destination / MessageProducer / MessageConsumer . 它们的关系是:一旦有了ConnectionFactory,就可以创建Connection,一旦有了Connection,就可以创建Session,而一旦有了Session,就可以创建 Message 、MessageProducer 和 MessageConsumer 。
JMS点对点API:
点对点(p2p)消息传送模型API是指JMS API之内基于队列(Queue)的接口:QueueConnectionFactory / QueueConnection / QueueSession / Message / Queue / QueueSender / QueueReceiver .
从接口的命名可以看出,大多数接口名称仅仅是在公共API接口
名称之前添加Queue一词。一般来说,使用点对点消息传送模型的应用程序将使用基于队列的API,而不使用公共API 。
JMS发布/订阅API:
发布/订阅消息传送模型API是指JMS
API之内基于主题(Topic)的接口:TopicConnectionFactory / TopicConnection /
TopicSession / Message / Topic / TopicPublisher / TopicSubscriber . 由于基于主题(Topic)的JMS API类似于基于队列(Queue)
的API,因此在大多数情况下,Queue这个词会由Topic取代。
2.4 ActiveMQ点对点发送与接收消息示例
2.4.1 简单示例:
写一个采用Queue队列方式点对点发送接收文本信息的Demo,先写发送者,如下:
package com.kinglong.activemq.queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/** * 消息发送者 * */ public class Sender {
/**消息服务器的连接地址**/ public static final String BROKER_URL = "tcp://192.168.174.129:61616";
public static void main(String[] args) { Sender sender = new Sender(); sender.sendMessage("Hello ActiveMQ."); }
/** * 发送消息 * * @param msg */ public void sendMessage (String msg) {
Connection connection = null; Session session = null; MessageProducer messageProducer = null;
try { //1.创建一个连接工厂 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
//2.创建一个连接 connection = connectionFactory.createConnection();
//3.创建一个Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
//4.创建消息,此处创建了一个文本消息 Message message = session.createTextMessage(msg);
//5.创建一个目的地 Destination destination = session.createQueue("myQueue");
//6.创建一个消息的生产者(发送者) messageProducer = session.createProducer(destination);
//7.发送消息 messageProducer.send(message);
} catch (JMSException e) { e.printStackTrace(); } finally { try { //关闭连接释放资源 if (null != messageProducer) { messageProducer.close(); } if (null != session) { session.close(); } if (null != connection) { connection.close(); } } catch (JMSException e) { e.printStackTrace(); } } } } |
再写接收者:
package com.kinglong.activemq.queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class Receiver {
/**消息服务器的连接地址**/ public static final String BROKER_URL = "tcp://192.168.174.129:61616";
public static void main(String[] args) { Receiver receiver = new Receiver(); receiver.receiveMessage(); }
/** * 接收消息 * */ public void receiveMessage () {
Connection connection = null; Session session = null; MessageConsumer messageConsumer = null;
try { //1.创建一个连接工厂 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
//2.创建一个连接 connection = connectionFactory.createConnection();
//3.创建一个Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
//4.创建一个目的地 Destination destination = session.createQueue("myQueue");
//5.创建一个消息的消费者(接收者) messageConsumer = session.createConsumer(destination);
//接收消息之前,需要把连接启动一下 connection.start();
//6.接收消息 Message message = messageConsumer.receive();
//判断消息的类型 if (message instanceof TextMessage) { //判断是否是文本消息 String text = ((TextMessage) message).getText(); System.out.println("接收到的消息内容是:" + text); } } catch (JMSException e) { e.printStackTrace(); } finally { try { //关闭连接释放资源 if (null != messageConsumer) { messageConsumer.close(); } if (null != session) { session.close(); } if (null != connection) { connection.close(); } } catch (JMSException e) { e.printStackTrace(); } } } } |
总结:
创建过程中有几个重要的注意点,说明一下:
1. session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 其中:Boolean.FALSE表示本次会话不开启事务管理,假如需要开启事务管理,将其改为Boolean.TRUE即可 //同时需要在发送消息后添加session.commit(),否则,消息是不会被提交的. //Session.AUTO_ACKNOWLEDGE表示消息确认机制 AUTO_ACKNOWLEDGE:自动确认 CLIENT_ACKNOWLEDGE:客户端确认 SESSION_TRANSACTED:事务确认,如果使用事务推荐使用该确认机制 AUTO_ACKNOWLEDGE:懒散式确认,消息偶尔不会被确认,也就是消息可能会被重复发送.但发生的概率很小 2. connection.start(); //在消息接收端,接受消息前需要加入这段代码,开启连接,否则一样无法获取消息. 3. Destination destination = session.createQueue("myQueue"); //创建目的地时,如果做测试收不到信息,可以将目的地名称修改一下,我用的是IDEA,不清楚为何, //有时候收不到信息,修改一下就好了,猜测可能是缓存的原因吧 |
发布与订阅的topic方式实际与点对点的queue方式,代码通用很多,只是在创建目的地Destination时候创建为
Destination destination = session.createTopic("myTopic |
需要项目资料:请加QQ:1547737063获取
附加java开发的资料(面试资源与经验总结,Dubbo、Redis、设计模式、Netty、zookeeper、Spring cloud、分布式、高并发等架构技术视频教程资料,架构思维导图,以及面试资料,了解最新的学习动态;了解最新的阿里、京东招聘资讯)