架构设计:系统间通讯(25)——ActiveMQ集群方案(上)

一、综述

经过以前的文章,咱们讨论了ActiveMQ的基本使用,包括单个ActiveMQ服务节点的性能特征,关键调整参数;咱们还介绍了单个ActiveMQ节点上三种不一样的持久化存储方案,并讨论了这三种不一样的持久化存储方案的配置和性能特色。可是这还远远不够,由于在生产环境中为了保证让咱们设计的消息服务方案可以持续工做,咱们还须要为消息中间件服务搭建集群环境,从而在保证消息中间件服务可靠性和处理性能。java

二、ActiveMQ多节点方案

集群方案主要为了解决系统架构中的两个关键问题:高可用和高性能。ActiveMQ服务的高可用性是指,在ActiveMQ服务性能不变、数据不丢失的前提下,确保当系统灾难出现时ActiveMQ可以持续提供消息服务,高可靠性方案最终目的是减小整个ActiveMQ中止服务的时间web

ActiveMQ服务的高性能是指,在保证ActiveMQ服务持续稳定性、数据不丢失的前提下,确保ActiveMQ集群可以在单位时间内吞吐更高数量的消息、确保ActiveMQ集群处理单条消息的时间更短、确保ActiveMQ集群可以容纳更多的客户端稳定链接。apache

下面咱们分别介绍如何经过多个ActiveMQ服务节点集群方式,分别提供热备方案和高性能方案。最后咱们讨论如何将两种方案结合在一块儿,最终造成在生成环境下使用的推荐方案。安全

2-二、ActiveMQ高性能方案

ActiveMQ的多节点集群方案,主要有动态集群和静态集群两种方案。所谓动态集群就是指,同时提供消息服务的ActiveMQ节点数量、位置(IP和端口)是不肯定的,当某一个节点启动后,会经过网络组播的方式向其余节点发送通知(同时接受其余节点的组播信息)。当网络中其余节点收到组播通知后,就会向这个节点发起链接,最终将新的节点加入ActiveMQ集群;所谓静态集群是指同时提供消息服务的多个节点的位置(IP和端口)是肯定的,每一个节点不须要经过广播的方式发现目标节点,只须要在启动时按照给定的位置进行链接。服务器

  • 静态集群方案markdown

    这里写图片描述

  • 动态集群方案网络

这里写图片描述

2-1-一、基于组播(multicast)的节点发现

在使用动态集群配置时,当某个ActiveMQ服务节点启动后并不知道整个网络中还存在哪些其余的服务节点。因此ActiveMQ集群须要规定一种节点与节点间的发现机制,以保证可以解决上述问题。ActiveMQ集群中,使用“组播”原理进行其余节点的发现架构

组播(multicast)基于UDP协议,它是指在一个可连通的网络中,某一个数据报发送源向一组数据报接收目标进行操做的过程。在这个过程当中,数据报发送者只须要向这个组播地址(一个D类IP)发送一个数据报,那么加入这个组播地址的全部接收者均可以收到这个数据报。组播实现了网络中单点到多点的高效数据传送,可以节约大量网络带宽,下降网络负载。负载均衡

这里写图片描述

在IP协议中,规定的D类IP地址为组播地址。224.0.0.0~239.255.255.255这个范围内的IP都是D类IP地址,其中有一些IP段是保留的有特殊含义的:框架

  • 224.0.0.0~224.0.0.255:这个D类IP地址段为保留地址,不建议您在开发过程当中使用,由于可能产生冲突。例如224.0.0.5这个组播地址专供OSPF协议(是一种路由策略协议,用于找到最优路径)使用的组播地址;224.0.0.18这个组播地址专供VRRP协议使用(VRRP协议是虚拟路由器冗余协议)。

  • 224.0.1.0~224.0.1.255:这个D类IP地址为公用组播地址,用于在整个Internet网络上进行组播。除非您有顶级DNS的控制/改写权限,不然不建议在局域网内使用这个组播地址断。

  • 239.0.0.0~239.255.255.255:这个D类IP地址段为推荐在局域网内使用的组播地址段。注意,若是要在局域网内使用组播功能,须要局域网中的交换机/路由器支持组播功能。幸运的是,目前市面上只要不是太太低端的交换机/路由器,都支持组播功能(组播功能所使用的主要协议为IGMP协议,关于IGMP协议的细节就再也不进行深刻了)。

