微软并发Key-Value存储库FASTER介绍

微软支持并发的Key-Value 存储库有C++与C#两个版本。号称迄今为止最快的并发键值存储。下面是C#版本翻译:css

FASTER C#可在.NET Framework和.NET Core中运行,而且能够在单线程和并发设置中使用。通过测试,能够在Windows和Linux上使用。它公开了一种API,该API能够执行读取,盲更新(Upserts)和读取-修改-写入(RMW)操做的混合。它支持大于内存的数据,并接受IDevice将日志存储在文件中的实现。提供了IDevice本地文件系统的实现,也能够写入远程文件系统。或者将远程存储映射到本地文件系统中。FASTER能够用做传统并发数据结构相似ConcurrentDictionary的高性能替代品,而且还支持大于内存的数据。它支持增量或非增量数据结构类型的检查点。git

FASTER支持三种基本操做:github

  1. Read:从键值存储中读取数据bash

  2. Upsert:将值盲目向上插入到存储中(不检查先前的值)session

  3. Read-Modify-Write:更新存储区中的值,用于实现“求和”和“计数”之类的操做。数据结构

构建

在实例化FASTER以前,您须要建立FASTER将使用的存储设备。若是使用的是可移植类型(byte、int、double)类型,则仅须要混合日志设备。若是使用对象,则须要建立一个单独的对象日志设备。并发

IDevice log = Devices.CreateLogDevice("C:\\Temp\\hybridlog_native.log");

而后,按以下方式建立一个FASTER实例:异步

fht = new FasterKV<Key, Value, Input, Output, Empty, Functions> (1L << 20, new Functions(), new LogSettings { LogDevice = log });

构造函数的类型参数

有六个基本概念,在实例化FASTER时做为通用类型参数提供:ide

  1. Key:这是键的类型,例如long。函数

  2. Value:这是存储在FASTER中的值的类型。

  3. Input:这是调用Read或RMW时提供给FASTER的输入类型。它能够被视为读取或RMW操做的参数。例如,对于RMW,但是增量累加到值。

  4. Output:这是读操做的输出类型,将值的相关部分复制到输出。

  5. Context:操做的用户定义上下文,若是没有必要使用Empty。

  6. Functions:须要回调时,使用IFunctions<>调用。

回调函数

用户提供一个实例化IFunctions<>。此类型封装了全部回调,下面将对其进行介绍:

  1. SingleReader和并发读ConcurrentReader:这些用于读取存储值并将它们复制到Output。单个读取器能够假定没有并发操做。

  2. SingleWriter和ConcurrentWriter:这些用于将值从源值写入存储。

  3. Completion callbacks完成回调:各类操做完成时调用。

  4. RMWUpdaters:用户指定了三个更新器,InitialUpdater,InPlaceUpdater和CopyUpdater。它们一块儿用于实现RMW操做。

  5. Hash Table Siz哈希表大小:这是分配给FASTER的存储行数,其中每一个行为64字节。

  6. LogSettings 日志设置:这些设置与日志的大小、设备。

  7. Checkpoint设置:这些是与检查相关的设置,例如检查类型和文件夹。

  8. Serialization序列化设置:用于为键和值类型提供自定义序列化程序。序列化程序实现IObjectSerializer<Key>键和IObjectSerializer<Value>值。只有C#类对象非可移植类型才须要这些。

  9. Key比较器:用于为key提供更好的比较器IFasterEqualityComparer<Key>。

构造函数参数

FASTER的总内存占用量由如下参数控制:

  1. 哈希表大小:此参数(第一个构造函数参数)乘以64是内存中哈希表的大小(以字节为单位)。

  2. 日志大小:logSettings.MemorySizeBits表示混合日志的内存部分的大小(以位为单位)。换句话说对于参数设置B,日志的大小为2 ^ B字节。若是日志指向类对象,则此大小不包括对象的大小,由于FASTER没法访问此信息。日志的较旧部分溢出到存储中。

Sessions (Threads)会话(线程)

实例化FASTER以后,线程可使用Session来使用FASTER

fht.StartSession();fht.StopSession();

当全部线程都在FASTER上完成操做后,您最终销毁FASTER实例:

fht.Dispose();

示例

如下是一个简单示例,其中全部数据都在内存中,所以咱们没必要担忧挂起的I / O操做。在此示例中也没有检查点。

