消息中间件

一、什么是消息中间件?php

      关注于数据的发送和接收,利用高效可靠地异步消息传递机制集成分布式系统。java

二、什么是JMS? java - apilinux

      Java 消息服务(Java Message Service)即JMS, 是一个Java 平台(不能跨语言)中关于面向消息中间件的API,用户两个应用程序之间,或分布式系统中发送消息,进行异步通讯。c++

三、AMQP协议(为了跨语言) Wire-protocaol, 只支持 byte[] 二进制的消息类型c#

   AMQP(advanced message queuing protocol) 是一个提供统一消息服务的应用层标准协议,给予此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不一样产品,不一样开发语言等条件的限制。api

四、经常使用的消息中间件服务器

      <1> ActiveMQ 彻底支持JMS1.1与J2EE 1.4规范的JMS Provider实现。session

             支持多种语言 Java c c++ c# Ruby Perl Python PHP,支持的应用协议:OpenWire, Stomp REST, WS Notification, Xmpp, AMQP  (没有JMS 由于不是应用协议,只是开发规范)。负载均衡

      <2> RabbitMQ  是一个开源的AMQP实现,服务器端使用Erlang语言编写,用于在分布式系统中存储转发消息,在易用性和扩展性 高可用等方便变现不俗dom

             支持多种客户端, 如 Python Ruby .Net  java jms c php等

      <3> kafka  是一种高吞吐量的分布式发布订阅消息系统,是一个分布式的,分区的,可靠地分布式日志存储服务,它经过一种独一无二的设计提供了一个消息系统的功能。不是一个严格的消息中间件,

     主要是用来作日志储存。即便是很是普通的硬件kafka也能够支持每秒数百万的消息。

     比对:

     

五、消息模式

   主题和队列两种模式。

      队列模型: 客户端包含生产者和消费者

                         队列中的消息只能被一个消费者消费

                         消费者能够随时消费队列中的消息

 

     主题模型:  客户端包括发布者和订阅者

         主题中的消息被全部的订阅者消费

         消费者不能消费订阅以前就发送到主题中的消息

 

六、JMS 编码接口之间的关系

七、队列模式

在linux上安装activemq部分此处省略

安装完后打开activemq平台,访问地址http://192.168.37.128:8161,登陆帐号和密码都是admin

下面咱们来建立一个生产者:

public class AppProducer {

	private static final String URL = "tcp://192.168.37.128:61616";
	private static final String QUEUE_NAME = "queue-test";
	
	public static void main(String[] args) throws Exception{
	   
		// 建立链接工厂
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
		// 创建链接
		Connection connection = connectionFactory.createConnection();
		// 启动链接
		connection.start();
		// 建立会话
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		// 建立一个目标
		Destination destination = session.createQueue(QUEUE_NAME);
		// 建立一个生产者
		MessageProducer producer = session.createProducer(destination);
		// 发送消息
		for(int i=0; i < 100; i++){
			// 建立消息
			TextMessage textMessage = session.createTextMessage("test" + i);
			// 发送消息
			producer.send(textMessage);
            System.out.println("send" + textMessage.getText());					
		}
		// 关闭链接
		connection.close();
	
	}
}

 运行成功后,咱们查看activemq平台,发现队列中有100个待消费的消息:

ok, 下面咱们来建立一个消费者:

public class AppConsumer {

	private static final String URL = "tcp://192.168.37.128:61616";
	private static final String QUEUE_NAME = "queue-test";
	
	public static void main(String[] args) throws Exception{
	   
		// 建立链接工厂
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
		// 创建链接
		Connection connection = connectionFactory.createConnection();
		// 启动链接
		connection.start();
		// 建立会话
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		// 建立一个目标
		Destination destination = session.createQueue(QUEUE_NAME);
		// 建立一个消费者
		MessageConsumer consumer = session.createConsumer(destination);
		// 消费消息
		consumer.setMessageListener(new MessageListener() {
			
			@Override
			public void onMessage(Message arg0) {
				TextMessage textMessage = (TextMessage)arg0;
				System.out.println("receive" + textMessage);
				
			}
		});
	
	}
}

运行成功后,咱们查看activemq平台,发现队列中100个待消费的消息所有被消费:

8 主题模式  主题模式下,消费者必需要比订阅者先启动,不然不会收到消息。

   主题模式和队列模式,代码机会相同,只须要在建立目标的时候改为建立主题。

