Siddhi is a lightweight, easy-to-use, open source CEP (Complex Event Processing) engine developed by WSO2 (http://wso2.com/about/).
Like most CEP systems, Siddhi supports SQL-like queries over streaming data; these SQL-style queries are translated into Java code by a compiler.
When one or more data streams flow in, Siddhi Core checks in real time whether the incoming data satisfies the defined queries, and if so it triggers a Callback that executes the corresponding logic.
How does Siddhi compare with traditional CEP systems such as Esper?
Mainly, it is more lightweight and more efficient. It achieves higher performance largely because of its threading model and pipelined execution: traditional CEP systems such as Esper use a single thread to process all query matching, which is simple but inefficient and cannot take advantage of multi-core CPUs.
So Siddhi uses multiple threads combined with a pipeline mechanism.
Siddhi splits a whole query into independent stages, called processors. The benefit is, first, that this makes multi-threading straightforward, and second, that identical processors can be reused.
Processors are connected to each other through queues. I won't go into the details here; interested readers can dig into the Siddhi paper and documentation.
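To make the pipeline idea a bit more concrete, here is a tiny conceptual sketch. These classes are not Siddhi's internals, just an illustration of the design: each stage runs in its own thread, and stages hand events to one another through blocking queues.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Conceptual sketch only: a two-stage pipeline (filter -> transform), each stage
// in its own thread, connected by a queue. Siddhi's real processors are more
// elaborate, but the threading/queueing idea is the same.
public class PipelineSketch {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<double[]> input = new ArrayBlockingQueue<>(1024);
        BlockingQueue<double[]> filtered = new ArrayBlockingQueue<>(1024);

        // Stage 1: keep only events whose temperature exceeds 30 degrees.
        Thread filter = new Thread(() -> {
            try {
                while (true) {
                    double[] event = input.take();   // event = {roomNo, temp}
                    if (event[1] > 30.0) {
                        filtered.put(event);
                    }
                }
            } catch (InterruptedException ignored) { }
        });

        // Stage 2: convert Celsius to Fahrenheit and print the result.
        Thread transform = new Thread(() -> {
            try {
                while (true) {
                    double[] event = filtered.take();
                    System.out.printf("room %.0f -> %.1f F%n", event[0], event[1] * 9 / 5 + 32);
                }
            } catch (InterruptedException ignored) { }
        });

        filter.setDaemon(true);
        transform.setDaemon(true);
        filter.start();
        transform.start();

        input.put(new double[]{201, 28.2});
        input.put(new double[]{202, 32.5});
        Thread.sleep(200);   // give the pipeline time to drain before the JVM exits
    }
}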
Now let's look at the most important question: what can Siddhi actually do for us?
A few concrete examples illustrate the typical scenarios in which Siddhi is used.
Let's start with the simplest example and see how to run a real Siddhi program.
As mentioned above, Siddhi uses an SQL-like query language.
First, define the format of the stream:
define stream TempStream (deviceID long, roomNo int, temp double);
Then define the query:
from TempStream select roomNo, temp * 9/5 + 32 as temp, 'F' as scale, roomNo >= 100 and roomNo < 110 as isServerRoom insert into RoomTempStream;
This implements a complete ETL process:
Extraction: select the required fields from TempStream;
Transformation: convert the Celsius temperature to Fahrenheit;
Loading: output the result into the RoomTempStream stream.
Very convenient: no additional logic has to be written, only the SQL-like query.
To make this more concrete, here is a complete Java test example:
SiddhiManager siddhiManager = new SiddhiManager();

String executionPlan = "" +
        "define stream TempStream (deviceID int, roomNo int, temp float);" +
        "" +
        "@info(name = 'query1') " +
        "from TempStream " +
        "select roomNo, temp * 9/5 + 32 as temp, 'F' as scale, roomNo >= 100 and roomNo < 110 as isServerRoom " +
        "insert into RoomTempStream;";

ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan);

executionPlanRuntime.addCallback("query1", new QueryCallback() {
    @Override
    public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
        EventPrinter.print(timeStamp, inEvents, removeEvents);
    }
});

InputHandler inputHandler = executionPlanRuntime.getInputHandler("TempStream");
executionPlanRuntime.start();

inputHandler.send(new Object[] {12344, 201, 28.2f});
inputHandler.send(new Object[] {12345, 202, 22.2f});
inputHandler.send(new Object[] {12346, 203, 24.2f});

//Shutting down the runtime
executionPlanRuntime.shutdown();

//Shutting down Siddhi
siddhiManager.shutdown();
Siddhi supports many kinds of windows; see https://docs.wso2.com/display/CEP400/Inbuilt+Windows#InbuiltWindows-time for the full list.
Here is the most basic one, a time-window example:
from TempStream#window.time(1 min) select roomNo, avg(temp) as avgTemp group by roomNo insert all events into AvgRoomTempStream ;
This query computes, per room, the average temperature over a 1-minute sliding window.
Siddhi's time windows can also aggregate by an externally supplied timestamp, but they require that timestamp to be monotonically increasing; our Brain aggregation library is more general here, since it also handles timestamps that are not monotonically increasing.
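For the monotonic case, the inbuilt externalTime window covers it: the window is driven by a timestamp attribute carried in the event rather than by wall-clock time. A sketch, assuming TempStream were extended with a ts long attribute (see the inbuilt-windows doc linked above for the exact syntax):

from TempStream#window.externalTime(ts, 1 min) select roomNo, avg(temp) as avgTemp group by roomNo insert all events into AvgRoomTempStream;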
Siddhi supports real-time, window-based joins of multiple streams:
from TempStream[temp > 30.0]#window.time(1 min) as T join RegulatorStream[isOn == false]#window.length(1) as R on T.roomNo == R.roomNo select T.roomNo, T.temp, R.deviceID, 'start' as action insert into RegulatorActionStream ;
The query above joins TempStream with RegulatorStream on roomNo: whenever a room has been hotter than 30 degrees within the last minute while its regulator is off, a 'start' action is emitted to RegulatorActionStream.
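The definition of RegulatorStream is not shown above; judging from how its attributes are used in the join, it would look roughly like this (the attribute types are an assumption):

define stream RegulatorStream (deviceID long, roomNo int, isOn bool);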
This kind of query is where the power of CEP really shows. So what is a Pattern query?
“Pattern allows event streams to be correlated over time and detect event patterns based on the order of event arrival.”
Let's go straight to an example: using a Pattern query to detect credit card/ATM transaction fraud:
from every a1 = atmStatsStream[amountWithdrawed < 100] -> b1 = atmStatsStream[amountWithdrawed > 10000 and a1.cardNo == b1.cardNo] within 1 day select a1.cardNo as cardNo, a1.cardHolderName as cardHolderName, b1.amountWithdrawed as amountWithdrawed, b1.location as location, b1.cardHolderMobile as cardHolderMobile insert into possibleFraudStream;
Note the '->' symbol: it expresses the order in which events must occur.
The query above means: within one day, if a withdrawal of less than 100 is followed, on the same card, by a withdrawal of more than 10000, it is flagged as possible fraud.
Of course this is only an example; it does not mean this alone can really detect fraud. You can use it as a starting point for more complex queries.
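The definition of atmStatsStream is likewise omitted above; based on the attributes referenced in the query, something along these lines would work (the types are an assumption):

define stream atmStatsStream (cardNo string, cardHolderName string, cardHolderMobile string, amountWithdrawed double, location string);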
Sequence queries differ from Pattern queries in that the events matched by a pattern may be non-contiguous, whereas the events matched by a sequence must be contiguous.
Let's look at an example that uses a sequence to detect a peak in a stock price:
from every e1=FilteredStockStream[price>20],
e2=FilteredStockStream[((e2[last].price is null) and price>=e1.price) or ((not (e2[last].price is null)) and price>=e2[last].price)],
e3=FilteredStockStream[price<e2[last].price] select e1.price as priceInitial, e2[last].price as pricePeak, e3.price as priceAfterPeak insert into PeakStream ;
The query above reads as follows:
e1: an event arrives with price > 20;
e2: every subsequent event has a price no lower than the previous event's price;
e3: finally an event arrives whose price is lower than the previous one's;
at which point we have found a peak.
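To make that concrete: if events with prices 21, 23, 26, 24 arrive in that order, e1 matches 21, e2 matches 23 and then 26, and e3 matches 24, so the query should emit priceInitial = 21, pricePeak = 26, priceAfterPeak = 24.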
Siddhi has many more features that I will not cover one by one here.
Finally, let's see how to integrate Siddhi into our current framework so that it complements Brain.
I wrapped Siddhi core into a SiddhiBolt, so that within a JStorm topology we can flexibly choose which approach to use for each piece of logic: some statistics can be computed with Brain and others with Siddhi. Very simple.
Without further ado, here is the source code for reference:
// Imports assume Siddhi 3.x and the Storm 0.9.x-style API provided by JStorm.
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wso2.siddhi.core.ExecutionPlanRuntime;
import org.wso2.siddhi.core.SiddhiManager;
import org.wso2.siddhi.core.event.Event;
import org.wso2.siddhi.core.query.output.callback.QueryCallback;
import org.wso2.siddhi.core.stream.input.InputHandler;
import org.wso2.siddhi.core.stream.output.StreamCallback;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SiddhiBolt implements IRichBolt {

    private static final Logger LOG = LoggerFactory.getLogger(SiddhiBolt.class);

    protected OutputCollector collector;
    protected SiddhiManager siddhiManager = null;
    protected String executionPlan = null;
    ExecutionPlanRuntime executionPlanRuntime = null;
    protected HashMap<String, InputHandler> handlers = null;

    public SiddhiBolt(String plan) {
        this.executionPlan = plan;
    }

    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        this.siddhiManager = new SiddhiManager();
        this.executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan);
        addCallbacks();
        handlers = new HashMap<String, InputHandler>();
        executionPlanRuntime.start();
    }

    public void execute(Tuple tuple) {
        // The tuple's source stream id is used as the Siddhi stream name
        String inputStream = tuple.getSourceStreamId();
        InputHandler inputHandler = getInputHandler(inputStream);
        List<Object> values = tuple.getValues();
        Object[] objects = values.toArray();
        try {
            inputHandler.send(objects);
        } catch (Exception e) {
            LOG.error("Send stream event error: ", e);
        }
        // collector.fail(tuple); // test replay
        collector.ack(tuple); // remember to ack the tuple
        // Make sure to add the anchor tuple if you want to track it
        // collector.emit(streamId, tuple, new Values(counters, now));
    }

    public InputHandler getInputHandler(String streamName) {
        InputHandler handler = null;
        if (handlers.containsKey(streamName)) {
            handler = handlers.get(streamName);
        } else {
            handler = executionPlanRuntime.getInputHandler(streamName);
            if (handler != null) {
                handlers.put(streamName, handler);
            }
        }
        return handler;
    }

    // Need Override: subclasses should override this to register their own callbacks
    public void addCallbacks() {
        // StreamCallback example
        executionPlanRuntime.addCallback("outputStream", new StreamCallback() {
            @Override
            public void receive(Event[] events) {
                LOG.info("receive events: " + events.length);
                for (Event e : events) {
                    LOG.info(e.toString());
                }
            }
        });

        // QueryCallback example
        executionPlanRuntime.addCallback("query1", new QueryCallback() {
            @Override
            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
                printEvents(timeStamp, inEvents, removeEvents);
            }
        });
    }

    public void printEvents(long timeStamp, Event[] inEvents, Event[] removeEvents) {
        StringBuilder sb = new StringBuilder();
        sb.append("Events{ @timeStamp = ").append(timeStamp)
          .append(", inEvents = ").append(Arrays.deepToString(inEvents))
          .append(", RemoveEvents = ").append(Arrays.deepToString(removeEvents)).append(" }");
        LOG.info(sb.toString());
    }

    public void cleanup() {
        // Shutting down the runtime
        executionPlanRuntime.shutdown();
        // Shutting down Siddhi
        siddhiManager.shutdown();
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Declare output streams/fields here if the callbacks emit tuples downstream
    }

    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
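To show how this bolt might be dropped into a topology, here is a minimal wiring sketch. TempSpout is hypothetical: a spout that declares a stream named "TempStream" with fields (deviceID, roomNo, temp). The key point is that the bolt uses tuple.getSourceStreamId() as the Siddhi stream name, so the upstream stream id must match the stream defined in the execution plan.

// Minimal wiring sketch (Storm 0.9.x-style API as used by JStorm).
// TempSpout is hypothetical; it must declare a stream "TempStream"
// with fields (deviceID, roomNo, temp).
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("temp-spout", new TempSpout(), 1);

String plan = ""
        + "define stream TempStream (deviceID int, roomNo int, temp float);"
        + "@info(name = 'query1') "
        + "from TempStream#window.time(1 min) "
        + "select roomNo, avg(temp) as avgTemp group by roomNo "
        + "insert into outputStream;";   // matches the callbacks registered in addCallbacks()

// Subscribe to the spout's "TempStream" stream so that
// tuple.getSourceStreamId() inside execute() matches the Siddhi stream name.
builder.setBolt("siddhi-bolt", new SiddhiBolt(plan), 1)
       .shuffleGrouping("temp-spout", "TempStream");

Config conf = new Config();
// In a real main() you would declare or handle the checked exceptions StormSubmitter throws.
StormSubmitter.submitTopology("siddhi-demo", conf, builder.createTopology());

If the Siddhi results need to continue downstream, override addCallbacks() in a subclass and call collector.emit(...) from the callbacks, as hinted by the commented-out emit in execute().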
1. Siddhi paper, https://people.apache.org/~hemapani/research/papers/siddi-gce2011.pdf
2. Siddhi doc, https://docs.wso2.com/display/CEP400/SiddhiQL+Guide+3.0