ActiveMQ

1、什么是消息中间件(MQ)html

1.1 为何会须要消息队列(MQ)?java

  主要缘由是因为在高并发环境下,因为来不及同步处理,请求每每会发生堵塞,好比说,大量的insert,update之类的请求同时到达MySQL,直接致使无数的行锁表锁,甚至最后请求会堆积过多,从而触发too many connections错误。经过使用消息队列,咱们能够异步处理请求,从而缓解系统的压力。数据库

2.2 什么是消息中间件macos

  消息中间件利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通讯来进行分布式系统的集成。经过提供消息传递和消息排队模型,它能够在分布式环境下扩展进程间的通讯。对于消息中间件,常见的角色大体也就有Producer(生产者)、Consumer(消费者)
  常见的消息中间件产品:
  (1)ActiveMQ
  ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个彻底支持JMS1.1和J2EE 1.4规范的 JMS Provider实现。咱们在本次课程中介绍 ActiveMQ的使用。
  (2)RabbitMQ
  AMQP协议的领导实现,支持多种场景。淘宝的MySQL集群内部有使用它进行通信,OpenStack开源云平台的通讯组件,最早在金融行业获得运用。
  (3)ZeroMQ
  史上最快的消息队列系统
  (4)Kafka
  Apache下的一个子项目 。特色:高吞吐,在一台普通的服务器上既能够达到10W/s的吞吐速率;彻底的分布式系统。适合处理海量数据。apache

2、JMS简介服务器

2.1 什么是JMS  session

  JMS(Java Messaging Service)是Java平台上有关面向消息中间件的技术规范,它便于消息系统中的Java应用程序进行消息交换,而且经过提供标准的产生、发送、接收消息的接口简化企业应用的开发。并发

       JMS自己只定义了一系列的接口规范,是一种与厂商无关的 API,用来访问消息收发系统。它相似于 JDBC(java Database Connectivity):这里,JDBC 是能够用来访问许多不一样关系数据库的 API,而 JMS 则提供一样与厂商无关的访问方法,以访问消息收发服务。许多厂商目前都支持 JMS,包括 IBM 的 MQSeries、BEA的 Weblogic JMS service和 Progress 的 SonicMQ,这只是几个例子。 JMS 使您可以经过消息收发服务(有时称为消息中介程序或路由器)从一个 JMS 客户机向另外一个 JML 客户机发送消息。消息是 JMS 中的一种类型对象,由两部分组成:报头和消息主体。报头由路由信息以及有关该消息的元数据组成。消息主体则携带着应用程序的数据或有效负载。
  JMS 定义了五种不一样的消息正文格式,以及调用的消息类型,容许你发送并接收以一些不一样形式的数据,提供现有消息格式的一些级别的兼容性。异步

· TextMessage--一个字符串对象
· MapMessage--一套名称-值对
· ObjectMessage--一个序列化的 Java 对象
· BytesMessage--一个字节的数据流
· StreamMessage -- Java 原始值的数据流tcp

2.2 JMS 消息传递类型

  对于消息的传递有两种类型:

  一种是点对点的,即一个生产者和一个消费者一一对应:

  另外一种是发布/ 订阅模式,即一个生产者产生消息并进行发送后,能够由多个消费者进行接收:

  

 3、ActiveMQ下载与安装

3.1 下载

  官方下载地址:http://activemq.apache.org/activemq-5153-release.html

3.2 安装(OSX,Linux类同)

  解压下载文件,apache-activemq-5.12.0-bin.tar.gz

tar zxvf apache-activemq-5.12.0-bin.tar.gz

  为apache-activemq-5.12.0目录赋权

chmod 777 apache-activemq-5.12.0

3.3 启动、访问与关闭

#切换至安装目录macosx下

macosx Mac$ pwd

/Users/Mac/JavaUtils/apache-activemq-5.15.3/bin/macosx

#启动activemq服务

macosx Mac$ ./activemq start

Starting ActiveMQ Broker...

#关闭activemq服务

:macosx Mac$ ./activemq stop

Stopping ActiveMQ Broker...

Stopped ActiveMQ Broker.

:macosx Mac$ 

 4、JMS入门

 4.1 点对点模式

一个生成者产生一个消息 只能被被一个消费者消费,消费完,消息就没有了。

 4.1.1 消息生产者

(1)建立工程,引入依赖

<dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-client</artifactId>
        <version>5.13.4</version>
</dependency>

