一、什么是消息中间件?php
关注于数据的发送和接收,利用高效可靠地异步消息传递机制集成分布式系统。java
二、什么是JMS? java - apilinux
Java 消息服务(Java Message Service)即JMS, 是一个Java 平台(不能跨语言)中关于面向消息中间件的API,用户两个应用程序之间,或分布式系统中发送消息,进行异步通讯。c++
三、AMQP协议(为了跨语言) Wire-protocaol, 只支持 byte[] 二进制的消息类型c#
AMQP(advanced message queuing protocol) 是一个提供统一消息服务的应用层标准协议,给予此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不一样产品,不一样开发语言等条件的限制。api
四、经常使用的消息中间件服务器
<1> ActiveMQ 彻底支持JMS1.1与J2EE 1.4规范的JMS Provider实现。session
支持多种语言 Java c c++ c# Ruby Perl Python PHP,支持的应用协议:OpenWire, Stomp REST, WS Notification, Xmpp, AMQP (没有JMS 由于不是应用协议,只是开发规范)。负载均衡
<2> RabbitMQ 是一个开源的AMQP实现,服务器端使用Erlang语言编写,用于在分布式系统中存储转发消息,在易用性和扩展性 高可用等方便变现不俗dom
支持多种客户端, 如 Python Ruby .Net java jms c php等
<3> kafka 是一种高吞吐量的分布式发布订阅消息系统,是一个分布式的,分区的,可靠地分布式日志存储服务,它经过一种独一无二的设计提供了一个消息系统的功能。不是一个严格的消息中间件,
主要是用来作日志储存。即便是很是普通的硬件kafka也能够支持每秒数百万的消息。
比对:
五、消息模式
主题和队列两种模式。
队列模型: 客户端包含生产者和消费者
队列中的消息只能被一个消费者消费
消费者能够随时消费队列中的消息
主题模型: 客户端包括发布者和订阅者
主题中的消息被全部的订阅者消费
消费者不能消费订阅以前就发送到主题中的消息
六、JMS 编码接口之间的关系
七、队列模式
在linux上安装activemq部分此处省略
安装完后打开activemq平台,访问地址http://192.168.37.128:8161,登陆帐号和密码都是admin
下面咱们来建立一个生产者:
public class AppProducer { private static final String URL = "tcp://192.168.37.128:61616"; private static final String QUEUE_NAME = "queue-test"; public static void main(String[] args) throws Exception{ // 建立链接工厂 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL); // 创建链接 Connection connection = connectionFactory.createConnection(); // 启动链接 connection.start(); // 建立会话 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 建立一个目标 Destination destination = session.createQueue(QUEUE_NAME); // 建立一个生产者 MessageProducer producer = session.createProducer(destination); // 发送消息 for(int i=0; i < 100; i++){ // 建立消息 TextMessage textMessage = session.createTextMessage("test" + i); // 发送消息 producer.send(textMessage); System.out.println("send" + textMessage.getText()); } // 关闭链接 connection.close(); } }
运行成功后,咱们查看activemq平台,发现队列中有100个待消费的消息:
ok, 下面咱们来建立一个消费者:
public class AppConsumer { private static final String URL = "tcp://192.168.37.128:61616"; private static final String QUEUE_NAME = "queue-test"; public static void main(String[] args) throws Exception{ // 建立链接工厂 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL); // 创建链接 Connection connection = connectionFactory.createConnection(); // 启动链接 connection.start(); // 建立会话 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 建立一个目标 Destination destination = session.createQueue(QUEUE_NAME); // 建立一个消费者 MessageConsumer consumer = session.createConsumer(destination); // 消费消息 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message arg0) { TextMessage textMessage = (TextMessage)arg0; System.out.println("receive" + textMessage); } }); } }
运行成功后,咱们查看activemq平台,发现队列中100个待消费的消息所有被消费:
8 主题模式 主题模式下,消费者必需要比订阅者先启动,不然不会收到消息。
主题模式和队列模式,代码机会相同,只须要在建立目标的时候改为建立主题。
public class AppTopicProducer { private static final String URL = "tcp://192.168.37.128:61616"; private static final String QUEUE_NAME = "queue-test"; public static void main(String[] args) throws Exception{ // 建立链接工厂 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL); // 创建链接 Connection connection = connectionFactory.createConnection(); // 启动链接 connection.start(); // 建立会话 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 建立一个目标 Destination destination = session.createTopic(QUEUE_NAME); // 建立一个生产者 MessageProducer producer = session.createProducer(destination); // 发送消息 for(int i=0; i < 100; i++){ // 建立消息 TextMessage textMessage = session.createTextMessage("test" + i); // 发送消息 producer.send(textMessage); System.out.println("send" + textMessage.getText()); } // 关闭链接 connection.close(); } }
public class AppTopicConsumer { private static final String URL = "tcp://192.168.37.128:61616"; private static final String QUEUE_NAME = "queue-test"; public static void main(String[] args) throws Exception{ // 建立链接工厂 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL); // 创建链接 Connection connection = connectionFactory.createConnection(); // 启动链接 connection.start(); // 建立会话 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 建立一个目标 Destination destination = session.createTopic(QUEUE_NAME); // 建立一个消费者 MessageConsumer consumer = session.createConsumer(destination); // 消费消息 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message arg0) { TextMessage textMessage = (TextMessage)arg0; System.out.println("receive" + textMessage); } }); } }
之因此 会是 200个消息,可是只消费了 100个, 是由于在 消费者订阅以前, 就已经产生了100个消息,因此这100个不会被消费。
9 ActiveMQ集群配置
为何要对消息中间件集群?
实现高可用,以排除单点故障引发的服务中断
实现负载均衡,以提高效率为更多的客户提供服务
集群方式:
客户端集群:让多个消费者消费同一队列
Broker clusters:多个Broker之间同步消息
Master Slave:实现高可用
客户端配置
ActiveMQ 失效转移(failover)
容许当其中一台消息服务器宕机时,客户端在传输层上从新链接到其余消息服务器。
语法:failover:(url1,...,urln)?transportOptions
transportOptions 说明:
randomize 默认为true, 表示在URL列表中选择URL连接时 是否采用随机策略
initialReconnectDelay 默认为10,单位毫秒,表示第一次尝试重连之间等待时间
maxReconnectDelay 默认30000, 单位毫秒,最长重连的时间间隔
Broker Cluster 集群配置
原理:
节点A的消息 能够同步到 节点B, 节点B的消息也能够同步到节点A上, 节点A产生的消息能够被节点B的消费者消费, 节点B产生的消息能够被节点A的消费者消费。
Master/Slave集群配置
集群方案
Share nothing storage master/slave (已过期,5.8+后移除)
Shared storage master/slave 共享存储
Replicated LevelDB Store 基于复制的LevelDB Store === > 基于zookeeper 的 master 选择方案
共享存储集群的原理
当前 节点A得到锁资源,成为master , 节点B就没法得到锁资源,一直在等待。若是某个时刻,节点A挂掉,那么节点B会得到锁资源,成为master, 节点A 成为slave
下面是 基于复制的LevelDB Store 的原理
基于zk, 此时节点A被zk选举为Master, 此时节点A做为与外界沟通的口子,当A接收到新的信息,则会在本地进行存储,而且经过zk 传输到节点B和节点C上进行本地存储。
两种集群方式对比:
master/slave 方式 只支持高可用,可是不支持负载均衡。
Broker Cluster 支持 负载均衡,可是不支持高可用。
下面来看一种 便可以高可用,又能够负载均衡的方式。
咱们使用三台服务器的完美集群方案:
节点B和节点C实现高可用, 节点A为 节点B 或者C 的负载均衡。 可是此方案,一旦节点B 和 节点C 所有挂掉,那么整个系统也就挂掉了,因此咱们要使用更多太服务器来防止多台服务器宕机的场景。