turbine是怎么收集指标数据的

turbine是怎么收集指标数据的

咱们经过spring cloud图形化dashboard是如何实现指标的收集展现的知道了,图形化的指标是从turbine获取到指标数据的。那么turbine的数据是从哪里来的呢?

一、数据来源

咱们经过url:http://localhost:10000/turbine.stream?cluster=default能够获取到指标的json数据。那么指标数据又是从何处获取到的。
答案是:从各个服务的/manage/hystrix.stream端点获取的html

二、turbine架构设计

turbine官方的github地址:
https://github.com/Netflix/turbine/wiki
能够找到turbine的架构设计
image_1c8cnao7u1rt5e2p1jhb1bj716b714.png-146.7kBgit

详细信息参考
https://github.com/Netflix/Turbine/wiki/Design-And-Architecture-(1.x)github

说明:turbine启动的时候,会去链接须要监控的主机,创建起监听,每个实例会有一个监听。当实例监遵从各个服务获取到数据的时候,会将数据填充到派发器dispatcher中,由派发器将数据输出到各个客户端。spring

image_1c8cno6j71v56vab17dska015741h.png-121.6kB

三、源码阅读

turbine的实如今turbine核心包下
com.netflix.turbine:turbine-corejson

在该包下,能够找到几个关键的类
InstanceMonitorHandlerQueueTupleTurbineDataDispatcherTurbineStreamServlet浏览器

咱们启动调试的时候,能够看到
image_1c8co5shg1sqf16le151179ab6d1u.png-190.5kB架构

实例的url实际上是指向具体须要监控的实例的端点,即
http://sheng:8088/manage/hystrix.stream
查看这个连接咱们能够看到
image_1c8co9jkt1gnco5b108p1bu4gds2b.png-96.4kBapp

InstanceMonitor启动监听ide

public void startMonitor() throws Exception {
        // This is the only state that we allow startMonitor to proceed in
        if (monitorState.get() != State.NotStarted) {
            return;
        }

        taskFuture = ThreadPool.submit(new Callable<Void>() {

            @Override
            public Void call() throws Exception {

                try {
                    //初始化,链接到具体实例上
                    init();
                    monitorState.set(State.Running);
                    while(monitorState.get() == State.Running) {
                        //关键代码
                        doWork();
                    }
                } catch(Throwable t) {
                    logger.warn("Stopping InstanceMonitor for: " + getStatsInstance().getHostname() + " " + getStatsInstance().getCluster(), t);
                } finally {
                    if (monitorState.get() == State.Running) {
                        monitorState.set(State.StopRequested);
                    }
                    cleanup();
                    monitorState.set(State.CleanedUp);
                }
                return null;
            }
        });
    }

private void init() throws Exception {

        HttpGet httpget = new HttpGet(url);

        HttpResponse response = gatewayHttpClient.getHttpClient().execute(httpget);

        HttpEntity entity = response.getEntity();
        InputStream is = entity.getContent();
        //初始化一个输入流
        reader = new BufferedReader(new InputStreamReader(is));

        int statusCode = response.getStatusLine().getStatusCode();
        if (statusCode != 200) {
            // this is unexpected. We probably have the wrong endpoint. Print the response out for debugging and give up here.
            List<String> responseMessage = IOUtils.readLines(reader);
            logger.error("Could not initiate connection to host, giving up: " + responseMessage);
            throw new MisconfiguredHostException(responseMessage.toString());
        }
    }

dowork()方法作了什么呢this

private void doWork() throws Exception {

        DataFromSingleInstance instanceData = null;

        //获取实例数据
        instanceData = getNextStatsData();
        if(instanceData == null) {
            return;
        } else {
            lastEventUpdateTime.set(System.currentTimeMillis());
        }

        List<DataFromSingleInstance> list = new ArrayList<DataFromSingleInstance>();
        list.add(instanceData);

        /* send to all handlers */
        //将获取到的数据添加到dispatcher中
        boolean continueRunning = dispatcher.pushData(getStatsInstance(), list);
        if(!continueRunning) {
            logger.info("No more listeners to the host monitor, stopping monitor for: " + host.getHostname() + " " + host.getCluster());
            monitorState.set(State.StopRequested);
            return;
        }
    }

getNextStatsData读取数据
image_1c8cor2ih1cb714tc1b5m8mk7ks2o.png-58kB


那么派发器是什么呢,它的实现查看TurbineDataDispatcher
查看它的pushData方法
发现调用的是tuple.pushData(statsData);tuple其实就像一个管道,查看HandlerQueueTuplepushData方法

public void pushData(K data) {
        if (stopped) {
            return;
        }
        boolean success = queue.writeEvent(data);
        if (isCritical()) {
            // track stats
            if (success) {
                counter.increment(Type.EVENT_PROCESSED);
            } else {
                counter.increment(Type.EVENT_DISCARDED);
            }
        }
    }

看到queue.writeEvent(data)、往队列里写数据
这个队列又是什么呢?
其实就是一个事件队列EventQueue,查看它的写事件方法

public boolean writeEvent(T event) {
        if (count.get() > maxCapacity) {  // approx check for capacity
            return false;
        }
        count.incrementAndGet();
        queue.add(event);
        return true;
    }

若是队列中的长度大于maxCapacity,将不会再往队列里填充数据。


当客户端链接上的时候,queue就会被消费。若是客户端没有链接上的时候,queue读出来,通过一系列的操做会写回queue中,直到队列满了就不在写了。

一、当没有客户端链接上的时候
image_1c8evuokc1u8scad10cuef11jm413.png-119.7kB

eventHandler通过一些列的处理,数据会被写回到queue中

二、当有客户端连上的时候,假设咱们经过浏览器地址栏输入了
http://localhost:10000/turbine.stream?cluster=default
此时
咱们看到eventHandler为TurbineStreamingConnection,见下图
image_1c8evlp0c1n4p491rcebcn8htm.png-159.3kB

handlData()就变成了TurbineStreamingConnection中的方法

public void handleData(Collection<T> data) {
        
        if (stopMonitoring) {
            // we have been stopped so don't try handling data
            return;
        }
        //将数据写到steamHandler中
        writeToStream(data);
    }

writeToStream()中有个关键的操做streamHandler.writeData(jsonStringForDataHash)
image_1c8f022frk5f198mk0k3g31rah1g.png-90.2kB

writeData()方法就能够将数据写到response中
image_1c8f049sgvmpaq79b813ko1nsp1t.png-85kB

客户端访问http://localhost:10000/turbine.stream?cluster=default的时候,其实就是经过TurbineStreamServlet获取到响应结果的。

相关文章
相关标签/搜索