Skip to content

Instantly share code, notes, and snippets.

@denisivan0v
Last active August 23, 2020 09:42
Show Gist options
  • Save denisivan0v/b6f8f000f90f614003317b0b8b925e16 to your computer and use it in GitHub Desktop.
Save denisivan0v/b6f8f000f90f614003317b0b8b925e16 to your computer and use it in GitHub Desktop.
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace InteractiveProgressBar
{
internal static class Program
{
private const int FileCount = 763;
private const int ThreadsUsed = 4;
private static void Main()
{
Console.CursorVisible = false;
var cancellationTokenSource = new CancellationTokenSource();
using (var messagePump = new MessagePump<ExitMessage>())
{
messagePump.RegisterMessageHandler<KeyPressedMessage>(new KeyPressedMessageHandler(messagePump, cancellationTokenSource));
messagePump.RegisterMessageHandler<ReportProgressMessage>(new ReportProgressMessageHandler());
messagePump.RegisterMessageHandler<WorkDoneMessage>(new WorkDoneMessageHandler());
StartListener(messagePump, cancellationTokenSource.Token);
StartAnalyzer(messagePump, cancellationTokenSource.Token);
Console.WriteLine("Find usages analyzer started. Press Q to Exit.");
messagePump.DoPumping();
}
}
private static Task StartListener(MessagePump<ExitMessage> messagePump, CancellationToken cancellationToken)
{
return Task.Run(
() =>
{
while (!cancellationToken.IsCancellationRequested)
{
if (Console.KeyAvailable)
{
var keyInfo = Console.ReadKey(false);
messagePump.SendMessage(new KeyPressedMessage(keyInfo));
}
}
},
cancellationToken);
}
private static Task StartAnalyzer(MessagePump<ExitMessage> messagePump, CancellationToken cancellationToken)
{
return Task.Run(
async () =>
{
var analyzer = new Analyzer(messagePump, ThreadsUsed, FileCount);
await analyzer.FindUsagesAsync(cancellationToken);
},
cancellationToken);
}
private interface IMessage
{
}
private sealed class KeyPressedMessage : IMessage
{
public KeyPressedMessage(ConsoleKeyInfo keyInfo)
{
KeyInfo = keyInfo;
}
public ConsoleKeyInfo KeyInfo { get; }
}
private sealed class ReportProgressMessage : IMessage
{
public ReportProgressMessage(float value)
{
Value = value;
}
public float Value { get; }
}
private sealed class WorkDoneMessage : IMessage
{
public WorkDoneMessage(string message)
{
Message = message;
}
public string Message { get; }
}
private sealed class ExitMessage : IMessage
{
}
private interface IMessageHandler
{
void Handle(IMessage message);
}
private interface IMessageHandler<TMessage> : IMessageHandler where TMessage : IMessage
{
}
private sealed class KeyPressedMessageHandler : IMessageHandler<KeyPressedMessage>
{
private readonly MessagePump<ExitMessage> _messagePump;
private readonly CancellationTokenSource _cancellationTokenSource;
public KeyPressedMessageHandler(MessagePump<ExitMessage> messagePump, CancellationTokenSource cancellationTokenSource)
{
_messagePump = messagePump;
_cancellationTokenSource = cancellationTokenSource;
}
public void Handle(IMessage message)
{
var keyPressedEvent = (KeyPressedMessage) message;
var keyInfo = keyPressedEvent.KeyInfo;
if (keyInfo.Key == ConsoleKey.Q)
{
_cancellationTokenSource.Cancel();
_messagePump.SendMessage(new WorkDoneMessage("Work was interrupted."));
_messagePump.SendMessage(new ExitMessage());
}
}
}
private sealed class ReportProgressMessageHandler : IMessageHandler<ReportProgressMessage>
{
public void Handle(IMessage message)
{
var reportProgressMessage = (ReportProgressMessage)message;
var originalX = Console.CursorLeft;
var originalY = Console.CursorTop;
Console.Write($"{reportProgressMessage.Value:F}%");
Console.SetCursorPosition(originalX, originalY);
}
}
private sealed class WorkDoneMessageHandler : IMessageHandler<WorkDoneMessage>
{
public void Handle(IMessage message)
{
var workDoneMessage = (WorkDoneMessage) message;
Console.WriteLine();
Console.WriteLine(workDoneMessage.Message);
}
}
private sealed class MessagePump<TExit> : IDisposable
{
private readonly ConcurrentDictionary<Type, IMessageHandler> _eventHandlers = new ConcurrentDictionary<Type, IMessageHandler>();
private readonly TimeSpan _waitTimeout = TimeSpan.FromSeconds(10);
private readonly BlockingCollection<IMessage> _messageQueue = new BlockingCollection<IMessage>();
private bool _isStarted;
public void RegisterMessageHandler<TMessage>(IMessageHandler messageHandler) where TMessage : IMessage
{
if (!_eventHandlers.TryAdd(typeof(TMessage), messageHandler))
{
throw new ArgumentException("Provided event handler cannot be added.");
}
}
public void SendMessage(IMessage message)
{
_messageQueue.Add(message);
}
public void DoPumping()
{
if (_isStarted)
{
throw new InvalidOperationException("Event loop has been already started.");
}
_isStarted = true;
while (true)
{
if (!_messageQueue.TryTake(out var message, _waitTimeout))
{
continue;
}
if (message is TExit)
{
return;
}
if (_eventHandlers.TryGetValue(message.GetType(), out var eventHandler))
{
eventHandler.Handle(message);
}
}
}
public void Dispose()
{
_isStarted = false;
_messageQueue.Dispose();
}
}
private sealed class Analyzer
{
private readonly Random _random = new Random();
private readonly MessagePump<ExitMessage> _messagePump;
private readonly float _progressStep;
private readonly IList<IEnumerator<int>> _partitions;
private readonly object _syncRoot = new object();
private float _progress;
public Analyzer(MessagePump<ExitMessage> messagePump, int degreeOfParallelism, int fileCount)
{
_messagePump = messagePump;
_progressStep = 100f / fileCount;
_partitions = Partitioner.Create(Enumerable.Range(0, fileCount)).GetPartitions(degreeOfParallelism);
}
public async Task FindUsagesAsync(CancellationToken cancellationToken)
{
var tasks = _partitions.Select(
async partition =>
{
while (partition.MoveNext())
{
await FindUsagesInFileAsync(cancellationToken);
}
});
await Task.WhenAll(tasks);
_messagePump.SendMessage(new WorkDoneMessage("Find usages done."));
_messagePump.SendMessage(new ExitMessage());
}
private async Task FindUsagesInFileAsync(CancellationToken cancellationToken)
{
var usagesInFile = _random.Next(1, 10);
for (var usagesIndex = 0; usagesIndex < usagesInFile && !cancellationToken.IsCancellationRequested; usagesIndex++)
{
await Task.Delay(_random.Next(100, 1000) / usagesInFile, cancellationToken);
lock (_syncRoot)
{
_progress += _progressStep / usagesInFile;
_messagePump.SendMessage(new ReportProgressMessage(_progress));
}
}
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment