Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using EventStore.ClientAPI;
using EventStore.ClientAPI.Embedded;
using EventStore.Core.Bus;
using EventStore.Core.Messages;
using HdrHistogram.NET;
using Newtonsoft.Json;
using Notis.Server.Dto;
using NUnit.Framework;
namespace Notis.Server.Tests
{
[TestFixture]
public class EventStoreWritePerfTestFuxture
{
[Test]
public async void ExecuteSerializedWrites()
{
var histogram = GetHistogram();
var events = GenerateEvents().ToList();
var messages = events.Count;
var bytesToWrite = events.Sum(evt => (long)(evt.EventData.Data.Length));
TimeSpan elapsed;
using (var conn = await Connect())
{
//Warm up.
foreach (var evt in events.Take(100))
{
await conn.AppendToStreamAsync(evt.StreamName, evt.ExpectedVersion, evt.EventData);
}
var timer = Stopwatch.StartNew();
foreach (var evt in events)
{
using (histogram.MeasureLatency())
{
await conn.AppendToStreamAsync(evt.StreamName, evt.ExpectedVersion, evt.EventData);
}
}
timer.Stop();
elapsed = timer.Elapsed;
}
var msgPerSec = messages / elapsed.TotalSeconds;
var megabitsWritten = ((bytesToWrite * 8) / (1024 * 1024));
var mbsPerSec = megabitsWritten / elapsed.TotalSeconds;
Console.WriteLine($"{messages:#,###} events written in {elapsed}");
Console.WriteLine($" {msgPerSec:#,###.00} msg/sec");
Console.WriteLine($" {mbsPerSec:#,###.00} mbps");
Console.WriteLine();
Console.WriteLine("--Latency measurements--");
var writer = new StringWriter();
histogram.outputPercentileDistribution(writer);
Console.WriteLine(writer.ToString());
}
private static IEnumerable<Message> GenerateEvents()
{
for (int i = 0; i < 100 * 1000; i++)
{
var payload = CreateStubCommand().ToBson();
var evt = new EventData(Guid.NewGuid(), "DealCreated", true, payload, null);
yield return new Message($"Deal-{evt.EventId}", ExpectedVersion.NoStream, evt);
}
}
private static CreateDealCommand CreateStubCommand()
{
return new CreateDealCommand
{
TradeType = TradeType.Physical,
PricingType = PricingType.Fixed,
TradeDate = DateTimeOffset.Now,
Product = new ReferenceEntity("123", "Soybean oil"),
Direction = TradeDirection.TraderBuysClientSells,
QuantityMeasurment = new QuantityMeasurment(10000, UnitOfMeasure.MetricTon),
CounterpartyInfo = new CounterPartyInfo
{
Counterparty = new ReferenceEntity("123", "CommodityPurchaser"),
MasterServicesAgreement = new ReferenceEntity("MSA-123", "Standard MSA")
},
Shipment = new TradeShipmentTerms
{
ShipmentFromDate = new DateTime(2015, 12, 15),
ShipmentToDate = new DateTime(2016, 01, 15),
Location = new ReferenceEntity("NL", "Netherlands"),
IncoTerms = new ReferenceEntity("FOB", "Free on board")
},
Tolerances = new TradeTolerances
{
MinTolerance = 0.01m,
MaxTolerance = 0.05m,
ToleranceMethod = ValueVariationMethod.Percentage,
Option = ToleranceOption.Buyer,
},
TraderPricing = new PricingParameters
{
PriceCurrency = "USD",
ContractPremium = new ValueAdjustment(2.0m, ValueVariationMethod.Percentage),
TradePricingOption = TraderPricingOption.FixedPrice
},
Payment = new PaymentDetails
{
PaymentCurrency = "USD",
PaymentTerms = new ReferenceEntity("CASH", "Cash")
}
};
}
private static Histogram GetHistogram()
{
return new Histogram((long)(1000000 * 60 * 30), 3); // 1 ns to 30 minutes
}
private static async Task<IEventStoreConnection> Connect()
{
var ipAddress = new IPAddress(new byte[] { 127, 0, 0, 1 });
var defaulEndpoint = new IPEndPoint(ipAddress, 1113);
StartEventStore();
var connectionSettings = ConnectionSettings.Create()
.KeepReconnecting()
.UseDebugLogger();
var conn = EventStoreConnection.Create(connectionSettings, defaulEndpoint);
await conn.ConnectAsync();
return conn;
}
private static void StartEventStore()
{
var timeout = TimeSpan.FromSeconds(3);
var clusterVNode = EmbeddedVNodeBuilder.AsSingleNode()
//.RunInMemory()
.RunProjections(ProjectionsMode.None)
//.EnableDevelopmentMode()
.OnDefaultEndpoints()
.Build();
var startedEvent = new ManualResetEventSlim(false);
clusterVNode.MainBus.Subscribe(new AdHocHandler<SystemMessage.SystemStart>(m => startedEvent.Set()));
clusterVNode.Start();
if (!startedEvent.Wait(timeout))
throw new TimeoutException($"EventStore haven't started in {timeout} seconds.");
}
private sealed class Message
{
public Message(string streamName, int expectedVersion, EventData eventData)
{
StreamName = streamName;
ExpectedVersion = expectedVersion;
EventData = eventData;
}
public string StreamName { get; }
public int ExpectedVersion { get; }
public EventData EventData { get; }
}
}
public static class HistogramExtensions
{
public static IDisposable MeasureLatency(this Histogram histogram)
{
var startTs = Stopwatch.GetTimestamp();
return new ActionDisposable(
() =>
{
histogram.recordValue(Stopwatch.GetTimestamp() - startTs);
});
}
private sealed class ActionDisposable : IDisposable
{
private readonly Action _action;
public ActionDisposable(Action action)
{
_action = action;
}
public void Dispose()
{
_action();
}
}
}
public static class SerializerEx
{
public static byte[] ToBson<T>(this T input)
{
var json = Serialize(input);
return Encoding.UTF8.GetBytes(json);
}
public static string Serialize<T>(this T sut)
{
var json = JsonConvert.SerializeObject(
sut,
Formatting.Indented,
new JsonSerializerSettings
{
NullValueHandling = NullValueHandling.Ignore,
DefaultValueHandling = DefaultValueHandling.Ignore,
});
return json;
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.