项目背景:前端
(1) 已有监控系统采用的OpenTSDB方案apache
(2) 目前一些大数据应用,尤为是基于spark streaming的流式应用,会实时计算生成一些指标数据,借用监控系统的存储。后端
(3) 须要前端展现实时分析结果,采用zeppelin展现方式,可是目前zeppelin不支持OpenTSDB后端引擎支持app
So, 本身开发!ide
插播: 刚去访问官方发现0.6.0版本发布了! http://zeppelin.apache.org/docs/0.6.0/大数据
因为Zeppelin运行环境已经有了该依赖包,因此咱们再建立自定义Interpreter插件的时候只须要在代码中对其依赖,打包过程当中不须要打包该包。因此使用provided依赖方式。ui
注意:该包为内部开发依赖包spa
public class TsdInterpreter extends Interpreter插件
在实现类中添加如下代码实现当前插件的注册code
static {
Interpreter.register("tsd", "tsd", TsdInterpreter.class.getName());
}
以tsd名称注册,那么Zeppelin前端在调用OpenTSDB查询的时候,只须要指定后端引擎名称%tsd便可。
public InterpreterResult interpret(String cmd, InterpreterContext context)
cmd: 即在Zeppelin交互式界面编写的命令,不包含%tsd
context: 当前插件的上下文,主要包含插件的配置信息,例如操做OpenTSDB的时候就须要从上下文中获取OpenTSDB的IP和端口参数。
该方法实现的核心思想就是: 解析命令=>实例化OpenTSDB操做客户端=>操做OpenTSDB客户端进行数据查询=> 获取返回结果 封装成InterpreterResult对象。
贴核心代码吧:
Properties intpProperty = getProperty();
for (Object k : intpProperty.keySet()) {
String key = (String) k;
String value = (String) intpProperty.get(key);
if (key.equals("tsd.host") ) {
host = value;
} else if (key.equals("tsd.port")) {
port = value;
}
}
propertiesUtil.setOpentsdbIp(host);
propertiesUtil.setPort(Integer.parseInt(port));
Scanner scanner = new Scanner(items[1]);
String start, end, metric, tagsStr;
if (scanner.hasNext())
start = scanner.next();
else {
return processHelp(InterpreterResult.Code.ERROR,
"1!Please enter the correct format!");
}
if (scanner.hasNext())
end = scanner.next();
else {
return processHelp(InterpreterResult.Code.ERROR,
"2!Please enter the correct format!");
}
if (scanner.hasNext())
metric = scanner.next();
else {
return processHelp(InterpreterResult.Code.ERROR,
"3!Please enter the correct format!");
}
if (scanner.hasNext())
tagsStr = scanner.next();
else {
return processHelp(InterpreterResult.Code.ERROR,
"4!Please enter the correct format!");
}
// cpid=tudou,busiid=*,code=1
String[] tagsStrs = tagsStr.split(",");
Map<String, String> tags = new HashMap<String, String>();
for (String s : tagsStrs) {
int index = s.indexOf('=');
if (index == -1)
continue;
String tagK = s.substring(0, index);
String tagV = s.substring(index + 1);
tags.put(tagK, tagV);
}
QueryService queryService = new QueryService();
try {
List<QueryResponseEntity> responses = queryService
.queryByMetric(start, end, metric, tags, null, "sum");
StringBuffer sb = new StringBuffer();
// Map<String, String> alldps = new HashMap<String, String>();
// build header
Set<String> keys = new HashSet<String>();
sb.append("time\t");
for (QueryResponseEntity st : responses) {
sb.append(st.getTags().toString() + "\t");
keys.addAll(st.getDps().keySet());
}
sb.replace(sb.lastIndexOf("\t"), sb.lastIndexOf("\t") + 1, "\n");
List<String> keys2 = new ArrayList<String>(keys);
Collections.sort(keys2);
// build lines
Iterator<String> it = keys2.iterator();
long t;
while (it.hasNext()) {
String key = it.next(); // 每一行的时间戳
t = Long.parseLong(key);
sb.append(sdf.format(new Date(t*1000)) + "\t");
for (QueryResponseEntity st : responses) {
Map<String, String> dps = st.getDps();
String value = dps.get(key);
if (value != null) {
sb.append(value + "\t");
} else {
sb.append(" \t");
}
}
sb.replace(sb.lastIndexOf("\t"), sb.lastIndexOf("\t") + 1,
"\n");
}
// sb.toString()
return new InterpreterResult(InterpreterResult.Code.SUCCESS,
InterpreterResult.Type.TABLE, sb.toString());
在ZEPPELIN_HOME/conf/zeppelin-site.xml
在ZEPPELIN_HOME/interpreter
建立文件夹tsd,将全部依赖包拷贝到该文件夹下