Last active
August 29, 2015 14:18
-
-
Save kolesnick/fcd83f241cd0bfc843d1 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Linq; | |
using System.Net; | |
using System.Text; | |
using System.Threading; | |
using System.Threading.Tasks; | |
using EventStore.ClientAPI; | |
using EventStore.ClientAPI.SystemData; | |
using Nito.AsyncEx; | |
using Nito.AsyncEx.Synchronous; | |
namespace EventStoreScanner | |
{ | |
internal class Program | |
{ | |
/// <summary>
/// Running tally of scanned events, split by stream-id prefix ("$", "WB", everything else),
/// plus throughput figures derived from the time elapsed since <see cref="StartTime"/>.
/// </summary>
private class Status
{
    /// <summary>Creates an empty tally and starts the clock.</summary>
    public Status()
    {
        // UTC keeps the elapsed time correct even if local time shifts (e.g. a DST change) mid-scan.
        this.StartTime = DateTime.UtcNow;
    }

    /// <summary>UTC instant from which elapsed time and per-minute rates are measured.</summary>
    public DateTime StartTime { get; set; }

    /// <summary>Number of events passed to <see cref="Update"/> so far.</summary>
    public int TotalEvents { get; private set; }

    /// <summary>Events whose stream id starts with "WB".</summary>
    public int WBEvents { get; set; }

    /// <summary>Events whose stream id starts with "$".</summary>
    public int SystemEvents { get; set; }

    /// <summary>Events that fall into neither of the other two categories.</summary>
    public int OtherEvents { get; set; }

    /// <summary>Counts one event and attributes it to a category by its stream id prefix.</summary>
    /// <param name="event">The event that was just read.</param>
    public void Update(ResolvedEvent @event)
    {
        this.TotalEvents++;

        // Stream ids are identifiers, not linguistic text: compare ordinally so the
        // classification does not depend on the current culture.
        string streamId = @event.Event.EventStreamId;
        if (streamId.StartsWith("$", StringComparison.Ordinal))
        {
            this.SystemEvents++;
        }
        else if (streamId.StartsWith("WB", StringComparison.Ordinal))
        {
            this.WBEvents++;
        }
        else
        {
            this.OtherEvents++;
        }
    }

    /// <summary>Formats the counters, category shares, per-minute rates and elapsed time on one line.</summary>
    public override string ToString()
    {
        // Take a single clock reading so both rates and the printed time agree with each other.
        TimeSpan elapsed = this.TotalTime();

        // Minutes are printed as a running total (not the 0-59 component), so scans
        // longer than an hour no longer lose their hours.
        return string.Format(
            "events: {0:N0},\t WB: {1:N0} ({2:P1}),\t $: {3:N0} ({4:P1}),\t other: {5},\t total speed: {6:N0}epm,\t WB speed: {7:N0}epm,\t time: {8}m {9}s",
            this.TotalEvents, this.WBEvents, this.WBEventsPart(), this.SystemEvents, this.SystemEventsPart(), this.OtherEvents,
            PerMinute(this.TotalEvents, elapsed), PerMinute(this.WBEvents, elapsed),
            (long) Math.Floor(elapsed.TotalMinutes), elapsed.Seconds);
    }

    /// <summary>Share of "WB" events among all events, in the range 0..1.</summary>
    private double WBEventsPart()
    {
        return this.Fraction(this.WBEvents);
    }

    /// <summary>Share of "$" events among all events, in the range 0..1.</summary>
    private double SystemEventsPart()
    {
        return this.Fraction(this.SystemEvents);
    }

    /// <summary>Returns count / TotalEvents, or 0 when nothing has been counted yet (instead of NaN).</summary>
    private double Fraction(int count)
    {
        return this.TotalEvents == 0 ? 0d : (double) count / this.TotalEvents;
    }

    /// <summary>
    /// Returns the whole number of events per minute for the given elapsed time.
    /// Yields 0 when no time has elapsed and saturates at <see cref="int.MaxValue"/>,
    /// so the double-to-int cast is always well defined.
    /// </summary>
    private static int PerMinute(int count, TimeSpan elapsed)
    {
        double minutes = elapsed.TotalMinutes;
        if (minutes <= 0d)
        {
            return 0;
        }

        return (int) Math.Min(count / minutes, int.MaxValue);
    }

