消息队列 是指利用 高效可靠 的 消息传递机制 进行与平台无关的 数据交流,并基于 数据通讯 来进行分布式系统的集成。html
根据消息队列的特色,能够衍生出不少场景,或者说不少场景都能用到。下面举几个例子:java
1)异步通讯docker
注册时的短信、邮件通知,减小响应时间;apache
2)应用解耦编程
信息发送者和消息接受者无需耦合,好比调用第三方;服务器
3)流量削峰微信
例如秒杀系统;网络
官网:activemq.apache.org/session
简介:架构
ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个彻底支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已是好久的事情了,可是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。
特色:
支持来自Java,C,C ++,C#,Ruby,Perl,Python,PHP的各类跨语言客户端和协议
彻底支持JMS客户端和Message Broker中的企业集成模式
支持许多高级功能,如消息组,虚拟目标,通配符和复合目标
彻底支持JMS 1.1和J2EE 1.4,支持瞬态,持久,事务和XA消息
Spring支持,以便ActiveMQ能够轻松嵌入到Spring应用程序中,并使用Spring的XML配置机制进行配置
专为高性能集群,客户端 - 服务器,基于对等的通讯而设计
CXF和Axis支持,以便ActiveMQ能够轻松地放入这些Web服务堆栈中以提供可靠的消息传递
能够用做内存JMS提供程序,很是适合单元测试JMS
支持可插拔传输协议,例如in-VM,TCP,SSL,NIO,UDP,多播,JGroups和JXTA传输
使用JDBC和高性能日志支持很是快速的持久性
简介:
RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现。RabbitMQ轻巧且易于部署在云端。 它支持多种消息传递协议。 RabbitMQ能够部署在分布式和联合配置中,以知足高规模,高可用性需求。RabbitMQ可运行在许多操做系统和云环境中,并为大多数流行语言提供普遍的开发工具。(来自官网翻译)
AMQP (Advanced MessageQueue):高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。
RabbitMQ最初普遍应用于金融行业,根据官网描述,它具备以下特色:
特色:
简介:
Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它能够处理消费者规模的网站中的全部动做流数据。 这种动做(网页浏览,搜索和其余用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据一般是因为吞吐量的要求而经过处理日志和日志聚合来解决。 对于像Hadoop的同样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是经过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了经过集群来提供实时的消息。
Kafka它主要用于处理活跃的流式数据,所以Kafaka在大数据系统中使用较多。
特色:
同时为发布和订阅提供高吞吐量。据了解,Kafka每秒能够生产约25万消息(50 MB),每秒处理55万消息(110 MB)。
可进行持久化操做。将消息持久化到磁盘,所以可用于批量消费,例如ETL,以及实时应用程序。经过将数据持久化到硬盘以及replication防止数据丢失。
分布式系统,易于向外扩展。全部的producer、broker和consumer都会有多个,均为分布式的。无需停机便可扩展机器。
消息被处理的状态是在consumer端维护,而不是由server端维护。当失败时能自动平衡。
支持online和offline的场景。
简介:
RocketMQ是阿里开源的消息中间件,目前在Apache孵化,使用纯Java开发,具备高吞吐量、高可用性、适合大规模分布式系统应用的特色。RocketMQ思路起源于Kafka,但并非简单的复制,它对消息的可靠传输及事务性作了优化,目前在阿里集团被普遍应用于交易、充值、流计算、消息推送、日志流式处理、binglog分发等场景,支撑了阿里屡次双十一活动。
特色:
activemq在各个系统下都有对应的安装包。如下来演示Linux系统下安装activemq。
进入apache-activemq-5.15.8/bin目录,启动activemq./activemq start
输出以上信息,表示启动成功。
在安装过程当中,经过查看activemq的运行状态,
显示以上。
经过./bin/activemq console
命令查看运行日志:
主机名中包含非法字符;
那么解决办法就很简单了,改主机名:
一、方法一使用hostnamectl命令
hostnamectl set-hostname 主机名
二、方法二:修改配置文件 /etc/hostname 保存退出
修改完成以后重启便可,这里我使用的是方法一:
hostnamectl set-hostname activemq
查看运行状态:
待ActiveMQ安装启动好,访问http://ip:8161/admin,登陆名和密码都是admin(在配置文件中可修改),进入ActiveMQ的主页:
下面来介绍每一个菜单的功能:
Name:消息队列的名称。
Number Of Pending Messages:未被消费的消息数目。
Number Of Consumers:消费者的数量。
Messages Enqueued:进入队列的消息 ;进入队列的总消息数目,包括已经被消费的和未被消费的。 这个数量只增不减。
Messages Dequeued:出了队列的消息,能够理解为是被消费掉的消息数量。在Queues里它和进入队列的总数量相等(由于一个消息只会被成功消费一次),若是暂时不等是由于消费者还没来得及消费。
Name:主题名称。
Number Of Pending Messages:未被消费的消息数目。
Number Of Consumers:消费者的数量。
Messages Enqueued:进入队列的消息 ;进入队列的总消息数目,包括已经被消费的和未被消费的。 这个数量只增不减。
Messages Dequeued:出了队列的消息,能够理解为是被消费掉的消息数量。在Topics里,由于多消费者从而致使数量会比入队列数高。
查看订阅者信息,只在Topics消息类型中这个页面才会有数据。
引入jar包:
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-core</artifactId> <version>5.7.0</version> </dependency>
点对点模型,采用的是队列(Queue)做为消息载体。在该模式中,一条消息只能被一个消费者消费,没有被消费的,只能留在队列中,等待被消费,或者超时。举个例子,若是队列中有10条消息,有两个消费者,就是一个消费者消费5条信息,你一条我一条。如下以代码演示。
消息发布者:
public static void main(String[] args) throws JMSException { /* * 实现步骤 * 1.创建ConnectionFactory工厂对象,须要填入用户名、密码、链接地址(通常使用默认,若是没有修改的话) * 2.经过ConnectionFactory对象建立一个Connection链接,而且调用Connection的start方法开启链接,Connection方法默认是关闭的 * 3.经过Connection对象建立Session会话(上下文环境对象),用于接收消息,参数1是是否启用事物,参数2是签收模式,通常设置为自动签收 * 4.经过Session对象建立Destination对象,指的是一个客户端用来制定生产消息目标和消费消息来源的对象。在PTP的模式中,Destination被称做队列,在Pub/Sub模式中,Destination被称做主题(Topic) * 5.经过Session对象建立消息的发送和接收对象(生产者和消费者) * 6.经过MessageProducer的setDeliverMode方法为其设置持久化或者非持久化特性 * 7.使用JMS规范的TextMessage形式建立数据(经过Session对象),并用MessageProducer的send方法发送数据。客户端同理。记得关闭 */ ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER, ActiveMQConnectionFactory.DEFAULT_PASSWORD,"tcp://94.191.49.192:61616"); Connection connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue("queue"); MessageProducer producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); for (int i=0;i<=5;i++) { TextMessage textMessage = session.createTextMessage(); textMessage.setText("我是第"+i+"消息"); producer.send(textMessage); } if(connection!=null){ connection.close(); } }
消息消费者:
public static void main(String[] args) throws JMSException { ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER, ActiveMQConnectionFactory.DEFAULT_PASSWORD,"tcp://94.191.49.192:61616"); Connection connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue("queue"); MessageConsumer consumer = session.createConsumer(destination); while (true){ TextMessage message = (TextMessage) consumer.receive(); if (message==null){ break; } System.out.println(message.getText()); } if(connection!=null){ connection.close(); } }
先启动两个消费者,在启动发布者:
发布/订阅模型采用的是主题(Topic)做为消息通信载体。该模式相似微信公众号的模式。发布者发布一条信息,而后将该信息传递给全部的订阅者。注意:订阅者想要接收到该信息,必须在该信息发布以前订阅。
发布者发布信息:
public static void main(String[] args) throws JMSException, IOException { // 建立一个ConnectionFactory对象链接MQ服务器 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://94.191.49.192:61616"); // 建立一个链接对象 Connection connection; connection = connectionFactory.createConnection(); // 开启链接 connection.start(); // 使用Connection对象建立一个Session对象 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 建立一个Destination对象。topic对象 Topic topic = session.createTopic("test-topic"); // 使用Session对象建立一个消费者对象。 MessageConsumer consumer = session.createConsumer(topic); // 接收消息 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { // 打印结果 TextMessage textMessage = (TextMessage) message; String text; try { text = textMessage.getText(); System.out.println("这是接收到的消息:" + text); } catch (JMSException e) { e.printStackTrace(); } } }); System.out.println("topic消费者启动。。。。"); // 等待接收消息 System.in.read(); // 关闭资源 consumer.close(); session.close(); connection.close(); }
订阅者订阅信息:
public static void main(String[] args) throws JMSException { // 一、建立一个链接工厂对象,须要指定服务的ip及端口。 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://94.191.49.192:61616"); // 二、使用工厂对象建立一个Connection对象。 Connection connection = connectionFactory.createConnection(); // 三、开启链接,调用Connection对象的start方法。 connection.start(); // 四、建立一个Session对象。 // 第一个参数:是否开启事务。若是true开启事务,第二个参数无心义。通常不开启事务false。 // 第二个参数:应答模式。自动应答或者手动应答。通常自动应答。 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 五、使用Session对象建立一个Destination对象。两种形式queue、topic,如今应该使用topic Topic topic = session.createTopic("test-topic"); // 六、使用Session对象建立一个Producer对象。 MessageProducer producer = session.createProducer(topic); // 七、建立一个Message对象,可使用TextMessage。 for (int i = 0; i < 50; i++) { TextMessage textMessage = session.createTextMessage("第" + i + "一个ActiveMQ队列目的地的消息"); // 八、发送消息 producer.send(textMessage); } // 九、关闭资源 producer.close(); session.close(); connection.close(); }
订阅者要提早订阅,因此先运行订阅者。
1)由以上,咱们能够总结出ActiveMQ的实现步骤:
2)能够看出,P2P模式和Pub/Sub模式,在实现上的区别是经过Session建立的Destination对象不同,在P2P的模式中,Destination被称做队列(Queue),在Pub/Sub模式中,Destination被称做主题(Topic)