欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

微软并发Key-Value存储库FASTER介绍

程序员文章站 2022-07-05 08:06:30
微软支持并发的Key-Value 存储库有C++与C#两个版本。号称迄今为止最快的并发键值存储。下面是C#版本翻译: FASTER C#可在.NET Framework和.NET Core中运行,并且可以在单线程和并发设置中使用。经过测试,可以在Windows和Linux上使用。它公开了一种API, ......

  微软支持并发的key-value 存储库有c++与c#两个版本。号称迄今为止最快的并发键值存储。下面是c#版本翻译:

faster c#可在.net framework和.net core中运行,并且可以在单线程和并发设置中使用。经过测试,可以在windows和linux上使用。它公开了一种api,该api可以执行读取,盲更新(upserts)和读取-修改-写入(rmw)操作的混合。它支持大于内存的数据,并接受idevice将日志存储在文件中的实现。提供了idevice本地文件系统的实现,可以写入远程文件系统。或者将远程存储映射到本地文件系统中。faster可以用作传统并发数据结构类似concurrentdictionary的高性能替代品,并且还支持大于内存的数据。它支持增量非增量数据结构类型的检查点。

faster支持三种基本操作:

  1. read:从键值存储中读取数据
  2. upsert:将值盲目向上插入到存储中(不检查先前的值)
  3. read-modify-write:更新存储区中的值,用于实现“求和”和“计数”之类的操作。

构建

在实例化faster之前,您需要创建faster将使用的存储设备。如果使用的是可移植类型byteintdouble类型,则仅需要混合日志设备。如果使用对象,则需要创建一个单独的对象日志设备。

idevice log = devices.createlogdevice("c:\\temp\\hybridlog_native.log");

然后,按如下方式创建一个faster实例:

fht = new fasterkv<key, value, input, output, empty, functions>

  (1l << 20, new functions(), new logsettings { logdevice = log });

构造函数的类型参数

有六个基本概念,在实例化faster时作为通用类型参数提供:

  1. key:这是的类型,例如long
  2. value:这是存储在faster中的值的类型。
  3. input:这是调用read或rmw时提供给faster的输入类型。它可以被视为读取或rmw操作的参数。例如,对于rmw,可是增量累加到值。
  4. output:这是读操作的输出类型将值的相关部分复制到输出。
  5. context:操作的用户定义上下文如果没有必要使用empty
  6. functions需要回调时,使用ifunctions<>调用。

回调函数

用户提供一个实例ifunctions<>。此类型封装了所有回调,下面将对其进行介绍:

  1. singlereader和并发读concurrentreader:这些用于读取存储值并将它们复制到output。单个读取器可以假定没有并发操作。
  2. singlewriter和concurrentwriter:这些用于将值从源值写入存储。
  3. completion callbacks完成回调:各种操作完成时调用。
  4. rmwupdaters:用户指定了三个更新器,initialupdater,inplaceupdater和copyupdater。它们一起用于实现rmw操作。
  5. hash table siz哈希表大小:这是分配给faster的存储行数,其中每个64字节。
  6. logsettings 日志设置:这些设置与日志的大小设备。
  7. checkpoint设置:这些是与检查相关的设置,例如检查类型和文件夹。
  8. serialization序列化设置:用于为键和值类型提供自定义序列化程序。序列化程序实现iobjectserializer<key>键和iobjectserializer<value>值。只有c#类对象非可移植类型才需要这些。
  9. key比较器:用于为key提供更好的比较器ifasterequalitycomparer<key>

构造函数参数

faster的总内存占用量由以下参数控制:

  1. 哈希表大小:此参数(第一个构造函数参数)乘以64是内存中哈希表的大小(以字节为单位)。
  2. 日志大小:logsettings.memorysizebits表示混合日志的内存部分的大小(以位为单位)。换句话说对于参数设置b,日志的大小为2 ^ b字节。如果日志指向类对象,则此大小不包括对象的大小,因为faster无法访问此信息。日志的较旧部分溢出到存储中。

sessions (threads)会话(线程)

实例化faster之后,线程可以使用session使用faster

fht.startsession();

fht.stopsession();

当所有线程都在faster上完成操作后,您最终销毁faster实例:

fht.dispose();

示例

以下是一个简单示例,其中所有数据都在内存中,因此我们不必担心挂起的i / o操作。在此示例中也没有检查点。

public static void test()

{

  var log = devices.createlogdevice("c:\\temp\\hlog.log");

  var fht = new fasterkv<long, long, long, long, empty, funcs>

    (1l << 20, new funcs(), new logsettings { logdevice = log });

  fht.startsession();

  long key = 1, value = 1, input = 10, output = 0;

  fht.upsert(ref key, ref value, empty.default, 0);

  fht.read(ref key, ref input, ref output, empty.default, 0);

  debug.assert(output == value);

  fht.rmw(ref key, ref input, empty.default, 0);

  fht.rmw(ref key, ref input, empty.default, 0);

  fht.read(ref key, ref input, ref output, empty.default, 0);

  debug.assert(output == value + 20);

  fht.stopsession();

  fht.dispose();

  log.close();

}

此示例的函数

public class funcs : ifunctions<long, long, long, long, empty>

{

  public void singlereader(ref long key, ref long input, ref long value, ref long dst) => dst = value;

  public void singlewriter(ref long key, ref long src, ref long dst) => dst = src;

  public void concurrentreader(ref long key, ref long input, ref long value, ref long dst) => dst = value;

  public void concurrentwriter(ref long key, ref long src, ref long dst) => dst = src;

  public void initialupdater(ref long key, ref long input, ref long value) => value = input;

  public void copyupdater(ref long key, ref long input, ref long oldv, ref long newv) => newv = oldv + input;

  public void inplaceupdater(ref long key, ref long input, ref long value) => value += input;

  public void upsertcompletioncallback(ref long key, ref long value, empty ctx) { }

  public void readcompletioncallback(ref long key, ref long input, ref long output, empty ctx, status s) { }

  public void rmwcompletioncallback(ref long key, ref long input, empty ctx, status s) { }

  public void checkpointcompletioncallback(guid sessionid, long serialnum) { }

}

更多例子

检查点和恢复

faster支持基于检查点的恢复。每个新的检查点都会保留(或使之持久)其他用户操作(读取,更新或rmw)。faster允许客户端线程跟踪已持久的操作和未使用基于会话的api的操作。

回想一下,每个faster线程都会启动一个与唯一的guid相关联的会话。所有faster线程操作(读取,upsert,rmw)都带有单调序列号。在任何时间点,都可以调用checkpoint以启动faster的异步检查点。在调用之后checkpoint,(最终)向每个faster线程通知一个序列号,这样可以确保直到该序列号之前的所有操作以及在该序列号之后没有任何操作被保留为该检查点的一部分。faster线程可以使用此序列号来清除等待执行的操作的任何内存缓冲区。

在恢复期间,线程可以使用继续使用相同的guid进行会话continuesession。该函数返回线程本地序列号,直到恢复该会话哈希为止。从那时起,新线程可以使用此信息来重播所有未提交的操作。

下面一个单线程的简单恢复示例。

public class persistenceexample

{

  private fasterkv<long, long, long, long, empty, funcs> fht;

  private idevice log;

  

  public persistenceexample()

  {

    log = devices.createlogdevice("c:\\temp\\hlog.log");

    fht = new fasterkv<long, long, long, long, empty, funcs>

    (1l << 20, new funcs(), new logsettings { logdevice = log });

  }

  

  public void run()

  {

    issueperiodiccheckpoints();

    runsession();

  }

  

  public void continue()

  {

    fht.recover();

    issueperiodiccheckpoints();

    continuesession();

  }

  

  /* helper functions */

  private void runsession()

  {

    guid guid = fht.startsession();

    system.io.file.writealltext(@"c:\\temp\\session1.txt", guid.tostring());

    

    long seq = 0; // sequence identifier

    

    long key = 1, input = 10;

    while(true)

    {

      key = (seq % 1l << 20);

      fht.rmw(ref key, ref input, empty.default, seq);

      seq++;

    }

    // fht.stopsession() - outside infinite loop

  }

  

  private void continuesession()

  {

    string guidtext = system.io.file.readalltext(@"c:\\temp\session1.txt");

    guid sessionguid = guid.parse(guidtext);

    

    long seq = fht.continuesession(sessionguid); // recovered seq identifier

    seq++;

    

    long key = 1, input = 10;

    while(true)

    {

      key = (seq % 1l << 20);

      fht.rmw(ref key, ref input, empty.default, seq);

      seq++;

    }

  }

  

  private void issueperiodiccheckpoints()

  {

    var t = new thread(() =>

    {

      while(true)

      {

        thread.sleep(10000);

fht.startsession();

        fht.takecheckpoint(out guid token);

        fht.completecheckpoint(token, true);

fht.stopsession();

      }

    });

    t.start();

  }

}

faster支持两种检查点概念:“快照”和“折叠”。前者是将内存中的完整快照复制到一个单独的快照文件中,而后者是自上一个检查点以来更改的增量检查点。折叠有效地将混合日志的只读标记移到尾部,因此所有数据都作为同一混合日志的一部分保留(没有单独的快照文件)。所有后续更新均写入新的混合日志尾部位置,这使fold-over具有增量性质。

项目路径:

https://github.com/microsoft/faster/tree/master/cs