下面咱们使用java语言,编写一个局域网内的组播发送和接受过程。以便让各位读者对基于组播的节点发现操做有一个直观的理解。虽然ActiveMQ中关于节点发现的过程,要比如下的示例复杂得多,可是基本原理是不会改变的。

  • 组播数据报发送者:
package multicast;

import java.net.DatagramPacket;
import java.net.InetAddress;
import java.net.MulticastSocket;
import java.util.Date;

public class SendMulticast {
    public static void main(String[] args) throws Throwable {
        // 组播地址
        InetAddress group = InetAddress.getByName("239.0.0.5");
        // 组播端口,同时也是UDP 数据报的发送端口
        int port = 19999;
        MulticastSocket mss = null;  

        // 建立一个用于发送/接收的MulticastSocket组播套接字对象
        mss = new MulticastSocket(port);
        // 建立要发送的组播信息和UDP数据报
        // 携带的数据内容,就是这个activeMQ服务节点用来提供Network Connectors的TCP/IP地址和端口等信息
        String message = "我是一个活动的activeMQ服务节点(节点编号:yyyyyyy),个人可用tcp信息为:XXXXXXXXXX : ";  
        byte[] buffer2 = message.getBytes(); 
        DatagramPacket dp = new DatagramPacket(buffer2, buffer2.length, group, port);
        // 使用组播套接字joinGroup(),将其加入到一个组播
        mss.joinGroup(group);

        // 开始按照必定的周期向加入到224.0.0.5组播地址的其余ActiveMQ服务节点进行广播
        Thread thread = Thread.currentThread();
        while (!thread.isInterrupted()) {
            // 使用组播套接字的send()方法,将组播数据包对象放入其中,发送组播数据包
            mss.send(dp);
            System.out.println(new Date() + "发起组播:" + message);
            synchronized (SendMulticast.class) {
                SendMulticast.class.wait(5000);
            }
        }

        mss.close();
    }
}
  • 组播数据报接收者:
package multicast;

import java.net.DatagramPacket;
import java.net.InetAddress;
import java.net.MulticastSocket;

/** * 测试接收组播信息 * @author yinwenjie */
public class AcceptMulticast {
    public static void main(String[] args) throws Throwable {
        // 创建组播套接字,并加入分组
        MulticastSocket multicastSocket = new MulticastSocket(19999);
        // 注意,组播地址和端口必须和发送者的一直,才能加入正确的组
        InetAddress ad = InetAddress.getByName("239.0.0.5");
        multicastSocket.joinGroup(ad);

        // 准备接收可能的组播信号
        byte[] datas = new byte[2048];
        DatagramPacket data = new DatagramPacket(datas, 2048 ,ad , 19999);
        Thread thread = Thread.currentThread();

        // 开始接收组播信息,并打印出来
        System.out.println(".....开始接收组播信息.....");
        while(!thread.isInterrupted()) {
            multicastSocket.receive(data);
            int leng = data.getLength();
            System.out.println(new String(data.getData() , 0 , leng , "UTF-8"));
        }

        multicastSocket.close();
    }
}

另外,咱们以前讲过的DUBBO框架中,也有基于“组播”的发现/注册管理。具体可参考DUBBO框架中的com.alibaba.dubbo.registry.multicast.MulticastRegistry类和其引用类(如下为MulticastRegistry类中,建立组播套接字和接受组播数据报的关键代码段):

