Skip to content

Instantly share code, notes, and snippets.

@LeeCampbell LeeCampbell/IRunnable.cs
Last active Jul 29, 2016

Embed
What would you like to do?
SerialDisposable Perf Tests
namespace RxPerformanceTest.SerialDisposable.Console
{
interface IRunnable
{
ThroughputTestResult[] Run();
}
}
<?xml version="1.0" encoding="utf-8"?>
<packages>
<package id="Rx-Core" version="2.3.0-beta2" targetFramework="net46" />
<package id="Rx-Interfaces" version="2.3.0-beta2" targetFramework="net46" />
</packages>
using System;
using System.Management;
namespace RxPerformanceTest.SerialDisposable.Console
{
class Program
{
static void Main(string[] args)
{
ThroughputTester.Run();
}
public static void Clean()
{
GC.Collect(GC.MaxGeneration, GCCollectionMode.Forced, true);
GC.WaitForPendingFinalizers();
GC.Collect(GC.MaxGeneration, GCCollectionMode.Forced, true);
}
//From http://stackoverflow.com/questions/340359/how-can-i-get-the-cpu-information-in-net
public static string GetProcessorName()
{
using (ManagementObjectSearcher win32Proc = new ManagementObjectSearcher("select * from Win32_Processor"),
win32CompSys = new ManagementObjectSearcher("select * from Win32_ComputerSystem"),
win32Memory = new ManagementObjectSearcher("select * from Win32_PhysicalMemory"))
{
foreach (ManagementObject obj in win32Proc.Get())
{
var procName = obj["Name"].ToString().ToUpper();
return procName.Replace("INTEL", string.Empty)
.Replace("CORE", string.Empty)
.Replace("CPU", string.Empty)
//.Replace("@", string.Empty)
.Replace(" ", string.Empty)
.Replace("(R)", string.Empty)
.Replace("(TM)", string.Empty);
}
throw new InvalidOperationException();
}
}
}
}
using System;
using System.Reactive.Disposables;
using System.Threading;
namespace RxPerformanceTest.SerialDisposable.Console
{
public sealed class SerialDisposableImplSwap : ICancelable
{
private ISerialCancelable _current = new ActiveSerialCancelable();
public SerialDisposableImplSwap()
{ }
public bool IsDisposed => _current.IsDisposed;
/// <summary>
/// Gets or sets the underlying disposable.
/// </summary>
/// <remarks>If the SerialDisposable has already been disposed, assignment to this property causes immediate disposal of the given disposable object. Assigning this property disposes the previous disposable object.</remarks>
public IDisposable Disposable
{
get { return _current.Disposable; }
set { _current.Disposable = value; }
}
/// <summary>
/// Disposes the underlying disposable as well as all future replacements.
/// </summary>
public void Dispose()
{
var old = Interlocked.Exchange(ref _current, DisposedSerialCancelable.Instance);
old.Dispose();
}
private interface ISerialCancelable : ICancelable
{
IDisposable Disposable { get; set; }
}
private sealed class DisposedSerialCancelable : ISerialCancelable
{
public static readonly DisposedSerialCancelable Instance = new DisposedSerialCancelable();
private DisposedSerialCancelable()
{ }
public bool IsDisposed => true;
public IDisposable Disposable
{
get { return null; }
set { value?.Dispose(); }
}
public void Dispose()
{ }
}
private sealed class ActiveSerialCancelable : ISerialCancelable
{
private IDisposable _disposable;
public bool IsDisposed => false;
public IDisposable Disposable
{
get { return _disposable; }
set
{
var old = Interlocked.Exchange(ref _disposable, value);
old?.Dispose();
}
}
public void Dispose()
{
_disposable?.Dispose();
}
}
}
}
using System;
using System.Reactive.Disposables;
namespace RxPerformanceTest.SerialDisposable.Console
{
//Used just as a proof to show that the naive implementation is fast but not thread safe -LC
// Results will show an invalid operations per second.
public sealed class SerialDisposableUnsafe : ICancelable
{
private IDisposable _current;
public SerialDisposableUnsafe()
{
}
public bool IsDisposed { get; private set; }
/// <summary>
/// Gets or sets the underlying disposable.
/// </summary>
/// <remarks>If the SerialDisposable has already been disposed, assignment to this property causes immediate disposal of the given disposable object. Assigning this property disposes the previous disposable object.</remarks>
public IDisposable Disposable
{
get
{
return _current;
}
set
{
if (IsDisposed)
{
value?.Dispose();
}
else
{
var previous = _current;
_current = value;
previous?.Dispose();
}
}
}
/// <summary>
/// Disposes the underlying disposable as well as all future replacements.
/// </summary>
public void Dispose()
{
IsDisposed = true;
_current?.Dispose();
}
}
}
using System;
using System.Reactive.Disposables;
namespace RxPerformanceTest.SerialDisposable.Console
{
//Like the Thread unsafe version, this shows the just using the volatile keyword will not solve any real problem. -LC
public sealed class SerialDisposableVolatile : ICancelable
{
private volatile IDisposable _current;
public SerialDisposableVolatile()
{
}
public bool IsDisposed { get; private set; }
/// <summary>
/// Gets or sets the underlying disposable.
/// </summary>
/// <remarks>If the SerialDisposable has already been disposed, assignment to this property causes immediate disposal of the given disposable object. Assigning this property disposes the previous disposable object.</remarks>
public IDisposable Disposable
{
get
{
return _current;
}
set
{
if (IsDisposed)
{
value?.Dispose();
}
else
{
var previous = _current;
_current = value;
previous?.Dispose();
}
}
}
/// <summary>
/// Disposes the underlying disposable as well as all future replacements.
/// </summary>
public void Dispose()
{
IsDisposed = true;
_current?.Dispose();
}
}
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Disposables;
using System.Threading.Tasks;
namespace RxPerformanceTest.SerialDisposable.Console
{
internal class SerialThroughputTest<T> : IRunnable
where T : ICancelable
{
private const int RunSize = 10 * 1000 * 1000;
private readonly Func<T> _serialDisposableFactory;
private readonly Action<T, IDisposable> _assign;
public SerialThroughputTest(Func<T> serialDisposableFactory, Action<T, IDisposable> assign)
{
_serialDisposableFactory = serialDisposableFactory;
_assign = assign;
}
public ThroughputTestResult[] Run()
{
return ExecuteTests().ToArray();
}
private IEnumerable<ThroughputTestResult> ExecuteTests()
{
yield return RunSynchronously();
int maxParallelism = 2;
do
{
yield return RunConcurrently(maxParallelism++);
} while (maxParallelism <= Environment.ProcessorCount + 1);
}
private ThroughputTestResult RunSynchronously()
{
var messages = CreateMessages();
var sut = _serialDisposableFactory();
Program.Clean();
var result = new ThroughputTestResult(1, RunSize);
foreach (var item in messages)
{
_assign(sut, item);
}
sut.Dispose();
result.Dispose();
System.Console.WriteLine($"RunSynchronously Elapsed {result.Elapsed.TotalSeconds}sec");
if (messages.Any(b => !b.IsDisposed))
{
System.Console.WriteLine($"{sut.GetType().Name} operated incorrectly. There are still {messages.Count(b => !b.IsDisposed)} objects not disposed.");
return ThroughputTestResult.InvalidResult(1, RunSize);
}
return result;
}
private ThroughputTestResult RunConcurrently(int threads)
{
var messages = CreateMessages();
var sut = _serialDisposableFactory();
Program.Clean();
var result = new ThroughputTestResult(threads, RunSize);
Parallel.ForEach(
messages,
new ParallelOptions { MaxDegreeOfParallelism = threads },
(item, state, idx) => _assign(sut, item));
sut.Dispose();
result.Dispose();
System.Console.WriteLine($"RunConcurrently({threads}) Elapsed {result.Elapsed.TotalSeconds}sec");
if (messages.Any(b => !b.IsDisposed))
{
System.Console.WriteLine($"{sut.GetType().Name} operated incorrectly. There are still {messages.Count(b => !b.IsDisposed)} objects not disposed.");
return ThroughputTestResult.InvalidResult(threads, RunSize);
}
return result;
}
private static BooleanDisposable[] CreateMessages()
{
var messages = new BooleanDisposable[RunSize];
for (int i = 0; i < RunSize; i++)
{
messages[i] = new BooleanDisposable();
}
return messages;
}
}
}
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
namespace RxPerformanceTest.SerialDisposable.Console
{
public static class ThroughputTester
{
private static readonly Dictionary<string, IRunnable> TestCandidates = new Dictionary<string, IRunnable>
{
{"SerialDisposable", new SerialThroughputTest<System.Reactive.Disposables.SerialDisposable>(()=>new System.Reactive.Disposables.SerialDisposable(), (sut,other)=>{sut.Disposable = other;})},
{"SerialDisposableUnsafe", new SerialThroughputTest<SerialDisposableUnsafe>(()=>new SerialDisposableUnsafe(), (sut,other)=>{sut.Disposable = other;})},
{"SerialDisposableVolatile", new SerialThroughputTest<SerialDisposableVolatile>(()=>new SerialDisposableVolatile(), (sut,other)=>{sut.Disposable = other;})},
{"SerialDisposableImplSwap", new SerialThroughputTest<SerialDisposableImplSwap>(()=>new SerialDisposableImplSwap(), (sut,other)=>{sut.Disposable = other;})},
};
public static void Run()
{
var outputFileName = GenerateFileName();
File.WriteAllText(outputFileName, "Starting test...");
System.Console.WriteLine("Priming...");
var normalColor = System.Console.ForegroundColor;
System.Console.ForegroundColor = ConsoleColor.DarkGray;
foreach (var testCandidate in TestCandidates)
{
System.Console.WriteLine($"Starting prime run for '{testCandidate.Key}' @ {DateTime.Now:o}.");
var t = testCandidate.Value.Run();
var runs = t.ToArray();
System.Console.WriteLine($"Completed prime run for '{testCandidate.Key}' @ {DateTime.Now:o} ({runs.Length} tests).");
}
System.Console.WriteLine("Priming complete.");
System.Console.WriteLine();
System.Console.ForegroundColor = normalColor;
System.Console.WriteLine();
var results = new Dictionary<string, IEnumerable<ThroughputTestResult>>();
foreach (var testCandidate in TestCandidates)
{
System.Console.WriteLine($"Starting main run for '{testCandidate.Key}' @ {DateTime.Now:o}.");
var result = testCandidate.Value.Run();
results[testCandidate.Key] = result.ToArray();
System.Console.WriteLine($"Completed main run for '{testCandidate.Key}' @ {DateTime.Now:o}.");
}
var colHeaders = results.First().Value.Select(tr => tr.Concurrency.ToString()).ToArray();
var rowHeaders = results.OrderByDescending(r => r.Value.Max(x => x.Elapsed)).Select(r => r.Key).ToArray();
var output = ResultsToFixedWdith(
"Concurrency", colHeaders,
"Type", rowHeaders,
(col, row) =>
{
var key = rowHeaders[row];
var vertex = results[key].OrderBy(tr => tr.Concurrency).Skip(col).First();
var opsPerSec = vertex.Messages / vertex.Elapsed.TotalSeconds;
return opsPerSec.ToString("N0");
});
System.Console.WriteLine(output);
System.Console.WriteLine();
File.WriteAllText(outputFileName, output);
System.Console.WriteLine("Results saved to {0}", outputFileName);
System.Console.WriteLine("Test run complete. Press any key to exit.");
}
private static string GenerateFileName()
{
var outputDir = "Results";
var processorName = Program.GetProcessorName();
var fileName = $"SerialDisposableThroughput_on_{processorName}_at_{DateTime.Now:yyyyMMddThhmm}.txt";
if (!Directory.Exists(outputDir))
Directory.CreateDirectory(outputDir);
return Path.Combine(outputDir, fileName);
}
private static string ResultsToFixedWdith(string columnLabel, string[] columnHeaders, string rowLabel, string[] rowHeaders, Func<int, int, string> valueSelector)
{
var maxValueLength = columnHeaders.Max(h => h.Length);
var values = new string[columnHeaders.Length, rowHeaders.Length];
for (int y = 0; y < rowHeaders.Length; y++)
{
for (int x = 0; x < columnHeaders.Length; x++)
{
var value = valueSelector(x, y);
values[x, y] = value;
if (value.Length > maxValueLength) maxValueLength = value.Length;
}
}
var colWidth = maxValueLength + 1;
var labelWidth = rowHeaders.Concat(new[] { rowLabel }).Max(h => h.Length) + 1;
var sb = new StringBuilder();
sb.Append("".PadRight(labelWidth));
sb.Append(columnLabel);
sb.AppendLine();
sb.Append(rowLabel.PadLeft(labelWidth));
foreach (string header in columnHeaders)
{
sb.Append(header.PadLeft(colWidth));
}
sb.AppendLine();
for (int y = 0; y < rowHeaders.Length; y++)
{
sb.Append(rowHeaders[y].PadLeft(labelWidth));
for (int x = 0; x < columnHeaders.Length; x++)
{
sb.Append(valueSelector(x, y).PadLeft(colWidth));
}
sb.AppendLine();
}
return sb.ToString();
}
}
}
using System;
using System.Diagnostics;
namespace RxPerformanceTest.SerialDisposable.Console
{
internal sealed class ThroughputTestResult : IDisposable
{
private readonly Stopwatch _stopwatch = Stopwatch.StartNew();
private readonly int _preRunGen0AllocationCount;
public ThroughputTestResult(int concurrency, int messages)
{
Concurrency = concurrency;
Messages = messages;
_preRunGen0AllocationCount = GC.CollectionCount(0);
}
public int Concurrency { get; private set; }
public int Messages { get; private set; }
public int Gen0Collections { get; private set; }
public TimeSpan Elapsed { get; private set; }
public void Dispose()
{
_stopwatch.Stop();
Gen0Collections = GC.CollectionCount(0) - _preRunGen0AllocationCount;
Elapsed = _stopwatch.Elapsed;
}
public static ThroughputTestResult InvalidResult(int concurrency, int messages)
{
var result = new ThroughputTestResult(concurrency, messages);
result.Dispose();
result.Elapsed = TimeSpan.MaxValue;
return result;
}
}
}
@LeeCampbell

This comment has been minimized.

Copy link
Owner Author

LeeCampbell commented Jul 29, 2016

SerialDisposableThroughput_on_I7-4702HQ@2.20GHZ_at_20160728T1232

i7 Laptop Dell XPS

                         Concurrency
                     Type           1           2           3           4           5           6           7           8           9
   SerialDisposableUnsafe 118,306,233           0           0           0           0           0           0           0           0
 SerialDisposableVolatile 117,524,004           0           0           0           0           0           0           0           0
         SerialDisposable  28,617,871  23,034,421  22,725,852  21,513,149  20,873,898  19,103,423  17,144,292  13,372,363  13,324,482
 SerialDisposableImplSwap  54,099,439  15,125,177  24,327,728  25,465,406  27,842,628  28,974,967  25,071,548  31,648,604  30,005,953
@LeeCampbell

This comment has been minimized.

Copy link
Owner Author

LeeCampbell commented Jul 29, 2016

SerialDisposableThroughput_on_I5-3317U@1.70GHZ_at_20160728T1258

Surface Pro (1) i5

                         Concurrency
                     Type          1          2          3          4          5
   SerialDisposableUnsafe 84,287,823          0          0          0          0
 SerialDisposableVolatile 85,771,385          0          0          0          0
         SerialDisposable 24,225,358 18,297,395 12,991,152  7,997,373  9,938,589
 SerialDisposableImplSwap 46,916,991 21,845,518 25,883,421 22,706,543 27,929,146
@LeeCampbell

This comment has been minimized.

Copy link
Owner Author

LeeCampbell commented Jul 29, 2016

SerialDisposableThroughput_on_ATOMZ2760@1.80GHZ_at_20160729T0908

ACER Iconia W3

                         Concurrency
                     Type          1         2         3         4         5
   SerialDisposableUnsafe 22,193,716         0         0         0         0
 SerialDisposableVolatile 23,074,481         0         0         0         0
         SerialDisposable  6,061,985 3,320,238 2,896,546 2,788,100 2,865,825
 SerialDisposableImplSwap 13,569,915 7,660,569 6,192,081 7,651,061 7,369,214
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.