本文是Naiad论文的阅读笔记,Naiad是一个执行循环并行数据流程序的分布式系统,提供了高吞吐批量处理、低延迟流式处理、迭代计算和增量计算等特性。算法
不少数据处理任务要求对结果进行低延迟可交互访问、迭代子计算、一致的中间输出。下图展现了这些需求:这个应用在实时数据流上进行迭代计算,在结果上交互式查询最新并一致的结果。编程
为了知足以上要求,做者设计了时序数据流计算模型:安全
在时序数据流是一个有向无环图,每一个节点是有状态的,从有向的边上发送和接收带有时间戳的消息。bash
时序数据流图包含输入节点和输出节点,用来接收输入和给出输出。外部的消息产生者会给每一个消息标上epoch,而且在数据流结束后发送“关闭”消息。网络
时序数据流支持循环上下文,一个循环上下文包含进入节点(I)、退出节点(E)和反馈节点(F)。为了支持循环上下文,消息的时间戳设计以下:分布式
其中e为epoch,为第k层循环对应的计数器,一次循环中的时间戳变化以下函数
节点 | 输入时间戳 | 输出时间戳 |
---|---|---|
进入节点 | ![]() |
![]() |
退出节点 | ![]() |
![]() |
反馈节点 | ![]() |
![]() |
时间戳的顺序定义为,对于和
,
当且仅当
而且
。优化
每一个节点须要实现两个回调函数:this
v.OnRecv(e: Edge, m: Message, t: Timestamp)
v.OnNotify(t: Timestamp).
复制代码
在回调函数中能够调用两个函数:spa
this.SendBy(e: Edge, m: Message, t: Timestamp)
this.NotifyAt(t: Timestamp)
复制代码
全部的调用都会排队执行,v.OnNotify(t)
会在全部的
v.OnRecv(e,m,t')
执行完以后才执行。另外,调用v.OnNotify(t')
和v.OnRecv(e,m,t')
的参数须要。
下面是一个示例程序,获取输入后将惟一的元素输出到output1
,将计数输出到output2
。
class DistinctCount<S,T> : Vertex<T>
{
Dictionary<T, Dictionary<S,int>> counts;
void OnRecv(Edge e, S msg, T time) {
if (!counts.ContainsKey(time)) {
counts[time] = new Dictionary<S,int>();
this.NotifyAt(time);
}
if (!counts[time].ContainsKey(msg)) {
counts[time][msg] = 0;
this.SendBy(output1, msg, time);
}
counts[time][msg]++;
}
void OnNotify(T time) {
foreach (var pair in counts[time])
this.SendBy(output2, pair, time);
counts.Remove(time);
}
}
复制代码
发送通知须要判断将来再也不会出现带有给定时间戳的消息。将来消息会绑定的时间戳和未处理的事件(消息和通知)以及图结构决定,根据消息不能逆时间传递的特性,能够计算出每一个消息时间的下界。
每一个事件都对应着一个时间戳和一个位置,能够将其组成点戳
咱们说会致使
当且仅当存在一条路径
,最终获得的点戳
知足
。Naiad会找出
到
最短的路径,检查是否知足
来得知
是否会致使
。
调度会维护一个活跃点戳集合,每一个对应着至少一个未完成的事件。每一个点戳包含一个出现计数(包含这个点戳的时间数目)和先导计数(致使这个点戳的时间数目)。当节点产生和消耗事件时,点戳更新方式以下:
操做 | 更新规则 |
---|---|
v.SendBy(e,m,t) | OC[(t, e)] ← OC[(t, e)] + 1 |
v.OnRecv(e,m,t) | OC[(t, e)] ← OC[(t, e)] − 1 |
v.NotifyAt(t) | OC[(t, v)] ← OC[(t, v)] + 1 |
v.OnNotify(t) | OC[(t, v)] ← OC[(t, v)] − 1 |
当点戳的出现计数变为零以后,便可减少能够致使的后续点戳的先导计数,而当点戳的先导计数为零时,在这个点戳以前的通知均可以安全发送。
数据流图会分布到不一样的工做节点上,边可使用分区函数将消息传送到不一样节点上,若是没有分区函数那么消息会传递给本机上的下一个节点。
其中的工做节点负责本身部分的消息的通知的接受和发送。 为了在分布式环境下正确触发通知,经过广播点戳的出现计数维护全局的出现计数。
Naiad使用了一个简单的容错方法:每一个节点实现CHECKPOINT
和RESTORE
接口,系统调用它们保存全局一致的检查点用于故障恢复。
由于丢包、垃圾回收等缘由会致使一些工做变慢,做者使用了多种方法尽可能避免这种状况的发生:
全部的Naiad程序有如下模式:首先,定义一个数据流图,包含输入阶段、计算阶段和输出阶段;而后,将数据送给输入阶段。例如,一个典型的MapReduce程序以下:
// 1a. Define input stages for the dataflow.
var input = controller.NewInput<string>();
// 1b. Define the timely dataflow graph.
// Here, we use LINQ to implement MapReduce.
var result = input.SelectMany(y => map(y))
.GroupBy(y => key(y),
(k, vs) => reduce(k, vs));
// 1c. Define output callbacks for each epoch
result.Subscribe(result => { ... });
// 2. Supply input data to the query.
input.OnNext(/* 1st epoch data */);
input.OnNext(/* 2nd epoch data */);
input.OnNext(/* 3rd epoch data */);
input.OnCompleted();
复制代码
做者将一些高层次的编程模型打包成了库供开发人员使用,但愿大部分的应用场景可使用库来实现,这些库是基于Naiad提供的图构建接口。