在我开始构思这几篇关于“本身动手设计ESB中间件”的文章时,曾有好几回动过放弃的念头。缘由倒不是由于对冗长的文章产生了惰性,而是ESB中所涉及到的技术知识和须要突破的设计难点实在是比较多,再冗长的几篇博文甚至没法对它们所有进行概述,另外若是在思路上稍微有一点差池就会误导读者。一个能够稳定使用的ESB中间件凝聚了一个团队不少参与者的心血,一我的确定是没法完成这些工做的。可是笔者思索再三,仍是下决心将这这即使文章完成,由于这是对本专题从第19篇文章到第39篇文章中所介绍的知识点的最好的总结。咱们本身动手设计ESB中间件,不是为了让它商用,也不是为了让它能够比拟市面上某款ESB中间件,甚至不是为了把ESB中的技术难点的解决所有方案化。咱们的目的是检验整个专题中所介绍的知识点是否能在读者本身消化后进行综合应用,是否能作到技术知识的活学活用、按需选型。java
(顶层设计图)web
上图是咱们要进行实现的ESB中间件的顶层设计。从上图中能够看到,整个ESB中间件分为如下几个模块:Client客户端、流程编排/注册工具、主控服务模块、服务状态协调组(模块)、服务运行组(模块)。首先咱们大体描述一下这些模块的工做内容:算法
Client客户端是须要接入ESB中间件的各个业务服务系统。例如物流系统、联帐系统、CRM系统等等。在这些客户端系统接入ESB中间件时,将集成ESB中间件提供给他们的各类开发语言版本的ESB-Client组件。若是使用的是C#语言则ESB-Client组件可能以DLL文件的方式提供;若是使用的是JAVA语言则ESB-Client组件可能以Jar文件的方式提供;若是使用的是NODEJS则多是一个(或多个)JS文件……数据库
这些客户端系统的开发人员将可使用ESB中间件提供的一个独立的流程编排/注册工具,后者在不少ESB中间件系统中通常被命名为“…… Studio”,而且这些流程编排/注册工具通常以各类IDE插件的形式提供出来,例如制做成Eclipse-Plugin提供给开发人员。这些工具的主要做用就是让客户端系统的开发人员(开发团队)具有向ESB主控服务进行原子服务注册的能力,另外还可让开发人员查询到目前服务端全部可用的其它原子服务(来自于其它业务系统的),以便在流程编排/注册工具上完成新的服务流程编排和已有服务流程新版本的发布。这就是上图中标注为“1”的步骤。json
另外ESB中间件为了保证流程编排所使用的原子服务不会由于提供这个原子服务的业务系统的变化而产生影响,通常来讲在进行业务系统注册原子服务时都会指定这个原子服务的版本和调用权限。调用权限通常又分为黑名单权限和白名单权限。以白名单权限来讲,只有白名单中所列列举的业务系统有权限调用这个原子服务。即便这个原子服务参与了某个ESB中的流程编排,若是请求这个编排好的流程的业务系统不在这个白名单中,调用也会失败。缓存
主控服务为流程编排/发布工具提供新的原子服务注册请求、新的流程发布请求、已有流程的新版本发布请求。最新的原子服务、流程编排等数据将会被主控服务存储在持久化容器中(例如关系型数据库),而且向“服务状态协调模块”发送最新的数据变化。注意,主控服务并不负责执行编排好的流程,只是用于记录数据编排的变化和向“数据协调模块”发送这些数据变化,这就是上图中所标示的步骤2。主控服务还有另外两个做用:负责权限管理和服务运行模块的状态监控。bash
因为负责最终对流程编排进行执行的“服务运行模块”存在不少节点(下文称为ESB-Broker Server节点),且这些ESB-Broker Server节点的数量在服务过程当中会不断变化(新增或减小),因此“主控服务”并不知道有哪些Boker在运行。为了通知这些在运行状态的Broker有新的服务编排被发布(或者其它事件信息),这些处于运行状态的ESB-Broker Server节点都会链接到“服务状态协调模块”,而且由后者完成数据变化的事件通知。这就是“服务状态协调模块”的主要功能,也是上图中所示的步骤3。在咱们本身设计的ESB中间件中,“服务状态协调模块”由一组zookeeper服务构成(在我另外几篇博文中专门介绍zookeeper,这个专题就不对zookeeper的基本操做进行讲解了),若是您在实际的工做中有其它功能/技术需求,也能够本身设计“服务状态协调模块”。网络
在业务系统集成过程当中,ESB中间件所扮演的角色就是在各个业务系统间进行原子服务调用、转换数据、再进行原则服务调用、再转换…….最后向执行服务编排的请求者返回结果。因此ESB中间件服务每每有较高的性能要求。若是执行ESB服务编排的节点只有一个,每每就达不到ESB中间件的设计要求甚至会使ESB中间件服务成为整个软件架构的性能瓶颈点。因此在咱们设计的ESB中间件中,真正执行ESB服务的节点会有多个这种ESB-Broker Server节点。数据结构
在ESB运行服务的过程当中使用多个Broker Server有不少好处,首先来讲它们能够保证在整个系统出现请求洪峰的状况下,可以把这些请求压力平均分配到这些Broker Server节点上,最终使ESB服务不会成为整个顶层设计的瓶颈。请求压力的分配工做会由zookeeper集群完成。另外,多个Broker Server能够保证某一个(或者几个)Broker Server节点在出现异常并退出服务后,整个ESB中间件的服务不会中止——这是一个现成的容错方案。开发人员能够经过退避算法来决定ESB-Client下一次试图访问出现错误的Broker Server节点的时间,也能够当即为ESB-client从新分配一个健康的Broker Server节点。最后,这个解决方案能够在ESB服务运行的过程当中保证明现Broker Server的动态横向扩展:当ESB主控服务模块发现整个Broker Server服务组的性能达到(或快要达到)峰值时,运维人员能够立刻开启新的Broker Server节点,zookeeper集群会负责将定制的编排、定制的Processor处理器等数据信息动态加载到新的Broker Server节点中,并让后者当即加入整个服务组开始工做。架构
在ESB-Client(某个业务系统)请求执行某个服务编排时,首先会使用这个ESB-Client(某个业务系统)已经集成的zookeeper客户端请求ESB的zookeeper集群服务,从中取得当前正在运行的Broker Server节点信息,并经过某种算法决定本身访问哪个Broker Server节点(算法不少:轮询算法、加权算法、一致性Hash算法等等),如“顶层设计图”中步骤四、步骤5所示。为了保证上文中提到的新的Broker Server节点可以加入服务组并为ClientESB-Client服务,步骤4和步骤5的过程能够周期性进行,并视状况从新为ESB-Client分配Broker Server节点。
当ESB-Client肯定目标Broker Server节点后,将正式向这个Broker Server发起执行某个服务编排的请求。当同一个ESB-Client第二次请求执行服务编排时,就能够在必定时间周期内(有效时间内)再也不走步骤四、5了,而能够直接发起请求到同一个目标Broker Server节点。直到这个Broker Server再也不可以响应这些请求为止(或者有其它依据肯定这个Broker Server节点已经不能提供服务),ESB-Client会再执行步骤四、5,以便肯定另外一个新的、正常工做的Broker Server节点。在下文笔者也会重点介绍如何进行Broker Server节点的选择。
上一节已经说到,在咱们设计的ESB中间件中包括两个模块:主控服务模块和服务运行组(模块)。其中主控服务模块的其中一个做用,是对若干当前处于运行状态的服务运行组节点(Broker Server)进行性能状态监控。性能状态监控的目的是确保运维人员实时了解这些Broker Server的运行状态,而且能在整个服务运行组快要达到性能瓶颈时可以启动新的Broker Server分担压力或者在整个服务运行组没有什么请求负担时,中止一些Broker Server。
那么主控节点如何知道整个Broker Server组中多个服务节点的性能状态呢?要知道,Broker Server节点是能够动态扩展的。上文也已经说到:主控节点并不知道当前有哪些Broker Server节点处于运行状态。那么基于Kafka消息队列的日志收集就是一个解决方案,设计人员还可使用Flume + Storm的解决方案进行日志自动收集和即时分析。下面咱们对这两种日志收集方案进行介绍。注意,关于Kafka、Flume在这个专题以前的文章中已经作了详细介绍,因此本小节中涉及Kafka、Flume技术的部分就再也不对设计方案的实施进行介绍了。
Kafka Server的特色就是快,虽然在特定的状况下Kafka Server会出现消息丢失或重复发送的问题,可是这个问题针对日志数据收集场景来讲不算大问题。使用消息队列收集各Broker Server节点的性能日志也是和ESB中各模块的依赖特性相适应的:因为在咱们设计的ESB中间件中,主控服务模块并不知道有多少Broker Server节点处于运行状态,也不知道这些Broker Server节点的IP位置。也就是说主控服务模块没法主动去这些Broker Server节点上收集性能数据。最好的办法就是由这些活动的Broker Server节点主动发送日志数据。
下图是使用Kafka组件收集Broker Server节点上性能数据并进行性能数据处理、结果存储的设计示例图:
上图中,每个Broker Server节点上除了启动一个Camel Context实例之外(后文进行详细说明),还须要配置一个Kafka的Producer端用于发送数据。Kafka-Producer端收集的性能数据可能包括:CPU使用状况、内存使用状况、本地I/O速度、操做系统线程状况、Camel Context中的路由实例状态、Endpoint在Cache中的命中状况、客户端对Broker Server中以编排路由的调用状况等等——业务数据和非业务数据均可以经过这种方式进行监控,而且以业务数据为主。
Kafka Servers中部署了三个Kafka Broker Server节点(建议的值),用于接收若干ESB Broker Server节点上各个Kafka-Producer发送来的性能日志数据。为了保证整个Kafka集群的性能,每个Kafka Broker Server都有至少两个分区(partition,仍是建议值)。这里多说一句,为了节约服务资源您能够将Kafka Broker Server和Kafka-Consumer放在一台服务节点上,甚至能够将它们和主控服务节点放在一块儿。
Kafka-Consumer负责进行性能日志数据的处理。有的读者可能就要问了,既然Consumer接收到的都是能够独立存储性能日志数据,那么只须要将这些日志找到一个合适的存储方案(例如HBase)存放起来就能够了,还须要Consumer作什么处理呢?这是由于开发团队完成的Producer采样频率可能和运维团队要求的监控采用频率不同。
为了保证性能监控数据的精准性,开发团队利用基于Kafka集群提供的吞吐量优点,能够在各个ESB Broker Server节点所集成的Kafka-Producer上设置一个较高的采样率(固然仍是要顾忌节点自己的资源消耗),例如每秒对固定的业务指标和非业务指标完成10次采样。可是运维团队经过主控服务监控各个ESB Broker Server节点是,每每不须要这么高的采样率(这里能够提供一个设置选项供运维团队随时进行调整),大概也就是每秒更新1次的样子就差很少了。
那么Consumer如何处理每秒钟多出来的9次采样数据呢?能够明确想到的有两种处理方式:一种处理方式是不管主控服务的监控台上的性能指标以何种频率进行显示,Consumer都将收到的数据写出存储系统中;另外一种处理方式是Consumer将收到的多余数据丢弃,只按照运维团队设置的采样频率将数据写入持久化存储系统。在第二中处理方式中有一个状况须要特别注意:若是将要被丢弃的性能数据达到了性能阀值(例如本次采集的内存使用率超多了2GB),则这条日志数据仍是须要进行保留。第一种处理基本上没有什么须要介绍的,优势和缺点也是很明确的:优势是能够在后期进行完整的性能历史回溯,缺点就是会占用较大的存储空间——虽然目前可使用的超大存储方案有不少并且都很成熟稳定,但它们都须要比较强大的资金预算支持。
这里笔者主要讨论一下Consumer的第二种处理方式:丢弃多余的数据。咱们可使用以前文章介绍过的ConcurrentLinkedHashMap做为Consumer中存储性能消息日志的Cache,Cache的固定大小设置为200(或者其它一个较大的值)。这个Cache结构能够帮助咱们完成不少工做:首先它可靠的性能可以保证过个Consumer不会成为整个性能日志收集方案的瓶颈——虽然ConcurrentLinkedHashMap的性能并非最快的;其次这个Cache结构可以帮助咱们自动完成多余性能日志的清除工做,由于在第201条日志记录被推入Cache时,在LRU队列尾部的最初一条记录将自动被排除队列,最终被垃圾回收策略回收掉;最后,Consumer按照运维团队设置的采样周期,对Cache中的性能日志数据进行持久化保存时,始终只须要取出当前在Cache将被剔除的那条记录,这样就省掉了编写程序,在两个周期的时间差之间判断“要对哪条性能日志数据”进行持久化保存的定位工做。
顺便说一句,若是您须要在工程中使用Google提供的ConcurrentLinkedHashMap数据结构工具,那么您须要首先在pom文件中添加相应的组件依赖信息:
<dependency>
<groupId>com.googlecode.concurrentlinkedhashmap</groupId>
<artifactId>concurrentlinkedhashmap-lru</artifactId>
<version>1.4.2</version>
</dependency>
如下是Consumer中用于处理LRU队列添加、LRU周期性读取、LRU删除事件的代码片断:
......
/** * 这就是性能数据的LRU队列 */
private static final ConcurrentLinkedHashMap<Long, String> PERFORMANCE_CACHE =
new ConcurrentLinkedHashMap.Builder<Long, String>()
.initialCapacity(200)
.maximumWeightedCapacity(200)
.listener(new EvictionListenerImpl())
.build();
......
/** * 这个监听器用于在数据被从LRU队列剔除时<br> * 按照功能需求检查这条记录是否须要被持久化存储起来。 * @author yinwenjie */
public static class EvictionListenerImpl implements EvictionListener<Long, String> {
// 上一次进行数据采集的时间,初始为-1
private Long lastTime = -1l;
// 这是由运维团队设置的数据采集周期,1000表示1000毫秒
// 正式系统中,这个值将有外部读取
private Long period = 1000l;
@Override
public void onEviction(Long key, String jsonValue) {
/* * 如下条件任意成立时,就须要对这条数据进行采集了: * 一、lastTime为-1的状况(说明是程序第一次采集) * * 二、当前事件 - lastTime >= period(采集周期) * * 三、当监控数据大于设置的警告阀值,在这个示例代码中 * 这个警告阀值为80,正式系统中,这个阀值应从外部读取 * 如下的threshold变量就表明这个值 * */
Long threshold = 80L;
Long nowtime = new Date().getTime();
// 获取性能数据中的CPU使用率
// 注意,正式系统 中最好不要传递json结构,文本结构的数据就行了
JSONObject jsonData = JSONObject.fromObject(jsonValue);
Long cpuRate = jsonData.getLong("cpu");
boolean mustCollecting = false;
if(this.lastTime == -1 ||
nowtime - lastTime >= this.period ||
cpuRate >= threshold) {
mustCollecting = true;
this.lastTime = nowtime;
}
// 若是不须要作数据的持久化存储,就终止本次监听的操做便可
if(!mustCollecting) {
return;
}
// ********************
// 这里能够作持久化数据存储的操做了
// ********************
LRUConsumer.LOGGER.info(key + ":" + jsonValue + " 完成数据持久存储操做=======");
}
}
......
// 如下代码就是当Kafka-Consumer收到性能日志数据的操做
// 将这个数据存放到PERFORMANCE_CACHE便可
Long key = new Date().getTime();
// 可以使用时间的毫秒数做为key值(正式应用场景下,考虑多个consumer节点,Key的肯定会有一个更规范的规则)
LRUConsumer.PERFORMANCE_CACHE.put(key, performanceData);
......
以上代码中,咱们使用以前已经介绍过的LRU数据结构在Consumer端保存发送过来的数据。若是读者对LRU还不清楚能够查看我另外的一篇文章中的介绍(《架构设计:系统间通讯(39)——Apache Camel快速入门(下2)》)。由Google提供的ConcurrentLinkedHashMap结构就能够向咱们提供一个现成的LRU队列,这样一来当LRU队列存储满后,最早被接收到的性能日志数据就会从队列尾部被删除。最关键的处理工做都将在EvictionListener接口的实现类中完成,在实际应用中开发人员还能够在肯定一条性能日志须要被持久化存储以后专门启动一个线程进行操做,例如使用一个专门的线程池(ThreadPoolExecutor)。这样一来LRU队列就真正不受持久化存储操做延迟时间的影响了。
以上使用Kafka收集Broker Server节点的性能数据的方案中,须要在每一个编写的Broker Server节点上增长额外的代码向Kafka Broker Server发送数据。实际上这种功能需求状况使用Apache Flume收集数据会使技术方案更容易实现和维护,下面咱们就大体介绍一下这个技术方案实现。因为在以前的文章中笔者已经较详细的介绍了如何使用Apache Flume进行基本配置了,因此这里咱们重点讨论两个问题Apache Flume的数据来源和Storm Server在接收到Flume Server发送来的数据后如何进行处理。
上图展现了整个功能需求的设计结构。安装在ESB Broker Server节点的Flume程序负责收集这个节点上的各类功能性指标和非功能性指标,这样避免了在ESC Broker Server服务上编写额外的代码采集非功能性指标,也减小了编写代码采集功能性指标的复杂度。而后将这些性能日志数据按照负载均衡模式传递到若干中继Flume Server节点上,后者专门用于承载/汇总多个ESB Broker Server节点传来的性能日志数据,而且最终将数据写入Storm Server。在Flume Server和Storm Server之间咱们仍是须要使用Kafka Server做为缓存,这是由于Apache Kafka经过Storm-Kafka组件和Storm Server实现无缝集成。
首先请注意安装在ESB Broker Server节点的Flume程序,在3-1小节中采集节点功能性指标和非功能性指标都是依靠开发人员编写程序完成,并发送给Kafka-Broker。但这样作却真的绕了很大一个弯路,由于Linux操做系统上已经提供了不少采集节点非功能性指标的方式(例如采集I/O信息、内存使用信息、内存分页信息、CPU使用信息、网络流量信息等),开发人员只须要一些脚本就能够完成采集工做。例如,咱们采集CPU信息彻底不须要咱们在ESB-Broker Server中编写程序(采集CPU信息也不该该是ESB-Broker Server的一项工做任务),而采用以下的脚本便可:
top -d 0.1 | grep Cpu >> cpu.rel
#写法还有不少,还能够从/proc/stat文件中获取CPU状态
以上脚本能够按照100毫秒为周期,获取CPU的信息。并将这条信息做为一条新的记录存储到cpu.rel文件中。这样Apache Flume就能够读取cpu.rel文件中的变化,做为性能日志数据的来源:
#flume 配置文件中的片断
......
agent.sources.s1.type = exec
agent.sources.s1.channels = c1
agent.sources.s1.command = tail -f -n 0 /root/cpu.rel
......
在ESB-Broker Server节点中,咱们可使用这样的方式从不一样文件中读取各类不一样的日志信息,以下图所示:
================================= (接下文)