......
mutilcastAddress = InetAddress.getByName(url.getHost());
mutilcastPort = url.getPort() <= 0 ? DEFAULT_MULTICAST_PORT : url.getPort();
mutilcastSocket = new MulticastSocket(mutilcastPort);
mutilcastSocket.setLoopbackMode(false);
mutilcastSocket.joinGroup(mutilcastAddress);
Thread thread = new Thread(new Runnable() {
    public void run() {
        byte[] buf = new byte[2048];
        DatagramPacket recv = new DatagramPacket(buf, buf.length);
        while (! mutilcastSocket.isClosed()) {
            try {
                mutilcastSocket.receive(recv);
                String msg = new String(recv.getData()).trim();
                int i = msg.indexOf('\n');
                if (i > 0) {
                    msg = msg.substring(0, i).trim();
                }
                MulticastRegistry.this.receive(msg, (InetSocketAddress) recv.getSocketAddress());
                Arrays.fill(buf, (byte)0);
            } catch (Throwable e) {
                if (! mutilcastSocket.isClosed()) {
                    logger.error(e.getMessage(), e);
                }
            }
        }
    }
}, "DubboMulticastRegistryReceiver");
thread.setDaemon(true);
thread.start();
......

2-1-二、桥接Network Bridges

为了实现ActiveMQ集群的横向扩展要求和高稳定性要求,ActiveMQ集群提供了Network Bridges功能。经过Network Bridges功能,技术人员能够将多个ActiveMQ服务节点链接起来。并让它们经过配置好的策略做为一个总体对外提供服务。

这样的服务策略主要包括两种:主/从模式和负载均衡模式。对于第一种策略咱们会在后文进行讨论。本节咱们要重点讨论的是基于Network Bridges的负载均衡模式。

这里写图片描述

2-1-三、动态Network Connectors

既然已经讲述了ActiveMQ中的动态节点发现原理和ActiveMQ Network Bridges的概念,那么关于ActiveMQ怎样配置集群的方式就是很是简单的问题了。咱们先来讨论如何进行基于组播发现的ActiveMQ负载均衡模式的配置——动态网络链接Network Connectors;再来讨论基于固定地址的负载均衡模式配置——静态网络链接Network Connectors。

要配置基于组播发现的ActiveMQ负载均衡模式,其过程很是简单。开发人员只须要在每个ActiveMQ服务节点的主配置文件中(activemq.xml),添加/更改 如下配置信息便可:

......
<transportConnectors>
    <!-- 在transportConnector中增长discoveryUri属性,表示这个transportConnector是要经过组播告知其它节点的:使用这个transportConnector位置链接我 -->
    <transportConnector name="auto" uri="auto+nio://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600&amp;org.apache.activemq.transport.nio.SelectorManager.corePoolSize=20&amp;org.apache.activemq.transport.nio.SelectorManager.maximumPoolSize=50&amp;consumer.prefetchSize=5" discoveryUri="multicast://239.0.0.5" />
</transportConnectors>

......

<!-- 关键的networkConnector标签, uri属性标示为组播发现-->
<networkConnectors>
    <networkConnector uri="multicast://239.0.0.5" duplex="false"/>
</networkConnectors>

......

2-1-3-1:networkConnector标签

若是使用ActiveMQ的组播发现功能,请在networkConnector标签的uri属性中添加以下格式的信息:

multicast://[组播地址][:端口]

例如,您能够按照以下方式使用ActiveMQ默认的组播地址来发现网络种其余ActiveMQ服务节点:

#ActiveMQ集群默认的组播地址(239.255.2.3):
multicast://default

也能够按照以下方式,指定一个组播地址——这在高安全级别的网络中颇有用,由于可能其余的组播地址已经被管理员禁用。注意组播地址只能是D类IP地址段:

#使用组播地址239.0.0.5
multicast://239.0.0.5

如下是经过抓包软件得到的的组播UDP报文:

这里写图片描述

从上图中咱们能够得到几个关键信息:

  • 192.168.61.138和192.168.61.139这两个IP地址分别按照必定的周期(1秒一次),向组播地址239.0.0.5发送UDP数据报。以便让在这个组播地址的其它服务节点可以感知本身的存在

  • 另外,以上UDP数据报文使用的端口是6155。您也能够更改这个端口信息经过相似以下的方式:

#使用组播地址239.0.0.5:19999
multicast://239.0.0.5:19999
  • 每一个UDP数据报中,包含的主要信息包括本节点ActiveMQ的版本信息,以及链接到本身所须要使用的host名字、协议名和端口信息。相似以下:
default.ActiveMQ-4.ailve%localhost%auto+nio://activemq:61616

2-1-3-2:transportConnector标签的关联设置

任何一个ActiveMQ服务节点A,要链接到另外的ActiveMQ服务节点,都须要使用当前节点A已经公布的transportConnector链接端口,例如如下配置中,可以供其它服务节点进行链接的就只有两个transportConnector链接中的任意一个:

......
<transportConnectors>
    <!-- 其它ActiveMQ服务节点,只能使用如下三个链接协议和端口进行链接 -->
    <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
    <transportConnector name="tcp" uri="tcp://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="nio" uri="nio://0.0.0.0:61618?maximumConnections=1000" />
    <transportConnector name="auto" uri="auto://0.0.0.0:61617?maximumConnections=1000" />   
</transportConnectors>
......

那么要将哪个链接方式经过UDP数据报向其余ActiveMQ节点进行公布,就须要在transportConnector标签上使用discoveryUri属性进行标识,以下所示:

......
<transportConnectors>
    ......
    <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="auto" uri="auto+nio://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600" discoveryUri="multicast://239.0.0.5" />
</transportConnectors>

......
<networkConnectors>
    <networkConnector uri="multicast://239.0.0.5"/>
</networkConnectors>
......

2-1-3-3:其余注意事项

  • 关于防火墙:请记得关闭您Linux服务器上对须要公布的IP和端口的限制;

  • 关于hosts路由信息:因为基于组播的动态发现机制,可以找到的是目标ActiveMQ服务节点的机器名,而不是直接找到的IP。因此请设置当前服务节点的hosts文件,以便当前ActiveMQ节点可以经过hosts文件中的IP路由关系,获得机器名与IP的映射:

    # hosts文件
    
    ...... 192.168.61.139 activemq1 192.168.61.138 activemq2 ......
  • 关于哪些协议可以被用于进行Network Bridges链接:根据笔者以往的使用经验,只有tcp头的uri格式(openwire协议)可以被用于Network Bridges链接;固然您可使用auto头,由于其兼容openwire协议;另外,您还能够指定为附加nio头。

2-1-四、静态Network Connectors

相比于基于组播发现方式的动态Network Connectors而言,虽然静态Network Connectors没有那样灵活的横向扩展性,可是却能够适用于网络环境受严格管理的状况。例如:管理员关闭了交换机/路由器的组播功能、端口受到严格管控等等。

配置静态Network Connectors的ActiveMQ集群的方式也很简单,只须要更改networkConnectors标签中的配置便可,而无需关联改动transportConnectors标签。可是配置静态Network Connectors的ActiveMQ集群时,须要注意很是关键的细节:每个节点都要配置其余全部节点的链接位置

为了演示配置过程,咱们假设ActiveMQ集群由两个节点构成,分别是activemq1:192.168.61.138 和 activemq2:192.168.61.139。那么配置状况以下所示:

  • 192.168.61.138:须要配置activemq2的位置信息以便进行链接:
......
<transportConnectors>
    <transportConnector name="auto" uri="auto+nio://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600&amp;consumer.prefetchSize=5"/>
</transportConnectors>
......