public static void Test(){ var log = Devices.CreateLogDevice("C:\\Temp\\hlog.log"); var fht = new FasterKV<long, long, long, long, Empty, Funcs> (1L << 20, new Funcs(), new LogSettings { LogDevice = log }); fht.StartSession(); long key = 1, value = 1, input = 10, output = 0; fht.Upsert(ref key, ref value, Empty.Default, 0); fht.Read(ref key, ref input, ref output, Empty.Default, 0); Debug.Assert(output == value); fht.RMW(ref key, ref input, Empty.Default, 0); fht.RMW(ref key, ref input, Empty.Default, 0); fht.Read(ref key, ref input, ref output, Empty.Default, 0); Debug.Assert(output == value + 20); fht.StopSession(); fht.Dispose(); log.Close();}

此示例的函数:

public class Funcs : IFunctions<long, long, long, long, Empty>{ public void SingleReader(ref long key, ref long input, ref long value, ref long dst) => dst = value; public void SingleWriter(ref long key, ref long src, ref long dst) => dst = src; public void ConcurrentReader(ref long key, ref long input, ref long value, ref long dst) => dst = value; public void ConcurrentWriter(ref long key, ref long src, ref long dst) => dst = src; public void InitialUpdater(ref long key, ref long input, ref long value) => value = input; public void CopyUpdater(ref long key, ref long input, ref long oldv, ref long newv) => newv = oldv + input; public void InPlaceUpdater(ref long key, ref long input, ref long value) => value += input; public void UpsertCompletionCallback(ref long key, ref long value, Empty ctx) { } public void ReadCompletionCallback(ref long key, ref long input, ref long output, Empty ctx, Status s) { } public void RMWCompletionCallback(ref long key, ref long input, Empty ctx, Status s) { } public void CheckpointCompletionCallback(Guid sessionId, long serialNum) { }}

更多例子

检查点和恢复

FASTER支持基于检查点的恢复。每一个新的检查点都会保留(或使之持久)其余用户操做(读取,更新或RMW)。FASTER容许客户端线程跟踪已持久的操做和未使用基于会话的API的操做。

回想一下,每一个FASTER线程都会启动一个与惟一的Guid相关联的会话。全部FASTER线程操做(读取,Upsert,RMW)都带有单调序列号。在任什么时候间点,均可以调用Checkpoint以启动FASTER的异步检查点。在调用以后Checkpoint,(最终)向每一个FASTER线程通知一个序列号,这样能够确保直到该序列号以前的全部操做以及在该序列号以后没有任何操做被保留为该检查点的一部分。FASTER线程可使用此序列号来清除等待执行的操做的任何内存缓冲区。

在恢复期间,线程可使用继续使用相同的Guid进行会话ContinueSession。该函数返回线程本地序列号,直到恢复该会话哈希为止。从那时起,新线程可使用此信息来重播全部未提交的操做。

下面一个单线程的简单恢复示例。

public class PersistenceExample{ private FasterKV<long, long, long, long, Empty, Funcs> fht; private IDevice log; public PersistenceExample() { log = Devices.CreateLogDevice("C:\\Temp\\hlog.log"); fht = new FasterKV<long, long, long, long, Empty, Funcs> (1L << 20, new Funcs(), new LogSettings { LogDevice = log }); } public void Run() { IssuePeriodicCheckpoints(); RunSession(); } public void Continue() { fht.Recover(); IssuePeriodicCheckpoints(); ContinueSession(); } /* Helper Functions */ private void RunSession() { Guid guid = fht.StartSession(); System.IO.File.WriteAllText(@"C:\\Temp\\session1.txt", guid.ToString()); long seq = 0; // sequence identifier long key = 1, input = 10; while(true) { key = (seq % 1L << 20); fht.RMW(ref key, ref input, Empty.Default, seq); seq++; } // fht.StopSession() - outside infinite loop } private void ContinueSession() { string guidText = System.IO.File.ReadAllText(@"C:\\Temp\session1.txt"); Guid sessionGuid = Guid.Parse(guidText); long seq = fht.ContinueSession(sessionGuid); // recovered seq identifier seq++; long key = 1, input = 10; while(true) { key = (seq % 1L << 20); fht.RMW(ref key, ref input, Empty.Default, seq); seq++; } } private void IssuePeriodicCheckpoints() { var t = new Thread(() => { while(true) { Thread.Sleep(10000);fht.StartSession(); fht.TakeCheckpoint(out Guid token); fht.CompleteCheckpoint(token, true);fht.StopSession(); } }); t.Start(); }}

FASTER支持两种检查点概念:“快照”和“折叠”。前者是将内存中的完整快照复制到一个单独的快照文件中,然后者是自上一个检查点以来更改的增量检查点。折叠有效地将混合日志的只读标记移到尾部,所以全部数据都做为同一混合日志的一部分保留(没有单独的快照文件)。全部后续更新均写入新的混合日志尾部位置,这使Fold-Over具备增量性质。

项目路径:

https://github.com/Microsoft/FASTER/tree/master/cs

相关文章
相关标签/搜索