    /// <summary>Time elapsed since <see cref="StartTime"/>; never negative.</summary>
    private TimeSpan TotalTime()
    {
        TimeSpan elapsed = DateTime.UtcNow - this.StartTime;

        // A clock adjustment (or a StartTime set in the future) must not produce negative output.
        return elapsed < TimeSpan.Zero ? TimeSpan.Zero : elapsed;
    }
}
/// <summary>Time limit of 30 seconds shared by the blocking Event Store calls in this class.</summary>
private static readonly TimeSpan DefaultTimeout = new TimeSpan(hours: 0, minutes: 0, seconds: 30);
/// <summary>
/// Reads the whole $all stream forward from the start in slices of 1024 events,
/// feeds every event into a <see cref="Status"/> tally, and prints the tally
/// after every 100000 events and once more when the end of the stream is reached.
/// </summary>
/// <param name="args">Command-line arguments; not used.</param>
static void Main(string[] args)
{
    using (IEventStoreConnection connection = OpenConnection())
    {
        var readFrom = Position.Start;
        var progress = new Status();

        while (true)
        {
            AllEventsSlice batch = RunWithDefaultTimeout(
                connection.ReadAllEventsForwardAsync(readFrom, 1024, resolveLinkTos: false));

            // Remember where the next read has to continue.
            readFrom = batch.NextPosition;

            foreach (var resolved in batch.Events)
            {
                progress.Update(resolved);

                bool atReportingInterval = progress.TotalEvents % 100000 == 0;
                if (atReportingInterval)
                {
                    Console.WriteLine(progress);
                }
            }

            if (batch.IsEndOfStream)
            {
                break;
            }
        }

        // Final summary once everything has been read.
        Console.WriteLine(progress);
    }
}
/// <summary>
/// Creates a connection to the Event Store node at 127.0.0.1:1117 and blocks until
/// it is connected or <see cref="DefaultTimeout"/> elapses.
/// </summary>
/// <returns>A connected <see cref="IEventStoreConnection"/>; the caller owns it and must dispose it.</returns>
/// <exception cref="OperationCanceledException">The connection was not established within the timeout.</exception>
private static IEventStoreConnection OpenConnection()
{
    // NOTE(review): credentials and endpoint are hard-coded; move them to configuration
    // before pointing this tool at anything other than a local node.
    var settings = ConnectionSettings
        .Create()
        .KeepReconnecting()
        .SetDefaultUserCredentials(new UserCredentials("admin", "changeit"));
    var serverIp = IPAddress.Parse("127.0.0.1");
    var tcpEndPoint = new IPEndPoint(serverIp, 1117);
    var eventStoreConnection = EventStoreConnection.Create(settings, tcpEndPoint);

    try
    {
        using (var cancellationTokenSource = new CancellationTokenSource())
        {
            cancellationTokenSource.CancelAfter(DefaultTimeout);
            eventStoreConnection.ConnectAsync().WaitAndUnwrapException(cancellationTokenSource.Token);
        }
    }
    catch
    {
        // The caller never receives the connection on failure, so release it here
        // instead of leaking it, then let the original exception propagate.
        eventStoreConnection.Dispose();
        throw;
    }

    return eventStoreConnection;
}
/// <summary>
/// Blocks the calling thread until <paramref name="readTask"/> completes or
/// <see cref="DefaultTimeout"/> elapses, whichever happens first.
/// </summary>
/// <typeparam name="TResult">Result type of the awaited operation.</typeparam>
/// <param name="readTask">The already started Event Store operation to wait for.</param>
/// <returns>The result of <paramref name="readTask"/>.</returns>
/// <exception cref="TimeoutException">The operation did not complete within the timeout.</exception>
/// <remarks>
/// If <paramref name="readTask"/> fails, its original exception is rethrown
/// (not wrapped in an <see cref="AggregateException"/>), matching how the
/// connection attempt in this class reports failures.
/// </remarks>
private static TResult RunWithDefaultTimeout<TResult>(Task<TResult> readTask)
{
    using (var timeoutTokenSource = new CancellationTokenSource())
    {
        timeoutTokenSource.CancelAfter(DefaultTimeout);
        Task timeoutTask = timeoutTokenSource.Token.AsTask();
        AsyncContext.Run(() => Task.WhenAny(timeoutTask, readTask));

        // Decide on the read task's own state rather than on which task WhenAny returned:
        // if both have finished, a result that is available must win over the timeout.
        if (!readTask.IsCompleted)
        {
            throw new TimeoutException(string.Format("Failed to perform Event Store operation using timeout {0}", DefaultTimeout));
        }

        // The task is complete, so this does not block; unlike .Result it rethrows
        // the task's own exception without the AggregateException wrapper.
        return readTask.GetAwaiter().GetResult();
    }
}
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?xml version="1.0" encoding="utf-8"?>
<packages>
  <package id="EventStore.Client" version="3.0.2" targetFramework="net45" />
  <package id="Nito.AsyncEx" version="3.0.0" targetFramework="net45" />
</packages>
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment