Skip to content

Instantly share code, notes, and snippets.

@itn3000
Last active November 11, 2019 09:01
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save itn3000/8811347c7066ecf02b87e008c1fb410f to your computer and use it in GitHub Desktop.
Save itn3000/8811347c7066ecf02b87e008c1fb410f to your computer and use it in GitHub Desktop.
Memo about Microsoft/FASTER(Japanese)

FASTERについてのメモ(日本語)

概要

  • KVSライブラリ
    • 組み込みKVSとしての使用が可能
    • プロセス間でのストレージ共有は想定していない?
    • メモリ以上のDBサイズを設定可能(メモリ+ファイル読み書き)
      • メモリオンリーも可
      • メモリオンリーにした場合、ディスク行きになる部分は単純にキーごと捨てられる=古いものから消えていく
    • LiteDBと多少領域が被るかも(最終的に目指す所は異なるが)
    • ログデータベース(耐障害性)とインメモリデータベース(性能)の良い所どりを目指す
    • ユーザー定義のupdate、readコールバックを設定可能
      • C#版ではRoslyn,C++版ではテンプレートを使って実現?
      • C++はテンプレート
      • 新版(少なくとも2019.4.5.2以降)ではUnsafeコードを使うことにより、Roslyn依存を排除
  • C#とC++の二つの実装が存在
    • 両者間での性能比較の数値は無し
  • 三種類のストレージ出力
    • オンメモリ(In-Memory)
    • 追記のみログ(Append-Only)
    • 複合(HybridLog)
  • Upsertに特に注力
    • 同時書き込みスレッド数に比例したスループット向上
    • オンメモリにある場合、構築済みデータに対する書き込み性能はスレッド数に比例して増加する
      • ConcurrentDictionaryに対してかなり有利となる
    • オンメモリにある場合、読み込みはConcurrentDictionaryとほぼ変わらない
    • https://github.com/Microsoft/FASTER/wiki/Performance-of-FASTER-in-C%23
    • Concurrent
  • 非同期処理に関してはEAPパターンを使用している
    • 内部的にはAPM

できてないこと

  • キーの列挙は現在サポートしていない
    • FasterKV.Log.Scanは存在するが、Compactしない限り値更新後の旧レコードまで持ってくるので、最新版のレコードのみ列挙したい場合には難しい(列挙時に旧バージョンのアドレスが取得できるため、一応不可能ではない)
    • TODOには入っている
  • レプリケーション等の機能は無し
    • 用途的に必要なさそう

仕様上の注意

  • 更新でも追記するため、定期的なCompact実行は必須
    • 100000回のlong,longな更新で5-6MB程度ログが溜まるため、空間効率は優先度低めっぽい
    • long,longなデータに対して100000回程度の更新の後のCompactは、ディスク書き出しがされていてもほとんど数~数十msec
      • 100_0000回だと数百msec程度かかったので、割と頻繁にCompactしてもいいかもしれない
    • 数秒に一回程度の頻度でも問題ないかもしれない
    • その他のパフォーマンスインパクトはどうなるのか?
      • ほとんど影響はなかった
    • 資料を見る限り、完全に止まるということは無いかもしれない
  • スレッドごとにStartSessionまたはContinueSessionし、終了時にStopSessionする必要がある
    • しないと各種操作時に例外
    • await後はスレッドが大体変更されるので、async awaitと混在してはならない
    • upsertあるいはread時にmonotonicSerialNumという値を渡す必要があるが、ContinueSession時に、最後にRead等で与えた数値が返される
  • KeyとValue両方にBlittableな型を使用するのが一番効率が良い
  • Windowsで動くときは、ストレージ操作で特殊実装しているので、非Windows環境ではまた特性が異なる可能性がある
    • オンメモリの場合はさほど変わらないと予想
  • エンディアン等は考慮しないでそのまま出力しているようなので、マシン外に持ち出すことは恐らく考えられていない
  • ファイルに書き出されたデータ
    • 古いデータからディスクに書き出される
      • 新しいデータの方が読出しが早くなる率が高い
    • ファイルに書き出されたデータを読みだそうとすると、Readで常にPENDINGが返ってくるようになる
    • PENDINGが返ってくる場合は、SingleReader,ConcurrentReader,ReadCompletionCallbackを適切に実装する必要がある
      • SingleReader,ConcurrentReaderでdstに値を入れ、ReadCompletionCallbackのctxに、outputで渡された値を入れる
      • オンメモリに載らないデータを扱う場合、Functionsの実装は必須
  • FasterKV.EntryCountは重い処理とみなした方が良い

オブジェクトストレージについて

  • Blittableでなく、かつstructでもないオブジェクトに関しては、IObjectSerializerが使われる
  • 実データはObjectDeviceで指定した領域に保存される
  • 本体ログの方には開始点のポインタのみが記録される
  • IObjectSerializerの意味は以下の通り
    • BeginSerialize(Stream stm)
      • Serialize開始前に渡されるデータストリーム
      • このstmに対してバイナリデータを書き込む
    • Serialize(ref T obj)
      • バイナリデータをBeginSerializeで渡されたストリームに書き込む
      • オフセットは調整済みなので、うかつに動かさないこと
    • EndSerialize()
      • Serialize後に呼ばれる
      • Begin,Serializeで作成したオブジェクト等の後始末を行う

パラメーター

  • パラメーターについて(https://microsoft.github.io/FASTER/tuning/Log.html)
    • PageSizeBits
      • 1ページ辺りのビット数
      • 小さいと空間効率が悪くなり、ディスク行きになるデータが多くなる
      • 可能な限り増やすべきだが、そうすると後述のMemoryBitsを増やさなければならなくなる
      • 増やしすぎるとリカバリ等で致命的なダメージになりやすいかもしれない
    • MemoryBits
      • オンメモリで持っておくページの最大サイズ
      • FASTERでは最低でも16ページが保持できることを期待しているため、PageSizeBits + 4 <= MemoryBitsでなければならない
      • 許す限り大きな値を取っておくのが吉
    • SegmentBits
      • ディスクに保存するログファイルのサイズ
      • メモリに乗り切らないログデータがディスクに書き出される
      • ディスクに書き出されるときは常に2^SegmentBitsバイトになる
      • MemoryBitsと同じサイズにすれば大体間違いない
      • 大きくしすぎると、ログファイル作成時やFlush時に時間がかかり、後述のチェックポイント作成に時間がかかる場合がある
      • とはいえ、小さくしすぎると膨大な数のファイルを作成するため、やはりチェックポイント作成に時間がかかる

データの永続化

共通

  • データはデフォルトで永続化されない
    • 明示的にCheckpointを作成しなければならない
    • チェックポイントにはIndexCheckpointとHybridLogCheckpointが存在
    • HybridLogCheckpointの方に実データが存在
    • IndexCheckpointの方は、ht.datという名前でIndexSize * 64Bytesのファイルが作成されるので注意
  • ReadCacheSettingを設定していた場合、HybridLogのチェックポイントのみ作成可能
  • どのチェックポイントから復元するかは、チェックポイント作成時にGUIDを渡されるのでそこから判断する
  • ローリングしたい場合はCheckpointCompletionCallbackでどうこうする?
    • でも自身のチェックポイントIDがわからないので問題はある
  • チェックポイント作成中は他のスレッドは妨害されないので、性能低下は無いらしいが、実際はどうなのか

Snapshot

  • snapshotsを選択した場合、HybridLogCheckPointにはメタ情報+フラッシュされた全データが記録される
    • DBフルバックアップと同じ
  • 場合によっては数秒以上かかるので、それなりに重い処理
  • 毎回データ内容をフルダンプするため、ディスク効率は悪い

FoldOver

  • ログをフラッシュし、その後チェックポイントにはメタデータ情報のみを記録
  • Snapshotより短時間で完了可能(ログFlushの時間次第)
  • ログデータに不整合が発生した場合、チェックポイントが機能しない可能性がある
  • Compactによっても機能しなくなる可能性がある
  • 整合性よりは、継続性を重視したい(停止→開始前後の処理時間を短くしたい、チェックポイントを頻繁に出力したい)場合に使う

その他

  • C#版はデフォルト実装はWindows(x86系)のみ対応
    • dotnet core及びnet46をサポートして、プラットフォームの制約は無くなった
  • ユーザー定義処理をオーバーヘッド無しで行うため、C#版は初回起動時に対象データ構造に合わせた処理をコード生成している
    • Unsafeコードで代替
  • C++版はクロスプラットフォーム(x86系のみ)対応

FasterLogについて

FasterKVと同一のオブジェクトストレージを利用した、ログ格納と読出しに特化したデータ構造。 FasterKVに比べて準備は少ないので、始めるには良い。

  • LogDeviceにdeleteOnCloseフラグを付けると、若干挙動が怪しくなるので注意→用途的に使う場面は考えられない?
  • 扱えるデータ型はbyte[]のみ
  • 追加はEnque()、検索はScan()削除はTruncateUntil()となる
    • 削除は、指定アドレスまで切り詰め処理を行うTruncateUntilのみ
  • Enque()で使えるIReadOnlySpanBatchの用途は不明
  • Scanで返ってくる要素は直接列挙はできないため、GetAsyncEnumerable()IAsyncEnumerableを取得して処理を行う
  • 削除で下手な値を指定すると、次のScan時に例外が起きる
    • 2のべき乗なら大体安心か?
  • 検索、削除で下手なアドレスを指定すると、例外が起きたりする
  • 検索で終端アドレスにlong.MaxValue等の大きい値を指定すると、処理が返ってこなくなる
    • ドキュメントでは特に言及されていないため、バグか使い方を間違えている可能性がある
    • TailAddressCommittedUntilAddressを終端アドレスとして指定すると良い

参考リンク

using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Channels;
using FASTER.core;
using System.IO;
using System.Linq;
namespace fasterlabs
{
static class FasterLogCommitTest
{
static long EnqueueData(long value, FasterLog fl)
{
Span<long> data = stackalloc long[1];
if (fl.TryEnqueue(System.Runtime.InteropServices.MemoryMarshal.AsBytes(data), out var logicalAddress))
{
return logicalAddress;
}
else
{
return -1;
}
}
public static async ValueTask DoSingleThread(int commitInterval)
{
const long TotalCount = 100000;
// ensure using cleared data
if (File.Exists("logcommittest.log.0"))
{
File.Delete("logcommittest.log.0");
}
if (File.Exists("logcommittest.log.commit"))
{
File.Delete("logcommittest.log.commit");
}
var log = Devices.CreateLogDevice("logcommittest.log");
var channel = Channel.CreateUnbounded<long>();
using (var fl = new FasterLog(new FasterLogSettings() { LogDevice = log }))
{
var sw = new System.Diagnostics.Stopwatch();
sw.Start();
using (var csrc = new CancellationTokenSource(1000 * 240))
{
for(int i = 0;i<TotalCount;i++)
{
EnqueueData(i, fl);
if(i % commitInterval == commitInterval - 1)
{
fl.Commit(true);
// Console.WriteLine($"Commit: {i}");
}
}
if(fl.CommittedUntilAddress != fl.TailAddress)
{
await fl.CommitAsync(csrc.Token);
}
sw.Stop();
Console.WriteLine($"Single({TotalCount}, {commitInterval}): {sw.Elapsed}, iops = {(TotalCount * 1000) / sw.Elapsed.TotalMilliseconds}");
}
}
log.Close();
}
public static async ValueTask DoTest(int TaskNum)
{
const long TotalCount = 100000;
// ensure using cleared data
if (File.Exists("logcommittest.log.0"))
{
File.Delete("logcommittest.log.0");
}
if (File.Exists("logcommittest.log.commit"))
{
File.Delete("logcommittest.log.commit");
}
var log = Devices.CreateLogDevice("logcommittest.log");
var channel = Channel.CreateUnbounded<long>();
using (var fl = new FasterLog(new FasterLogSettings() { LogDevice = log }))
{
var sw = new System.Diagnostics.Stopwatch();
sw.Start();
using (var csrc = new CancellationTokenSource(1000 * 240))
{
await Task.WhenAll(
Task.WhenAll(Enumerable.Range(0, TaskNum).Select(async idx =>
{
long logicalAddress = 0;
try
{
for (int i = 0; i < TotalCount / TaskNum; i++)
{
logicalAddress = EnqueueData(i + idx * TotalCount, fl);
await channel.Writer.WriteAsync(logicalAddress, csrc.Token).ConfigureAwait(false);
await fl.WaitForCommitAsync(logicalAddress, csrc.Token).ConfigureAwait(false);
// Console.WriteLine($"{idx}, {i}, {logicalAddress}");
}
}
catch (Exception e)
{
Console.WriteLine($"producer error({idx}, {logicalAddress}, {fl.CommittedUntilAddress}, {fl.TailAddress}): {e}");
}
// Console.WriteLine($"exit producer({idx}, {sw.Elapsed})");
})).ContinueWith(t => channel.Writer.Complete()),
Task.Run(async () =>
{
int commitCount = 0;
try
{
while (true)
{
if (!await channel.Reader.WaitToReadAsync(csrc.Token).ConfigureAwait(false))
{
break;
}
while (channel.Reader.TryRead(out var untiladdr))
{
if (fl.CommittedUntilAddress <= untiladdr && fl.CommittedUntilAddress != fl.TailAddress)
{
fl.Commit(true);
// await fl.CommitAsync(csrc.Token).ConfigureAwait(false);
commitCount++;
}
}
}
}
catch (Exception e)
{
Console.WriteLine($"consumer error:{e}");
}
Console.WriteLine($"exit consumer({commitCount})");
}).ContinueWith(t =>
{
if(fl.CommittedUntilAddress != fl.TailAddress)
{
fl.Commit(true);
}
})
).ConfigureAwait(false);
sw.Stop();
Console.WriteLine($"Multi({TotalCount}, {TaskNum}): {sw.Elapsed}, iops = {(TotalCount * 1000) / sw.Elapsed.TotalMilliseconds}");
}
}
log.Close();
}
}
}
exit consumer(18203)
Multi(100000, 10): 00:00:14.7075809, iops = 6799.214682545108
Single(100000, 10): 00:00:08.1340570, iops = 12293.988104582988
exit consumer(1999)
Multi(100000, 100): 00:00:01.8739873, iops = 53362.15458877443
Single(100000, 100): 00:00:00.8008728, iops = 124863.77362297733
exit consumer(197)
Multi(100000, 1000): 00:00:00.2690126, iops = 371729.80001680215
Single(100000, 1000): 00:00:00.1101271, iops = 908041.7081717397
exit consumer(44)
Multi(100000, 10000): 00:00:00.9875769, iops = 101257.93748314687
Single(100000, 10000): 00:00:00.0502674, iops = 1989360.8979179347
namespace fasterlabs
{
static class Test1
{
public static async Task FasterLogTest()
{
for (int idx = 0; idx < 4; idx++)
{
Console.WriteLine($"begin iteration {idx}");
var log = Devices.CreateLogDevice("hoge.db");
var flogsetting = new FasterLogSettings()
{
LogDevice = log,
SegmentSizeBits = 22,
MemorySizeBits = 20,
PageSizeBits = 10
};
Console.WriteLine($"{flogsetting.MemorySizeBits}, {flogsetting.PageSizeBits}, {flogsetting.SegmentSizeBits}");
using (var fl = new FasterLog(flogsetting))
{
var data = new byte[128];
long truncateaddr = 0;
// inserting data
for (int i = 0; i < 129; i++)
{
data.AsSpan().Fill((byte)i);
var l = fl.Enqueue(data);
if(i == 64)
{
truncateaddr = l;
}
}
fl.Commit(true);
// await fl.CommitAsync().ConfigureAwait(false);
// output address info
Console.WriteLine($"begin = {fl.BeginAddress}, committed begin = {fl.CommittedBeginAddress}, committed until = {fl.CommittedUntilAddress}, {fl.FlushedUntilAddress}, tail = {fl.TailAddress}");
// counting log 1st
using (var iter = fl.Scan(fl.BeginAddress, fl.TailAddress))
{
long cnt = 0;
await foreach (var (d, i) in iter.GetAsyncEnumerable())
{
Console.WriteLine($"data{d[0]}");
cnt += 1;
}
Console.WriteLine($"first: log num is {cnt}");
}
// truncating log
Console.WriteLine($"truncate until {truncateaddr}");
fl.TruncateUntil(truncateaddr);
fl.Commit(true);
// output address info
Console.WriteLine($"begin = {fl.BeginAddress}, committed begin = {fl.CommittedBeginAddress}, committed until = {fl.CommittedUntilAddress}, {fl.FlushedUntilAddress}, tail = {fl.TailAddress}");
// counting log 2nd
using (var iter = fl.Scan(0, fl.TailAddress))
{
long cnt = 0;
await foreach (var (d, i) in iter.GetAsyncEnumerable())
{
// Console.WriteLine($"{d[0]}, {d.Length}, {i}");
cnt += 1;
}
Console.WriteLine($"second: log num is {cnt}");
}
Console.ReadLine();
}
log.Close();
}
}
}
}
using FASTER.core;
using System.Text;
using System;
using System.IO;
using System.Threading.Tasks;
using System.Threading;
using StackExchange.Profiling;
using Microsoft.Extensions.DependencyInjection;
namespace litedbtest
{
class CompletionContext<T>
{
public T Value;
public bool Completed;
}
class Funcs : FunctionsBase<long, long, long, long, CompletionContext<long>>
{
// if data is in disk, *Reader and ReadCompletionCallback should be implemented
public override void SingleReader(ref long key, ref long input, ref long value, ref long dst)
{
dst = value;
}
public override void ConcurrentReader(ref long key, ref long input, ref long value, ref long dst)
{
dst = value;
}
public override void ReadCompletionCallback(ref long key, ref long input, ref long output, CompletionContext<long> ctx, Status status)
{
ctx.Value = output;
output = ctx.Value;
ctx.Completed = true;
}
}
class FasterTest
{
class MyEqualityComparer : IFasterEqualityComparer<long>
{
public bool Equals(ref long k1, ref long k2)
{
return k1 == k2;
}
public long GetHashCode64(ref long k)
{
return k;
}
}
static string GetTimingTreeString(Timing t, int depth)
{
var sb = new StringBuilder();
var indent = new string(' ', depth * 2);
if (depth > 100)
{
throw new Exception($"too deep({depth})");
}
if (t.HasChildren)
{
sb.AppendLine($"{indent}{t.Name}: {t.DurationMilliseconds},{t.DurationWithoutChildrenMilliseconds}");
}
else
{
sb.AppendLine($"{indent}{t.Name}: {t.DurationMilliseconds}");
}
if (t.HasChildren)
{
foreach (var child in t.Children)
{
sb.Append(GetTimingTreeString(child, depth + 1));
}
}
return sb.ToString();
}
public static void TestContinue(int memoryBits, int pageBits, int segmentBits)
{
const string checkpointDir = "checkpoints";
string logPath = Path.Combine("fasterlog", "faster.log");
if (Directory.Exists(logPath))
{
Directory.Delete(logPath);
}
Directory.CreateDirectory(Path.GetDirectoryName(logPath));
if (Directory.Exists(checkpointDir))
{
Directory.Delete(checkpointDir, true);
}
var logDevice = Devices.CreateLogDevice(logPath, deleteOnClose: true);
try
{
var logSettings = new LogSettings()
{
LogDevice = logDevice,
MemorySizeBits = memoryBits,
PageSizeBits = pageBits,
SegmentSizeBits = segmentBits
};
var checkPointSetting = new CheckpointSettings()
{
CheckPointType = CheckpointType.Snapshot,
CheckpointDir = checkpointDir
};
var faster = new FasterKV<long, long, long, long, CompletionContext<long>, Funcs>(1 << 15,
new Funcs(), logSettings, checkpointSettings: checkPointSetting, comparer: new MyEqualityComparer());
Guid session;
Guid checkpoint;
var profiler = MiniProfiler.StartNew($"faster_{memoryBits}_{pageBits}_{segmentBits}");
using (profiler.Step("insert"))
{
session = faster.StartSession();
var ctx = new CompletionContext<long>();
using (profiler.Step("upsert"))
{
for (int loop = 0; loop < 10; loop++)
{
for (int i = 0; i < 10000; i++)
{
long key = i;
long value = i * 3 + loop * 1000;
var st = faster.Upsert(ref key, ref value, ctx, loop * 10000 + i);
}
}
}
using (profiler.Step("compaction"))
{
faster.Log.Compact(faster.Log.TailAddress);
}
using (profiler.Step("checkpoint"))
{
faster.TakeFullCheckpoint(out checkpoint);
faster.CompleteCheckpoint(true);
}
faster.StopSession();
}
using (profiler.Step("continue"))
{
using (profiler.Step("checkpoint"))
{
faster.Recover(checkpoint);
}
var monotonic = faster.ContinueSession(session);
faster.StopSession();
}
profiler.Stop();
Console.WriteLine(GetTimingTreeString(profiler.Root, 0));
faster.Dispose();
}
finally
{
logDevice.Close();
}
}
public static void TestBasic()
{
foreach (var fi in new DirectoryInfo(".").GetFiles("faster.log*"))
{
fi.Delete();
}
var logDevice = Devices.CreateLogDevice("./faster.log");
var checkpointSetting = new CheckpointSettings()
{
CheckpointDir = "checkpoints"
};
var readCacheSettings = new ReadCacheSettings()
{
MemorySizeBits = 20,
PageSizeBits = 8,
};
var logSettings = new LogSettings()
{
LogDevice = logDevice,
MemorySizeBits = 20,
PageSizeBits = 8,
SegmentSizeBits = 20,
ReadCacheSettings = readCacheSettings
};
using (var faster = new FasterKV<long, long, long, long, CompletionContext<long>, Funcs>(1 << 20,
new Funcs(),
logSettings,
checkpointSettings: checkpointSetting))
{
var guid = faster.StartSession();
var sw = new System.Diagnostics.Stopwatch();
sw.Start();
var ctx = new CompletionContext<long>();
for (int i = 0; i < 1; i++)
{
for (int j = 0; j < 100000; j++)
{
var key = (long)j;
var value = (long)(i * 10000 + j);
var st = faster.Upsert(ref key, ref value, ctx, 0);
if (st != Status.OK)
{
Console.WriteLine($"upserting failed,{st},{key},{value},{i}");
}
}
}
sw.Stop();
using (var iter = faster.Log.Scan(faster.Log.BeginAddress, faster.Log.TailAddress))
{
while (iter.GetNext(out var recordInfo, out var key, out var value))
{
}
}
Console.WriteLine($"{sw.Elapsed}");
for (int i = 0; i < 100; i++)
{
long key = i % 10;
long value = 0;
long input = 0;
var st = faster.Read(ref key, ref input, ref value, ctx, 0);
if (st == Status.OK)
{
// Console.WriteLine($"OK: {key},{input},{value}");
}
else if (st == Status.PENDING)
{
faster.CompletePending(true);
// Console.WriteLine($"Pending: {key},{input},{value},{ctx.Value},{ctx.Completed}");
ctx.Completed = false;
}
}
Console.WriteLine($"{faster.DumpDistribution()}");
faster.Log.Compact(faster.Log.TailAddress);
faster.TakeHybridLogCheckpoint(out var token);
faster.CompleteCheckpoint(true);
Console.WriteLine($"checkpoint id: {token}, session id: {guid}");
faster.StopSession();
}
Console.WriteLine($"{logDevice.FileName}");
logDevice.Close();
}
}
}
FasterThreadTest.cs(14,19): warning CS0649: フィールド 'FasterThreadContext.Status' は割り当てられません。常に既定値 を使用します。 [G:\src\gitrepos\dotnet-sandbox\litedbtest\litedbtest.csproj]
FasterThreadTest.cs(15,17): warning CS0649: フィールド 'FasterThreadContext.Value' は割り当てられません。常に既定値 0 を使用します。 [G:\src\gitrepos\dotnet-sandbox\litedbtest\litedbtest.csproj]
FasterThreadTest.cs(60,9): warning CS0414: フィールド 'FasterThreadChannel.WriteCount' が割り当てられていますが、値は使用されていません。 [G:\src\gitrepos\dotnet-sandbox\litedbtest\litedbtest.csproj]
***WARNING*** Creating default FASTER key equality comparer based on potentially slow EqualityComparer<Key>.Default. To avoid this, provide a comparer (IFasterEqualityComparer<Key>) as an argument to FASTER's constructor, or make Key implement the interface IFasterEqualityComparer<Key>
write done:channelstest_1: 9191.9,-903984.2
childtask_0: 9137
childtask_1: 9132.3
childtask_2: 9132
childtask_3: 9132.7
childtask_4: 9132
childtask_5: 9132.4
childtask_6: 9132.5
childtask_7: 9132.6
childtask_8: 9132.2
childtask_9: 9132.4
childtask_10: 9132.5
childtask_11: 9132
childtask_12: 9132.3
childtask_13: 9132.1
childtask_14: 9132.1
childtask_15: 9132.3
childtask_16: 9132.5
childtask_17: 9132
childtask_18: 9132.2
childtask_19: 9132.7
childtask_20: 9132.7
childtask_21: 9132.7
childtask_22: 9132.2
childtask_23: 9132.4
childtask_24: 9132.3
childtask_25: 9132.1
childtask_26: 9132.4
childtask_27: 9132
childtask_28: 9132.2
childtask_29: 9132.4
childtask_30: 9132
childtask_31: 9132.1
childtask_32: 9132.4
childtask_33: 9131.9
childtask_34: 9131.4
childtask_35: 9131.9
childtask_36: 9131.2
childtask_37: 9131.2
childtask_38: 9131.6
childtask_39: 9131.6
childtask_40: 9131.6
childtask_41: 9131.1
childtask_42: 9131.5
childtask_43: 9131.2
childtask_44: 9131.4
childtask_45: 9131.7
childtask_46: 9131.6
childtask_47: 9131.4
childtask_48: 9131.4
childtask_49: 9131.2
childtask_50: 9131.6
childtask_51: 9131.5
childtask_52: 9131.6
childtask_53: 9131.3
childtask_54: 9131.5
childtask_55: 9131.1
childtask_56: 9131.6
childtask_57: 9131.3
childtask_58: 9131.1
childtask_59: 9131.6
childtask_60: 9131.6
childtask_61: 9131.6
childtask_62: 9131.7
childtask_63: 9131.3
childtask_64: 9131.6
childtask_65: 9131.5
childtask_66: 9131.7
childtask_67: 9131.1
childtask_68: 9131.1
childtask_69: 9131.1
childtask_70: 9131.8
childtask_71: 9131.5
childtask_72: 9131.6
childtask_73: 9131.3
childtask_74: 9131.1
childtask_75: 9131.1
childtask_76: 9131.3
childtask_77: 9131.6
childtask_78: 9131.1
childtask_79: 9131.6
childtask_80: 9131.2
childtask_81: 9131.1
childtask_82: 9131.3
childtask_83: 9131.6
childtask_84: 9131.1
childtask_85: 9131.6
childtask_86: 9131.4
childtask_87: 9131.6
childtask_88: 9131.5
childtask_89: 9131.6
childtask_90: 9131.1
childtask_91: 9131.7
childtask_92: 9131.3
childtask_93: 9131.2
childtask_94: 9131.4
childtask_95: 9131.4
childtask_96: 9131.5
childtask_97: 9131.3
childtask_98: 9131.3
childtask_99: 9131.8
***WARNING*** Creating default FASTER key equality comparer based on potentially slow EqualityComparer<Key>.Default. To avoid this, provide a comparer (IFasterEqualityComparer<Key>) as an argument to FASTER's constructor, or make Key implement the interface IFasterEqualityComparer<Key>
write done:channelstest_10: 4074.6,-398979.3
childtask_0: 4028.9
childtask_1: 4046.3
childtask_2: 4000.4
childtask_3: 4024.5
childtask_4: 4043.8
childtask_5: 4042.9
childtask_6: 4037
childtask_7: 4044
childtask_8: 4012.6
childtask_9: 4043.1
childtask_10: 4026.4
childtask_11: 4008
childtask_12: 4015.7
childtask_13: 4043.5
childtask_14: 4035.5
childtask_15: 4031.2
childtask_16: 4041.6
childtask_17: 4025.7
childtask_18: 4010
childtask_19: 4036
childtask_20: 4031.6
childtask_21: 4040.8
childtask_22: 4006.3
childtask_23: 4026.5
childtask_24: 4033.3
childtask_25: 4043.4
childtask_26: 4020.4
childtask_27: 4033.8
childtask_28: 4038.3
childtask_29: 4022
childtask_30: 4038.6
childtask_31: 4003.8
childtask_32: 4030
childtask_33: 4032.2
childtask_34: 4041.4
childtask_35: 4043.3
childtask_36: 4039.2
childtask_37: 4000.8
childtask_38: 4037.4
childtask_39: 4041.6
childtask_40: 4030.2
childtask_41: 4040.1
childtask_42: 4036.1
childtask_43: 4039.5
childtask_44: 4038.6
childtask_45: 4039.6
childtask_46: 4038.2
childtask_47: 4024.8
childtask_48: 4032.3
childtask_49: 4020
childtask_50: 4018.1
childtask_51: 4040.6
childtask_52: 4040.5
childtask_53: 4008.5
childtask_54: 4036.6
childtask_55: 4026.4
childtask_56: 4037.3
childtask_57: 4040
childtask_58: 4009
childtask_59: 4040.2
childtask_60: 4037.6
childtask_61: 4028.3
childtask_62: 4038.3
childtask_63: 4032
childtask_64: 4041.4
childtask_65: 4032.7
childtask_66: 4018.4
childtask_67: 4016.3
childtask_68: 4032.9
childtask_69: 4024.9
childtask_70: 4013.3
childtask_71: 4015.6
childtask_72: 4007.9
childtask_73: 4039.9
childtask_74: 4043.1
childtask_75: 4021
childtask_76: 4037.4
childtask_77: 4038.8
childtask_78: 4043
childtask_79: 4034.4
childtask_80: 4035.7
childtask_81: 4036.7
childtask_82: 4032.7
childtask_83: 4006
childtask_84: 4017.4
childtask_85: 4041.8
childtask_86: 4041.8
childtask_87: 4034.4
childtask_88: 4029.3
childtask_89: 4030.4
childtask_90: 4030
childtask_91: 4035
childtask_92: 4009.6
childtask_93: 4044.6
childtask_94: 4036.5
childtask_95: 4027.4
childtask_96: 4023.3
childtask_97: 4029.7
childtask_98: 4037.3
childtask_99: 4038.7
using FASTER.core;
using System.Threading.Tasks;
using System;
using System.Text;
using System.Linq;
using System.IO;
using System.Threading.Channels;
using System.Threading;
using System.Collections.Concurrent;
using StackExchange.Profiling;
class FasterThreadContext
{
public Status Status;
public long Value;
}
class FasterThreadFuncs : FunctionsBase<long, long, long, long, FasterThreadContext>
{
}
class FasterThreadChannel : IDisposable
{
public FasterThreadChannel(string filePath, int threadNum)
{
_Device = Devices.CreateLogDevice(filePath);
_CancellationTokenSource = new CancellationTokenSource();
var opts = new UnboundedChannelOptions()
{
SingleReader = false
};
_Queue = new BlockingCollection<(long, long, int, TaskCompletionSource<(long, bool)>)>();
_Faster = new FasterKV<long, long, long, long, FasterThreadContext, FasterThreadFuncs>(1 << 20, new FasterThreadFuncs(), new LogSettings() { LogDevice = _Device });
_Threads = Enumerable.Range(0, threadNum).Select(x =>
{
var th = new Thread(Worker);
th.Start();
return th;
}).ToArray();
_CompactionThread = new Thread(CompactionWorker);
_CompactionThread.Start();
}
public Task<(long, bool)> WriteAsync(long key, long value)
{
var tcs = new TaskCompletionSource<(long, bool)>(TaskCreationOptions.RunContinuationsAsynchronously);
if (!_Queue.TryAdd((key, value, 1, tcs)))
{
throw new Exception("failed to add queue");
}
return tcs.Task;
}
public Task<(long, bool)> ReadAsync(long key)
{
var tcs = new TaskCompletionSource<(long, bool)>(TaskCreationOptions.RunContinuationsAsynchronously);
if (!_Queue.TryAdd((key, 0, 0, tcs)))
{
throw new Exception("failed to add queue");
}
return tcs.Task;
}
int WriteCount = 0;
Thread[] _Threads;
Thread _CompactionThread;
BlockingCollection<(long key, long value, int type, TaskCompletionSource<(long, bool)> completion)> _Queue;
FasterKV<long, long, long, long, FasterThreadContext, FasterThreadFuncs> _Faster;
IDevice _Device;
CancellationToken _Cancellation => _CancellationTokenSource.Token;
CancellationTokenSource _CancellationTokenSource;
public void CompactionWorker()
{
_Faster.StartSession();
while(!_Cancellation.WaitHandle.WaitOne(1000))
{
_Faster.Log.Compact(_Faster.Log.TailAddress);
}
_Faster.StopSession();
}
public void Worker()
{
_Faster.StartSession();
try
{
var ctx = new FasterThreadContext();
while (!_Cancellation.IsCancellationRequested)
{
try
{
foreach (var queueElement in _Queue.GetConsumingEnumerable(_Cancellation))
{
long input = 0;
switch (queueElement.type)
{
case 0:
{
Console.WriteLine($"begin read");
long key = queueElement.key;
long value = queueElement.value;
var st = _Faster.Read(ref key, ref input, ref value, ctx, 0);
if (st == Status.PENDING)
{
if (!_Faster.CompletePending(true))
{
queueElement.completion.TrySetException(new Exception("waiting pending failed in read"));
}
else if (ctx.Status == Status.OK)
{
queueElement.completion.TrySetResult((ctx.Value, true));
}
else
{
queueElement.completion.TrySetResult((0, false));
}
}
else
{
queueElement.completion.TrySetResult((value, true));
}
}
break;
case 1:
{
long key = queueElement.key;
long value = queueElement.value;
var st = _Faster.Upsert(ref key, ref value, ctx, 0);
if (st == Status.PENDING)
{
if (!_Faster.CompletePending(true))
{
queueElement.completion.TrySetException(new Exception("waiting pending failed in upsert"));
}
else
{
queueElement.completion.TrySetResult((0, true));
}
}
else
{
queueElement.completion.TrySetResult((0, true));
}
// while (true)
// {
// var back = WriteCount;
// if (back > 100_000)
// {
// var old = Interlocked.CompareExchange(ref WriteCount, 0, back);
// if (old == back)
// {
// _Faster.Log.Compact(_Faster.Log.TailAddress);
// break;
// }
// }
// else
// {
// var old = Interlocked.CompareExchange(ref WriteCount, back + 1, back);
// if (old == back)
// {
// break;
// }
// }
// }
}
break;
default:
queueElement.completion.TrySetException(new Exception("unknown type value"));
break;
}
}
}
catch (OperationCanceledException)
{
break;
}
// try
// {
// var isSuccess = _Queue.Reader.WaitToReadAsync(_Cancellation).Result;
// if (!isSuccess)
// {
// break;
// }
// }
// catch
// {
// break;
// }
// while (_Queue.Reader.TryRead(out var queueElement))
// {
// long input = 0;
// switch (queueElement.type)
// {
// case 0:
// {
// Console.WriteLine($"begin read");
// var st = _Faster.Read(ref queueElement.key, ref input, ref queueElement.value, ctx, 0);
// if (st == Status.PENDING)
// {
// if (!_Faster.CompletePending(true))
// {
// queueElement.completion.TrySetException(new Exception("waiting pending failed in read"));
// }
// else if (ctx.Status == Status.OK)
// {
// queueElement.completion.TrySetResult((ctx.Value, true));
// }
// else
// {
// queueElement.completion.TrySetResult((0, false));
// }
// }
// else
// {
// queueElement.completion.TrySetResult((queueElement.value, true));
// }
// }
// break;
// case 1:
// {
// Console.WriteLine($"begin write:{queueElement}");
// var st = _Faster.Upsert(ref queueElement.key, ref queueElement.value, ctx, 0);
// if (st == Status.PENDING)
// {
// if (!_Faster.CompletePending(true))
// {
// queueElement.completion.TrySetException(new Exception("waiting pending failed in upsert"));
// }
// else
// {
// queueElement.completion.TrySetResult((0, true));
// }
// }
// else
// {
// queueElement.completion.TrySetResult((0, true));
// }
// while (true)
// {
// var back = WriteCount;
// if (back > 100_000)
// {
// var old = Interlocked.CompareExchange(ref WriteCount, 0, back);
// if (old == back)
// {
// _Faster.Log.Compact(_Faster.Log.TailAddress);
// break;
// }
// }
// else
// {
// var old = Interlocked.CompareExchange(ref WriteCount, back + 1, back);
// if (old == back)
// {
// break;
// }
// }
// }
// Console.WriteLine($"end write");
// }
// break;
// default:
// queueElement.completion.TrySetException(new Exception("unknown type value"));
// break;
// }
// }
}
}
finally
{
_Faster.StopSession();
}
}
#region IDisposable Support
private bool disposedValue = false; // 重複する呼び出しを検出するには
protected virtual void Dispose(bool disposing)
{
if (!disposedValue)
{
if (disposing)
{
_CancellationTokenSource.Cancel();
if (_Threads != null)
{
foreach (var th in _Threads)
{
th.Join();
}
_Threads = null;
}
_CompactionThread.Join();
if (_Queue != null)
{
_Queue.CompleteAdding();
foreach (var queueElement in _Queue)
{
queueElement.completion.TrySetCanceled();
}
_Queue.Dispose();
// _Queue.Writer.TryComplete();
// while (_Queue.Reader.TryRead(out var queueElement))
// {
// queueElement.completion.TrySetCanceled();
// }
// _Queue = null;
}
_CancellationTokenSource.Dispose();
_CancellationTokenSource = null;
_Faster.Dispose();
_Device.Close();
}
disposedValue = true;
}
}
public void Dispose()
{
Dispose(true);
}
#endregion
}
static class FasterThreadTest
{
static string GetTimingTreeString(Timing t, int depth)
{
var sb = new StringBuilder();
var indent = new string(' ', depth * 2);
if (depth > 100)
{
throw new Exception($"too deep({depth})");
}
if (t.HasChildren)
{
sb.AppendLine($"{indent}{t.Name}: {t.DurationMilliseconds},{t.DurationWithoutChildrenMilliseconds}");
}
else
{
sb.AppendLine($"{indent}{t.Name}: {t.DurationMilliseconds}");
}
if (t.HasChildren)
{
foreach (var child in t.Children)
{
sb.Append(GetTimingTreeString(child, depth + 1));
}
}
return sb.ToString();
}
public static async Task ChannelsTest(int threadNum = 10)
{
var profiler = MiniProfiler.StartNew($"channelstest_{threadNum}");
using (var faster = new FasterThreadChannel("faster.log", threadNum))
{
await Task.WhenAll(Enumerable.Range(0, 100).Select(async taskId =>
{
using (profiler.Step($"childtask_{taskId}"))
{
for (int i = 0; i < 100000; i++)
{
await faster.WriteAsync(i, i).ConfigureAwait(false);
}
}
})).ConfigureAwait(false);
profiler.Stop();
Console.WriteLine($"write done:{GetTimingTreeString(profiler.Root, 0)}");
}
}
}
faster_15_5_15: 15708.2,2.6
insert: 15138.1,11.6
upsert: 14288.1
checkpoint: 838.4
continue: 567.5,2.1
checkpoint: 565.4
faster_15_5_20: 15838.2,0.0
insert: 9488.8,0.1
upsert: 9196.3
checkpoint: 292.4
continue: 6349.4,0.0
checkpoint: 6349.4
faster_20_5_15: 77761,0.1
insert: 12324.2,0.0
upsert: 21.2
checkpoint: 12303
continue: 65436.7,0.0
checkpoint: 65436.7
faster_20_5_20: 36185.8,0.1
insert: 1679.9,0.1
upsert: 23.4
checkpoint: 1656.4
continue: 34505.8,0.0
checkpoint: 34505.8
faster_15_10_15: 1633.2,0.0
insert: 1359.9,0.1
upsert: 1326.2
checkpoint: 33.6
continue: 273.3,0.1
checkpoint: 273.2
faster_15_10_20: 453.4,0.0
insert: 251.4,0.1
upsert: 219.9
checkpoint: 31.4
continue: 202,0.1
checkpoint: 201.9
faster_20_10_15: 510.6,0.0
insert: 263.6,0.1
upsert: 47.3
checkpoint: 216.2
continue: 247,0
checkpoint: 247
faster_20_10_20: 1244.2,0.1
insert: 210.1,0.1
upsert: 26.3
checkpoint: 183.7
continue: 1034,0
checkpoint: 1034
faster_15_5_15: 197304.4,2.4
insert: 196000.7,7.3
upsert: 11877.2
compaction: 183248.9
checkpoint: 867.3
continue: 1301.3,2.0
checkpoint: 1299.3
faster_15_5_20: 34772.7,0.0
insert: 29521.5,0.2
upsert: 7793.9
compaction: 21493.3
checkpoint: 234.1
continue: 5251.2,0.1
checkpoint: 5251.1
faster_20_5_15: 254.2,0.1
insert: 85.8,0.1
upsert: 20
compaction: 39.9
checkpoint: 25.8
continue: 168.3,0.0
checkpoint: 168.3
faster_20_5_20: 390.1,0.1
insert: 119,0.2
upsert: 41.6
compaction: 46.8
checkpoint: 30.4
continue: 271,0
checkpoint: 271
faster_15_10_15: 1150.3,0.1
insert: 1095.2,0.2
upsert: 378.6
compaction: 661.2
checkpoint: 55.2
continue: 55,0
checkpoint: 55
faster_15_10_20: 3146.7,0.1
insert: 3097.5,0.2
upsert: 238.7
compaction: 2812.2
checkpoint: 46.4
continue: 49.1,0.0
checkpoint: 49.1
faster_20_10_15: 202.9,0.1
insert: 127.4,0.2
upsert: 27.6
compaction: 61.3
checkpoint: 38.3
continue: 75.4,0.0
checkpoint: 75.4
faster_20_10_20: 150.9,0.1
insert: 74.3,0.1
upsert: 21.9
compaction: 24.3
checkpoint: 28
continue: 76.5,0.0
checkpoint: 76.5
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment