Ambari架构源码解析前端
Ambari是hadoop分布式集群配置管理工具,是由hortonworks主导的开源项目。它已经成为apache基金会的孵化器项目,已经成为hadoop运维系统中的得力助手。node
Ambari充分利用了一些已有的优秀开源软件,巧妙地把它们结合起来,使其在分布式环境中作到了集群式服务管理能力、监控能力、展现能力,这些优秀的开源软件有: python
(1)agent端,采用了puppet管理节点;ios
(2)在web端,采用ember.js做为前端MVC框架和NodeJS相关工具,用handlebars.js做为页面渲染引擎,在CSS/HTML方面还用了Bootstrap框架。web
(3)在Server端,采用了Jetty、Spring、JAX-RS等。spring
(4)同时利用了Ganglia、Nagios的分布式监控能力。数据库
Ambari架构采用的是Server/Client的模式,主要由两部分组成:ambari-agent和ambari-server。ambari依赖其它已经成熟的工具,例如其ambari-server 就依赖python,而ambari-agent还同时依赖ruby, puppet,facter等工具,还有它也依赖一些监控工具nagios和ganglia用于监控集群情况。其中: apache
1. puppet是分布式集群配置管理工具,也是典型的Server/Client模式,可以集中式管理分布式集群的安装配置部署,主要语言是ruby。 json
2. facter是用python写的一个节点资源采集库,用于采集节点的系统信息,例如OS信息,主机信息等。因为ambari-agent主要是用python写的,所以用facter能够很好地采集到节点信息。api
Nagios是一款开源的免费网络监视工具,能有效监控Windows、Linux和Unix的主机状态,交换机路由器等网络设备,打印机等。在系统或服务状态异常时发出邮件或短信报警第一时间通知网站运维人员,在状态恢复后发出正常的邮件或短信通知。
Ganglia是UC Berkeley发起的一个开源集群监视项目,设计用于测量数以千计的节点。Ganglia的核心包含gmond、gmetad以及一个Web前端。主要是用来监控系统性能,如:cpu 、mem、硬盘利用率, I/O负载、网络流量状况等,经过曲线很容易见到每一个节点的工做状态,对合理调整、分配系统资源,提升系统总体性能起到重要做用。
ambari-server是一个有状态的,它维护着本身的一个有限状态机FSM。同时这些状态机存储在数据库中,前期数据库主要采用postgres。以下图所示,server端主要维护三类状态:
1. Live Cluster State:集群现有状态,各个节点汇报上来的状态信息会更改该状态;
2. Desired State:用户但愿该节点所处状态,是用户在页面进行了一系列的操做,须要更改某些服务的状态,这些状态尚未在节点上产生做用;
3. Action State:操做状态,是状态改变时的请求状态,也能够看做是一种中间状态,这种状态能够辅助Live Cluster State向Desired State状态转变。
Ambari-server的Heartbeat Handler模块用于接收各个agent的心跳请求(心跳请求里面主要包含两类信息:节点状态信息和返回的操做结果),把节点状态信息传递给FSM状态机去维护着该节点的状态,而且把返回的操做结果信息返回给Action Manager去作进一步的处理。
Coordinator模块又能够称为API handler,主要在接收WEB端操做请求后,会检查它是否符合要求,stage planner分解成一组操做,最后提供给Action Manager去完成执行操做。
所以,从上图就能够看出,Ambari-Server的全部状态信息的维护和变动都会记录在数据库中,用户作一些更改服务的操做都会在数据库上作一些相应的记录,同时,agent经过心跳来得到数据库的变动历史。
ambari-agent是一个无状态的。其功能主要分两部分:
- 采集所在节点的信息而且汇总发心跳汇报给ambari-server;
- 处理ambari-server的执行请求。
所以它有两种队列:
- 消息队列MessageQueue,或为ResultQueue。包括节点状态信息(包括注册信息)和执行结果信息,而且汇总后经过心跳发送给ambari-server;
- 操做队列ActionQueue。用于接收ambari-server返回过来的状态操做,而后能过执行器按序调用puppet或python脚本等模块完成任务。
Ambari-Server是一个WEB Server,提供统一的REST API接口,同时向web和agent开放了两个不一样的端口(默认前者是8080, 后者是8440或者8441)。它是由Jetty Server容器构建起来的,经过Spring Framework构建出来的WEB服务器,其中大量采用了google提供的Guice注解完成spring框架所须要的注入功能(想想,以前spring框架须要加载一个applicationcontext.xml文件来把bean注入进来,如今能够用Guice注解的方式就能够轻松完成)。 REST框架由JAX-RS标准来构建。
Ambari-Server接受来自两处的REST请求,Agent过来的请求处理逻辑由包org.apache.ambari.server.agent处理, 而API所的处理逻辑来自org.apache.ambari.server.api。
Ambari-Server有一个状态机管理模块,全部节点的状态信息更改都最终提供给状态机进行更改操做,所以状态机是一个很忙的组件。在Ambari-Server里面,把每一次更改操做都把它看成是一类事件,采用事件驱动机制完成对应的任务。这种思想有点借鉴已经运用在hadoop 2.x YARN里面的事件驱动机制。事件驱动机制可以一种高效的异步RPC请求方式,直接调用须要执行相应的代码逻辑,而事件驱动只须要产生事件统一提交给事件处理器,所以事件驱动须要一个更复杂的有限状态机结合起来一同使用。
Agent发送过来的心跳请求由org.apache.ambari.server.agent.HeartBeatHandler.handleHeartBeat(HeartBeat)来处理,执行完后,同时会返回org.apache.ambari.server.agent.HeartBeatResponse给agent。 org.apache.ambari.server.agent.HeartBeat里面主要含了两类信息:节点的状态信息nodeStatus和服务状态信息componentStatus。
public class HeartBeatHandler {
...
public HeartBeatResponse handleHeartBeat(HeartBeat heartbeat)
throws AmbariException {
long now = System.currentTimeMillis();
if (heartbeat.getAgentEnv() != null && heartbeat.getAgentEnv().getHostHealth() != null) {
heartbeat.getAgentEnv().getHostHealth().setServerTimeStampAtReporting(now);
}
String hostname = heartbeat.getHostname();
Long currentResponseId = hostResponseIds.get(hostname);
HeartBeatResponse response;
if (currentResponseId == null) {
//Server restarted, or unknown host.
LOG.error("CurrentResponseId unknown for " + hostname + " - send register command");
// 无responseId, 新请求,就进行注册, responseId =0
return createRegisterCommand();
}
LOG.debug("Received heartbeat from host"
+ ", hostname=" + hostname
+ ", currentResponseId=" + currentResponseId
+ ", receivedResponseId=" + heartbeat.getResponseId());
if (heartbeat.getResponseId() == currentResponseId - 1) {
LOG.warn("Old responseId received - response was lost - returning cached response");
return hostResponses.get(hostname);
} else if (heartbeat.getResponseId() != currentResponseId) {
LOG.error("Error in responseId sequence - sending agent restart command");
// 心跳是历史记录,那么就要求其重启,从新注册,responseId 不变
return createRestartCommand(currentResponseId);
}
response = new HeartBeatResponse();
//responseId 加 1 , 返回一个新的responseId,下次心跳又要把这个responseId带回来。
response.setResponseId(++currentResponseId);
Host hostObject;
try {
hostObject = clusterFsm.getHost(hostname);
} catch (HostNotFoundException e) {
LOG.error("Host: {} not found. Agent is still heartbeating.", hostname);
if (LOG.isDebugEnabled()) {
LOG.debug("Host associated with the agent heratbeat might have been " +
"deleted", e);
}
// For now return empty response with only response id.
return response;
}
//失去心跳,要求从新注册, responseId=0
if (hostObject.getState().equals(HostState.HEARTBEAT_LOST)) {
// After loosing heartbeat agent should reregister
LOG.warn("Host is in HEARTBEAT_LOST state - sending register command");
return createRegisterCommand();
}
hostResponseIds.put(hostname, currentResponseId);
hostResponses.put(hostname, response);
// If the host is waiting for component status updates, notify it
//若是主机正在等待组件状态更新,请通知它
//节点已经进行了注册,可是该节点尚未汇报相关状态信息,等待服务状态更新
if (heartbeat.componentStatus.size() > 0
&& hostObject.getState().equals(HostState.WAITING_FOR_HOST_STATUS_UPDATES)) {
try {
LOG.debug("Got component status updates");
//更新服务状态机
hostObject.handleEvent(new HostStatusUpdatesReceivedEvent(hostname, now));
} catch (InvalidStateTransitionException e) {
LOG.warn("Failed to notify the host about component status updates", e);
}
}
if (heartbeat.getRecoveryReport() != null) {
RecoveryReport rr = heartbeat.getRecoveryReport();
processRecoveryReport(rr, hostname);
}
try {
if (heartbeat.getNodeStatus().getStatus().equals(HostStatus.Status.HEALTHY)) {
//向状态机发送更新事件,更新节点至正常状态
hostObject.handleEvent(new HostHealthyHeartbeatEvent(hostname, now,
heartbeat.getAgentEnv(), heartbeat.getMounts()));
} else { // 把节点列入不健康
hostObject.handleEvent(new HostUnhealthyHeartbeatEvent(hostname, now, null));
}
} catch (InvalidStateTransitionException ex) {
LOG.warn("Asking agent to re-register due to " + ex.getMessage(), ex);
hostObject.setState(HostState.INIT);
return createRegisterCommand();
}
/** * A host can belong to only one cluster. Though getClustersForHost(hostname) * returns a set of clusters, it will have only one entry. *主机只能属于一个集群。 经过getClustersForHost(hostname)返回一组集群,它只有一个条目。 * * TODO: Handle the case when a host is a part of multiple clusters. * 处理 主机是多个集群的一部分时的 状况。 */
Set<Cluster> clusters = clusterFsm.getClustersForHost(hostname);
if (clusters.size() > 0) {
String clusterName = clusters.iterator().next().getClusterName();
if (recoveryConfigHelper.isConfigStale(clusterName, hostname, heartbeat.getRecoveryTimestamp())) {
RecoveryConfig rc = recoveryConfigHelper.getRecoveryConfig(clusterName, hostname);
response.setRecoveryConfig(rc);
if (response.getRecoveryConfig() != null) {
LOG.info("Recovery configuration set to {}", response.getRecoveryConfig().toString());
}
}
}
heartbeatProcessor.addHeartbeat(heartbeat);
// Send commands if node is active
if (hostObject.getState().equals(HostState.HEALTHY)) {
sendCommands(hostname, response);
annotateResponse(hostname, response);
}
return response;
}
...
}
安装ambari-agent 服务时会把相应在的python代码置于python执行的环境上下文中,例如其入口代码多是/usr/lib/python2.6/site-packages/ambari_agent/main.py,而且进行相关初始化工做(例如验证参数,与server创建链接,初始化安全验证证书),最后会产生一个新的控制器Controller子线程来统一管理节点的状态。Controller线程里面有一个动做队列ActionQueue线程,而且开启向Server注册和发心跳服务。能够看出来,ambari-agent主要由两个线程组成,Controller线程向Server发送注册或心跳请求,请求到的Action数据放到ActionQueue线程里面,ActionQueue线程维护着两个队列:commandQueue和resultQueue。ActionQueue线程会监听commandQueue的情况。
class Controller(threading.Thread):
def __init__(self, config, range=30):
// 在初始化Controller以前,ambari-agent就会在main.py里面进行判断:ambari-server是否正常,正常才会初始化Controller
// 省略初始化代码
def run(self):
try:
// 初始化队列线程
self.actionQueue = ActionQueue(self.config, controller=self)
self.actionQueue.start()
// 初始化注册类
self.register = Register(self.config)
// 初始化心跳类
self.heartbeat = Heartbeat(self.actionQueue, self.config, self.alert_scheduler_handler.collector())
opener = urllib2.build_opener()
urllib2.install_opener(opener)
while True:
self.repeatRegistration = False
//开始注册 而且 定时发心跳
self.registerAndHeartbeat()
if not self.repeatRegistration:
logger.info("Finished heartbeating and registering cycle")
break
except:
logger.exception("Controller thread failed with exception:")
raise
logger.info("Controller thread has successfully finished")
CommandQueue队列主要有3类command:
1. REGISTER_COMMAND:该类命令主要通知agent从新向server发送注册请求;
2. STATUS_COMMAND:该类命令主要告诉agent须要向server发送某组件的状态信息;
3. EXECUTION_COMMAND:要求agent执行puppet或者软件集升级任务;
ambari-agent是一个无状态的,主要功能以下所示:
于是,它有两种队列:MessageQueue和ActionQueue。
架构图以下所示:
而对于ambari-server来讲,其是一个有状态的,它维护着本身的一个有限状态FSM。同时这些状态存储与数据库当中(DB目前能够支持多种,可按序自选),Server端主要维持三类状态:
其架构图以下所示:
ambari-server的Heartbeat Handler模块用于接收各个Agent的心跳请求(其中包含节点状态信息和返回的操做结果),把节点状态信息传递给图中的FSM模块去维护该节点的状态,并把响应以后的操做结果信息返回给Action Manager去作更加详细的处理。Coordinator模块能够看做API Handler,主要在接收Web端操做请求后,校验其合法性,Stage Planner分解成一组操做,最后提供给Action 过 Manager去完成执行操做。
于是,从上图中,咱们能够看出,ambari-server的全部状态信息的维护和变化都会被记录在数据库当中,使用者作一些更改服务的操做都会在数据库商作对应的记录,同时,Agent经过心跳来获取数据库的变更历史信息。