咱们经过url:http://localhost:10000/turbine.stream?cluster=default
能够获取到指标的json数据。那么指标数据又是从何处获取到的。
答案是:从各个服务的/manage/hystrix.stream端点获取的html
turbine官方的github地址:https://github.com/Netflix/turbine/wiki
能够找到turbine的架构设计git
详细信息参考https://github.com/Netflix/Turbine/wiki/Design-And-Architecture-(1.x)
github
说明:turbine启动的时候,会去链接须要监控的主机,创建起监听,每个实例会有一个监听。当实例监遵从各个服务获取到数据的时候,会将数据填充到派发器dispatcher中,由派发器将数据输出到各个客户端。spring
turbine的实如今turbine核心包下com.netflix.turbine:turbine-core
json
在该包下,能够找到几个关键的类InstanceMonitor
、HandlerQueueTuple
、TurbineDataDispatcher
、TurbineStreamServlet
浏览器
咱们启动调试的时候,能够看到架构
实例的url实际上是指向具体须要监控的实例的端点,即http://sheng:8088/manage/hystrix.stream
查看这个连接咱们能够看到app
InstanceMonitor
启动监听ide
public void startMonitor() throws Exception { // This is the only state that we allow startMonitor to proceed in if (monitorState.get() != State.NotStarted) { return; } taskFuture = ThreadPool.submit(new Callable<Void>() { @Override public Void call() throws Exception { try { //初始化,链接到具体实例上 init(); monitorState.set(State.Running); while(monitorState.get() == State.Running) { //关键代码 doWork(); } } catch(Throwable t) { logger.warn("Stopping InstanceMonitor for: " + getStatsInstance().getHostname() + " " + getStatsInstance().getCluster(), t); } finally { if (monitorState.get() == State.Running) { monitorState.set(State.StopRequested); } cleanup(); monitorState.set(State.CleanedUp); } return null; } }); }
private void init() throws Exception { HttpGet httpget = new HttpGet(url); HttpResponse response = gatewayHttpClient.getHttpClient().execute(httpget); HttpEntity entity = response.getEntity(); InputStream is = entity.getContent(); //初始化一个输入流 reader = new BufferedReader(new InputStreamReader(is)); int statusCode = response.getStatusLine().getStatusCode(); if (statusCode != 200) { // this is unexpected. We probably have the wrong endpoint. Print the response out for debugging and give up here. List<String> responseMessage = IOUtils.readLines(reader); logger.error("Could not initiate connection to host, giving up: " + responseMessage); throw new MisconfiguredHostException(responseMessage.toString()); } }
dowork()
方法作了什么呢this
private void doWork() throws Exception { DataFromSingleInstance instanceData = null; //获取实例数据 instanceData = getNextStatsData(); if(instanceData == null) { return; } else { lastEventUpdateTime.set(System.currentTimeMillis()); } List<DataFromSingleInstance> list = new ArrayList<DataFromSingleInstance>(); list.add(instanceData); /* send to all handlers */ //将获取到的数据添加到dispatcher中 boolean continueRunning = dispatcher.pushData(getStatsInstance(), list); if(!continueRunning) { logger.info("No more listeners to the host monitor, stopping monitor for: " + host.getHostname() + " " + host.getCluster()); monitorState.set(State.StopRequested); return; } }
getNextStatsData
读取数据
那么派发器是什么呢,它的实现查看TurbineDataDispatcher
查看它的pushData
方法
发现调用的是tuple.pushData(statsData);
而tuple
其实就像一个管道,查看HandlerQueueTuple
的pushData
方法
public void pushData(K data) { if (stopped) { return; } boolean success = queue.writeEvent(data); if (isCritical()) { // track stats if (success) { counter.increment(Type.EVENT_PROCESSED); } else { counter.increment(Type.EVENT_DISCARDED); } } }
看到queue.writeEvent(data)
、往队列里写数据
这个队列又是什么呢?
其实就是一个事件队列EventQueue
,查看它的写事件方法
public boolean writeEvent(T event) { if (count.get() > maxCapacity) { // approx check for capacity return false; } count.incrementAndGet(); queue.add(event); return true; }
若是队列中的长度大于maxCapacity,将不会再往队列里填充数据。
当客户端链接上的时候,queue就会被消费。若是客户端没有链接上的时候,queue读出来,通过一系列的操做会写回queue中,直到队列满了就不在写了。
一、当没有客户端链接上的时候
eventHandler通过一些列的处理,数据会被写回到queue中
二、当有客户端连上的时候,假设咱们经过浏览器地址栏输入了http://localhost:10000/turbine.stream?cluster=default
此时
咱们看到eventHandler为TurbineStreamingConnection
,见下图
handlData()
就变成了TurbineStreamingConnection
中的方法
public void handleData(Collection<T> data) { if (stopMonitoring) { // we have been stopped so don't try handling data return; } //将数据写到steamHandler中 writeToStream(data); }
writeToStream()
中有个关键的操做streamHandler.writeData(jsonStringForDataHash)
writeData()
方法就能够将数据写到response中
客户端访问http://localhost:10000/turbine.stream?cluster=default
的时候,其实就是经过TurbineStreamServlet
获取到响应结果的。