(2)建立生产者

 1 public class QueueProducer {
 2 
 3     public static void main(String[] args) throws JMSException {
 4         //1.建立链接工厂
 5         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
 6         //2.获取链接
 7         Connection connection = connectionFactory.createConnection();
 8         //3.启动链接
 9         connection.start();
10         /*4.获取session  (参数1:是否启动事务,
11          参数2:消息确认模式[
12          AUTO_ACKNOWLEDGE = 1    自动确认
13          CLIENT_ACKNOWLEDGE = 2    客户端手动确认   
14          DUPS_OK_ACKNOWLEDGE = 3    自动批量确认
15          SESSION_TRANSACTED = 0    事务提交并确认
16         ])*/
17         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
18         //5.建立队列对象
19         Queue queue = session.createQueue("test-queue");
20         //6.建立消息生产者
21         MessageProducer producer = session.createProducer(queue);
22         //7.建立消息
23         TextMessage textMessage = session.createTextMessage("欢迎来到MQ世界");
24         //8.发送消息
25         producer.send(textMessage);
26         //9.关闭资源
27         producer.close();
28         session.close();
29         connection.close();
30     }
31 
32 }

(3)运行经过界面查看

4.1.2 消息消费者

(1)建立消息消费者

 1 public class QueueConsumer {
 2     public static void main(String[] args) throws JMSException, IOException {
 3         //1.建立链接工厂
 4         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
 5         //2.获取链接
 6         Connection connection = connectionFactory.createConnection();
 7         //3.启动链接
 8         connection.start();
 9         //4.获取session  (参数1:是否启动事务,参数2:消息确认模式)
10         Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
11         //5.建立队列对象
12         Queue queue = session.createQueue("test-queue");
13         //6.建立消息消费者
14         MessageConsumer consumer = session.createConsumer(queue);
15         //7.监听消息
16         consumer.setMessageListener(new MessageListener() {
17             @Override
18             public void onMessage(Message message) {
19                 TextMessage textMessage = (TextMessage) message;
20                 try {
21                     System.out.println("接收到消息:"+textMessage.getText());
22                 } catch (JMSException e) {
23                     e.printStackTrace();
24                 }
25             }
26         });
27         //8.等待键盘输入
28         System.in.read();
29         //9.关闭资源
30         consumer.close();
31         session.close();
32         connection.close();
33     }
34 
35 }

(2)运行查看控制台输出与经过界面查看

 4.2 发布/订阅模式 

4.2.1 消息生产者

 1 public class TopicProducer {
 2 
 3     public static void main(String[] args) throws JMSException {
 4         //1.建立链接工厂
 5         ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
 6         //2.获取链接
 7         Connection connection = connectionFactory.createConnection();
 8         //3.启动链接
 9         connection.start();
10         /*4.获取session  (参数1:是否启动事务,
11          参数2:消息确认模式)*/
12         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
13         //5.建立主题对象
14         Topic topic = session.createTopic("test-topic"); 15         //6.建立消息生产者
16         MessageProducer producer = session.createProducer(topic); 17         //7.建立消息
18         TextMessage textMessage = session.createTextMessage("欢迎来到MQ世界!");
19         //8.发送消息
20         producer.send(textMessage);
21         //9.关闭资源
22         producer.close();
23         session.close();
24         connection.close();
25     }
26 
27 }

4.2.2 消息2个消费者(消费者2代码同消费者1)

 1 public class TopicConsumer1 {
 2     public static void main(String[] args) throws JMSException, IOException {
 3         //1.建立链接工厂
 4         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
 5         //2.获取链接
 6         Connection connection = connectionFactory.createConnection();
 7         //3.启动链接
 8         connection.start();
 9         //4.获取session  (参数1:是否启动事务,参数2:消息确认模式)
10         Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
11         //5.建立主题对象
12         Topic topic = session.createTopic("test-topic");
13         //6.建立消息消费者
14         MessageConsumer consumer = session.createConsumer(topic);
15         //7.监听消息
16         consumer.setMessageListener(new MessageListener() {
17             @Override
18             public void onMessage(Message message) {
19                 TextMessage textMessage = (TextMessage) message;
20                 try {
21                     System.out.println("消费1--接收到消息:"+textMessage.getText());
22                 } catch (JMSException e) {
23                     e.printStackTrace();
24                 }
25             }
26         });
27         //8.等待键盘输入
28         System.in.read();
29         //9.关闭资源
30         consumer.close();
31         session.close();
32         connection.close();
33     }
34 
35 }

4.2.3 运行查看测试结果

同时开启2个以上的消费者,再次运行生产者,观察每一个消费者控制台的输出,会发现每一个消费者会接收到消息。

4.3 总结

发布订阅的模式 默认的请状况下:消息的内容不存在服务器,当生产者发送了一个消息,若是消费者以前没有订阅,就没了。
点对点的方式:默认的请状况下:将消息存储在服务器上,消费者随时来取,可是一旦一个消费者获取到了消息,这个消息就没有了。
相关文章
相关标签/搜索