public class AppTopicProducer {

	private static final String URL = "tcp://192.168.37.128:61616";
	private static final String QUEUE_NAME = "queue-test";
	
	public static void main(String[] args) throws Exception{
	   
		// 建立链接工厂
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
		// 创建链接
		Connection connection = connectionFactory.createConnection();
		// 启动链接
		connection.start();
		// 建立会话
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		// 建立一个目标
		Destination destination = session.createTopic(QUEUE_NAME);
		// 建立一个生产者
		MessageProducer producer = session.createProducer(destination);
		// 发送消息
		for(int i=0; i < 100; i++){
			// 建立消息
			TextMessage textMessage = session.createTextMessage("test" + i);
			// 发送消息
			producer.send(textMessage);
            System.out.println("send" + textMessage.getText());					
		}
		// 关闭链接
		connection.close();
	
	}
}

  

public class AppTopicConsumer {

	private static final String URL = "tcp://192.168.37.128:61616";
	private static final String QUEUE_NAME = "queue-test";
	
	public static void main(String[] args) throws Exception{
	   
		// 建立链接工厂
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
		// 创建链接
		Connection connection = connectionFactory.createConnection();
		// 启动链接
		connection.start();
		// 建立会话
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		// 建立一个目标
		Destination destination = session.createTopic(QUEUE_NAME);
		// 建立一个消费者
		MessageConsumer consumer = session.createConsumer(destination);
		// 消费消息
		consumer.setMessageListener(new MessageListener() {
			
			@Override
			public void onMessage(Message arg0) {
				TextMessage textMessage = (TextMessage)arg0;
				System.out.println("receive" + textMessage);
				
			}
		});
	
	}
}

之因此 会是 200个消息,可是只消费了 100个, 是由于在 消费者订阅以前, 就已经产生了100个消息,因此这100个不会被消费。

 9  ActiveMQ集群配置

为何要对消息中间件集群?

  实现高可用,以排除单点故障引发的服务中断

  实现负载均衡,以提高效率为更多的客户提供服务

集群方式:

       客户端集群:让多个消费者消费同一队列

    Broker clusters:多个Broker之间同步消息

       Master Slave:实现高可用

客户端配置

      ActiveMQ 失效转移(failover)

      容许当其中一台消息服务器宕机时,客户端在传输层上从新链接到其余消息服务器。

      语法:failover:(url1,...,urln)?transportOptions

      transportOptions 说明:

            randomize 默认为true, 表示在URL列表中选择URL连接时 是否采用随机策略

            initialReconnectDelay 默认为10,单位毫秒,表示第一次尝试重连之间等待时间

            maxReconnectDelay 默认30000, 单位毫秒,最长重连的时间间隔

Broker Cluster 集群配置

           原理: 

           节点A的消息 能够同步到 节点B, 节点B的消息也能够同步到节点A上, 节点A产生的消息能够被节点B的消费者消费, 节点B产生的消息能够被节点A的消费者消费。

Master/Slave集群配置

集群方案

      Share nothing storage master/slave (已过期,5.8+后移除)

      Shared storage master/slave 共享存储

           Replicated LevelDB Store 基于复制的LevelDB Store   === > 基于zookeeper 的 master 选择方案

           共享存储集群的原理

            

            当前 节点A得到锁资源,成为master , 节点B就没法得到锁资源,一直在等待。若是某个时刻,节点A挂掉,那么节点B会得到锁资源,成为master, 节点A 成为slave

                 

            下面是 基于复制的LevelDB Store 的原理

            

            基于zk, 此时节点A被zk选举为Master,  此时节点A做为与外界沟通的口子,当A接收到新的信息,则会在本地进行存储,而且经过zk 传输到节点B和节点C上进行本地存储。

两种集群方式对比:

           master/slave 方式 只支持高可用,可是不支持负载均衡。  

           Broker Cluster 支持 负载均衡,可是不支持高可用。

下面来看一种 便可以高可用,又能够负载均衡的方式。

      咱们使用三台服务器的完美集群方案:

节点B和节点C实现高可用, 节点A为 节点B 或者C 的负载均衡。 可是此方案,一旦节点B 和 节点C 所有挂掉,那么整个系统也就挂掉了,因此咱们要使用更多太服务器来防止多台服务器宕机的场景。

相关文章
相关标签/搜索