http://static.googleusercontent.com/media/research.google.com/zh-CN//pubs/archive/41378.pdfweb
由于当前的其余的流式系统,没法同时知足 fault tolerance, versatility, and scalability 的需求。express
Spark Streaming [34] and Sonora [32] do excellent jobs of efficient checkpointing, but limit the space of operators that are available to user code. windows
S4 [26] does not provide fully fault-tolerant persistent state后端
Storm’s [23] exactly-once mechanism for record delivery, Trident [22], requires strict transaction ordering to operate. 网络
Streaming SQL systems [1] [2] [5] [6] [21] [24] provide succinct and simple solutions to many streaming problems, but intuitive state abstractions and complex application logic (e.g. matrix multiplication) are more naturally expressed using the operational flow of an imperative language rather than a declarative language like SQL.架构
Note,imperative language, declarative language, function language。refer:http://stackoverflow.com/questions/1784664/what-is-the-difference-between-declarative-and-imperative-programmingapp
先描述应用场景,async
Google’s Zeitgeist pipeline is used to track trends in web queries.
This pipeline ingests a continuous input of search queries and performs anomaly detection, outputting queries which are spiking or dipping as quickly as possible.分布式
Google’s Zeitgeist 这个服务用于 track web 查询的趋势的,对持续的 search queries 进行 anomaly detection,尽量快的发现spiking or dipping。ide
架构以下,
Our approach is to bucket records into one-second intervals and to compare the actual traffic for each time bucket to the expected traffic that the model predicts.
If these quantities are consistently different over a non-trivial number of buckets, then we have high confidence that a query is spiking or dipping.
In parallel, we update the model with the newly received data and store it for future use.
场景中关键的几点,
Persistent Storage: It is important to note that this implementation requires both short- and long-term storage.
A spike may only last a few seconds, and thus depend on state from a small window of time, whereas model data can correspond to months of continuous updates.
LowWatermarks:
在现实的场景中,网络环境是很复杂的,当一个时间点出现dipping的时候,有两种可能性,
真正的dipping,这个点query确实变少了
因为网络或其余问题,数据被delay了,尚未收到
那么天然产生的问题,我如何知道这个时间点的数据是否到齐?
MillWheel addresses this by providing a low watermark for incoming data for each processing stage (e.g. Window Counter, Model Calculator), which indicates that all data up to a given timestamp has been received.
MillWheel提供 low watermark机制来告诉你何时数据会到齐。
固然low watermark每每也是启发式获得的,其实并不能完美的解这个问题,只能说若是过了 low watermark 尚未数据来,咱们有 high confidence 来讲应该是没有数据,而不是被delay
Duplicate Prevention: For Zeitgeist, duplicate record deliveries could cause spurious spikes.
咱们要在平台层面保证exactly-once
整理出的详细需求以下:
• Data should be available to consumers as soon as it is published (i.e. there are no system-intrinsic barriers to ingesting inputs and providing output data). 好比micro-batch就是种 system-intrinsic barriers
• Persistent state abstractions should be available to user code, and should be integrated into the system’s overall consistency model.
• Out-of-order data should be handled gracefully by the system. 能够处理时间乱序的数据
• A monotonically increasing low watermark of data timestamps should be computed by the system. 系统会生成 low watermarker
• Latency should stay constant as the system scales to more machines. 保证 latency
• The system should provide exactly-once delivery of records. 保证 exactly-once 语义
Abstractly, inputs and outputs in MillWheel are represented by (key, value, timestamp) triples.
Computations,等同于Bolt
Application logic lives in computations, which encapsulate arbitrary user code.
Keys
Keys are the primary abstraction for aggregation and comparison between different records in MillWheel.
For every record in the system, the consumer specifies a key extraction function, which assigns a key to the record.
注意在,millwhell中,相同key的record是被串行处理的,只有不一样key的record才能够被并行处理
Streams,等同于Storm里面的流
Streams are the delivery mechanism between different computations in MillWheel.
Persistent State
In its most basic form, persistent state in MillWheel is an opaque byte string that is managed on a per-key basis.
The user provides serialization and deserialization routines (such as translating a rich data structure in and out of its wire format), for which a variety of convenient mechanisms (e.g. Protocol Buffers [13]) exist.
Persistent state is backed by a replicated, highly available data store (e.g. Bigtable [7] or Spanner [9]), which ensures data integrity in a way that is completely transparent to the end user.
Common uses of state include counters aggregated over windows of records and buffered data for a join.
这里persistent state,能够认为是checkpoint,注意,MillWheel的checkpoint是 per-key basis的,能够在MillWheel起到很关键的做用
用户须要提供序列号和反序列化的逻辑,这些checkpoint每每被存到像bigtable这样的分布式存储中
每每像有状态的computation就须要存persistent state,好比基于窗口的聚合计数,或流join
Low Watermarks
对于computation,当给定low watermark,就不该该收到比它还早的数据
Definition: We provide a recursive definition of low watermarks based on a pipeline’s data flow.
min(oldest work of A, low watermark of C : C outputs to A)
oldest work of A,是A中最老的record的时间戳
而C是A的父节点,那么A的low watermark不可能比C迟,由于A必定比C迟收到数据,因此A的low watermark必定是小于等于C的low watermark的
这样递归的结果是,最终low watermark会取决于injector(即,源),而对于injector的input,确定是外部系统好比kafka这样的队列,或文件系统,那么injector怎么知道它的low watermark
injector实际上是不知道的,只能作estimate,好比对于文件系统,能够以文件的create时间做为low watermark,文件里面必定不会有比create time更早的记录
因此low watermark机制,是没法完美解这个问题的,都会有too fast,too late的问题
Timers,即trigger,解决when的问题
A simple implementation of dips in Zeitgeist would set a low watermark timer for the end of a given time bucket, and report a dip if the observed traffic falls well below the model’s prediction.
终于到了关键的地方了,
Exactly-Once Delivery
MillWheel是如何保证exactly-once语义的,
Upon receipt of an input record for a computation, the MillWheel framework performs the following steps:
• The record is checked against deduplication data from previous deliveries; duplicates are discarded.
• User code is run for the input record, possibly resulting in pending changes to timers, state, and productions.
• Pending changes are committed to the backing store.
• Senders are ACKed.
• Pending downstream productions are sent.
两点须要注意的,
一是,它会去重,这样能够保证exactly-once,如何去后面说
其实通常的streaming系统均可以作到at-least once,因此作到exactly-once,只须要作到去重便可
你能够依赖外部存储,或者系统里面直接作掉
二是,对中间状态作checkpoint
MillWheel如何在系统层面作去重,
The system assigns unique IDs to all records at production time.
We identify duplicate records by including this unique ID for the record in the same atomic write as the state modification.
If the same record is later retried, we can compare it to the journaled ID, and discard and ACK the duplicate.
经过为每一个record增长unique id
为了快速知道这个id是否出现过,使用bloom filter
Since we cannot necessarily store all duplication data in-memory, we maintain a Bloom filter of known record fingerprints, to provide a fast path for records that we have provably never seen before.
若是filter miss,咱们须要读后端存储才能判断是不是duplicate
In the event of a filter miss, we must read the backing store to determine whether a record is a duplicate.
这个怎么实现?怎么判断是filter miss,仍是新出现的record?出现duplicate毕竟不是常常发生的
为了防止record id爆掉,须要回收,有个问题?回收后,bloom filter须要从新初始化吗,仍是说bloom filter自己是支持过时的
Record IDs for past deliveries are garbage collected after MillWheel can guarantee that all internal senders have finished retrying.
Strong Productions
We checkpoint produced records before delivery in the same atomic write as state modification.
We call this pattern of checkpointing before record production strong productions.
这部用以保证at-least once,storm是经过spout超时重发的,后续的系统不多继续沿用这个方式,由于这样作周期太长
Millwheel或Linkedin的Samza都是采用local重发的方式,好比MillWheel,在produce record以前,会把checkpoint和状态修改放在一个原子写中作掉,checkpoint每每写入bigtable中
固然下层节点,成功处理完该record,会send回acker,这时,咱们能够把checkpoint删除
若是这时crash,咱们能够come back时,从checkpoint中读出record,从新produce
若是以前不作checkpoint,当come back时,会以当前状态(好比计数,有可能新到数据已产生更新)来produce,这样就会产生不一致
另外区别于persistent state,这里checkpoint特指produced record
Weak Productions and Idempotency
MillWheel经过 record id 和 Strong Production 来保证 exactly-once 语义,这其中也是有不少代价的,有些场景不须要保证exactly-once,at-least onces就足够了,好比不少无状态的场景
因此他提供Weak production来知足这种需求。
不须要保证exactly-once,就不去重就ok了,disabling exactly-once can be accomplished simply by skipping the deduplication pass
是否是checkpoint produced records也能够彻底去掉了,直接produce,而后等ack,失败或超时就重发,那这样就和storm同样了,链路长的时候,周期会很长
MillWheel提供的优化就是 weak productions,
好比,对于A-》B-》C的链路
B-》C的produce,超过1s尚未返回
咱们这时候,对该produce进行checkpoint,而后直接ack A,避免A继续等待
固然B会继续等待,直到收到C的ack,才将该checkpoint删除
若是此时B Crash,那么当B restart,他会本身去replay上次的produce,对A透明,直到成功,才会删除checkpoint
In implementing mechanisms to manipulate user state in MillWheel, we discuss both the “hard” state that is persisted to our backing store and the “soft” state which includes any in-memory caches or aggregates.
We must satisfy the following user-visible guarantees:
• The system does not lose data.
• Updates to state must obey exactly-once semantics.
• All persisted data throughout the system must be consistent at any given point in time.
• Low watermarks must reflect all pending state in the system.
• Timers must fire in-order for a given key.
首先,为了不不一致,全部per-key的操做,包含persist,checkpoint,状态更新,都会在一个原子写中完成
To avoid inconsistencies in persisted state (e.g. between timers, user state, and production checkpoints), we wrap all per-key updates in a single atomic operation.
再者,对于僵尸writer或因为网络延迟致使的延迟写,采用sequencer的方式,每一个写都有sequence id,过时的写请求会被丢弃;而且在新的workers启动时须要invalid以前的sequencers
As work may shift between machines (due to load balancing, failures, or other reasons) a major threat to our data consistency is the possibility of zombie writers and network remnants issuing stale writes to our backing store.
To address this possibility, we attach a sequencer token to each write, which the mediator of the backing store checks for validity before allowing the write to commit.
New workers invalidate any extant sequencers before starting work, so that no remnant writes can succeed thereafter.
因此,对于MillWheel,对于一个给定的key,只能有一个worker writer有权限执行写操做,这个是MillWheel保证写一致性的关键
Thus, we can guarantee that, for a given key, only a single worker can write to that key at a particular point in time.
In order to quickly recover from unplanned process failures, each computation worker in MillWheel can checkpoint its state at an arbitrarily fine granularity (in practice, sub-second or per-record granularity is standard, depending on input volume). Our use of always-consistent soft state allows us to minimize the number of occasions when we must scan these checkpoints to specific cases – machine failures or load-balancing events. When we do perform scans, these can often be asynchronous, allowing the computation to continue processing input records while the scan progresses.