<!-- 请注意,必定须要192.168.61.139(activemq2)提供了这样的链接协议和端口 -->
<networkConnectors>
    <networkConnector uri="static:(auto+nio://192.168.61.139:61616)"/>
</networkConnectors>
......
  • 192.168.61.139:须要配置activemq1的位置信息以便进行链接:
......
<transportConnectors>
    <transportConnector name="auto" uri="auto+nio://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600&amp;consumer.prefetchSize=5"/>
</transportConnectors>

......
<!-- 请注意,必定须要192.168.61.138(activemq1)提供了这样的链接协议和端口 -->
<networkConnectors>
   <networkConnector uri="static:(auto+nio://192.168.61.138:61616)"/>
</networkConnectors>
......

同理,若是您的ActiveMQ集群规划中有三个ActiveMQ服务节点,那么任何一个节点都应该配置其它两个服务节点的链接方式。在配置格式中使用“,”符号进行分割:

...... <networkConnectors> <networkConnector uri="static:(tcp://host1:61616,tcp://host2:61616,tcp://..)"/> </networkConnectors> ......

如下是配置完成后可能的效果:

  • 192.168.61.138(activemq1):

这里写图片描述

  • 192.168.61.139(activemq2):

这里写图片描述

2-1-五、其余配置属性

下表列举了在networkConnector标签中还可使用的属性以及其意义。请特别注意其中的duplex属性。若是只从字面意义理解该属性,则被称为“双工模式”;若是该属性为true,当这个节点使用Network Bridge链接到其它目标节点后,将强制目标也创建Network Bridge进行反向链接。其目的在于让消息既能发送到目标节点,又能够经过目标节点接受消息,但实际上大多数状况下是没有必要的,由于目标节点通常都会自行创建链接到本节点。因此,该duplex属性的默认值为false。

属性名称 默认值 属性意义
name bridge 名称
dynamicOnly false 若是为true, 持久订阅被激活时才建立对应的网路持久订阅。
decreaseNetworkConsumerPriority false 若是为true,网络的消费者优先级下降为-5。若是为false,则默认跟本地消费者同样为0.
excludedDestinations empty 不经过网络转发的destination
dynamicallyIncludedDestinations empty 经过网络转发的destinations,注意空列表表明全部的都转发。
staticallyIncludedDestinations empty 匹配的都将经过网络转发-即便没有对应的消费者,若是为默认的“empty”,那么说明全部都要被转发
duplex false 已经进行详细介绍的“双工”属性。
prefetchSize 1000 设置网络消费者的prefetch size参数。若是设置成0,那么就像以前文章介绍过的那样:消费者会本身轮询消息。显然这是不被容许的。
suppressDuplicateQueueSubscriptions false 若是为true, 重复的订阅关系一产生即被阻止(V5.3+ 的版本中可使用)。
bridgeTempDestinations true 是否广播advisory messages来建立临时destination。
alwaysSyncSend false 若是为true,非持久化消息也将使用request/reply方式代替oneway方式发送到远程broker(V5.6+ 的版本中可使用)。
staticBridge false 若是为true,只有staticallyIncludedDestinations中配置的destination能够被处理(V5.6+ 的版本中可使用)。

如下这些属性,只能在静态Network Connectors模式下使用

属性名称 默认值 属性意义
initialReconnectDelay 1000 重连以前等待的时间(ms) (若是useExponentialBackOff为false)
useExponentialBackOff true 若是该属性为true,那么在每次重连失败到下次重连以前,都会增大等待时间
maxReconnectDelay 30000 重连以前等待的最大时间(ms)
backOffMultiplier 2 增大等待时间的系数

请注意这些属性,并非networkConnector标签的属性,而是在uri属性中进行设置的,例如:

uri="static:(tcp://host1:61616,tcp://host2:61616)?maxReconnectDelay=5000&useExponentialBackOff=false"

============ (接下文)

相关文章
相关标签/搜索