-
-
Save afish/5fc1baed950ed0b9531ad7483755144b to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
namespace AsyncPriorities | |
{ | |
/// <summary>
/// Simulates a queue consumer: messages are received one at a time, and for each
/// message two concurrent activities are started - processing (alternating CPU-bound
/// and IO-bound work) and lease keeping (periodically refreshing the message lease).
/// A background thread periodically prints the aggregated statistics.
/// </summary>
public class MessageProcessor
{
    // Serializes every read-modify-write of the CurrentStats counters. The counters
    // are bumped from many concurrently running continuations, and "x++" on a
    // property is not atomic, so unsynchronized increments could be lost.
    private static readonly object StatsLock = new object();

    /// <summary>
    /// Counters of the running simulation. Initialized eagerly so that the public
    /// methods do not fail with a null reference when called before <see cref="Simulate"/>.
    /// </summary>
    public static Stats CurrentStats = new Stats();

    /// <summary>
    /// Parameters of the running simulation; replaced with the hard-coded scenario
    /// when <see cref="Simulate"/> starts.
    /// </summary>
    public static Settings CurrentSettings = new Settings();

    /// <summary>
    /// Resets stats and settings, starts the monitoring thread and runs the receive loop.
    /// The receive loop never ends, so the returned task does not complete.
    /// </summary>
    public static async Task Simulate()
    {
        Initialize();
        StartMonitoringThread();
        await RunLoop();
    }

    /// <summary>Installs fresh statistics and the hard-coded simulation scenario.</summary>
    private static void Initialize()
    {
        CurrentStats = new Stats();
        CurrentSettings = new Settings
        {
            ReceivingDuration = TimeSpan.FromMilliseconds(250),
            RefreshDelay = TimeSpan.FromSeconds(2),
            RefreshDuration = TimeSpan.FromMilliseconds(250),
            MessageLeaseTimeout = TimeSpan.FromSeconds(5),
            ProcessingCPUDuration = TimeSpan.FromMilliseconds(100),
            ProcessingIODuration = TimeSpan.FromMilliseconds(1000),
            ProcessingLoops = 20,
            MessagesCount = 100
        };
    }

    /// <summary>
    /// Polls for messages forever. Each received message gets its lease keeper and
    /// its processing started concurrently; the loop does not wait for either.
    /// </summary>
    private static async Task RunLoop()
    {
        while (true)
        {
            var message = await ReceiveMessage();
            if (message is null)
            {
                // Nothing to receive right now; keep polling.
                continue;
            }

            // Deliberate fire-and-forget: both activities must run concurrently with
            // the receive loop. The discards make that intent explicit (CS4014).
            _ = KeepLease(message);
            _ = ProcessMessage(message);
        }
    }

    /// <summary>
    /// Simulates receiving one message: waits <c>ReceivingDuration</c> and then hands
    /// out a new message while <c>MessagesCount</c> is positive.
    /// </summary>
    /// <returns>The new message, or <c>null</c> when no messages are left.</returns>
    public static async Task<Message?> ReceiveMessage()
    {
        await Task.Yield();
        await Task.Delay(CurrentSettings.ReceivingDuration);

        // Check before decrementing so that polling an exhausted source does not
        // drive the remaining-messages counter below zero.
        if (CurrentSettings.MessagesCount <= 0)
        {
            return null;
        }

        CurrentSettings.MessagesCount--;

        int id;
        lock (StatsLock)
        {
            id = ++CurrentStats.GeneratedMessages;
        }

        DateTime receivedAt = DateTime.Now;
        var message = new Message
        {
            LastRefreshTime = receivedAt,
            WasLost = false,
            Id = id,
            ReceiveTime = receivedAt
        };
        Log($"New message received with id {message.Id}");
        return message;
    }

    /// <summary>
    /// Simulates processing of a message: up to <c>ProcessingLoops</c> rounds, each a
    /// blocking CPU-bound part followed by an awaited IO-bound part. Stops early once
    /// the message is marked as lost, and always marks the message as finished.
    /// </summary>
    /// <param name="message">The message to process.</param>
    public static async Task ProcessMessage(Message message)
    {
        await Task.Yield();

        for (int part = 0; part < CurrentSettings.ProcessingLoops && !message.WasLost; ++part)
        {
            // Intentionally blocks the current thread: this stands in for CPU-bound work.
            Thread.Sleep(CurrentSettings.ProcessingCPUDuration);
            // IO-bound part: releases the thread while waiting.
            await Task.Delay(CurrentSettings.ProcessingIODuration);
        }

        message.WasFinished = true;
        if (!message.WasLost)
        {
            Log($"Finished message with id {message.Id} in {DateTime.Now - message.ReceiveTime}");
        }
    }

    /// <summary>
    /// Keeps the lease of a message alive until its processing finishes. After every
    /// <c>RefreshDelay</c> the lease is refreshed (taking <c>RefreshDuration</c>), unless
    /// more than <c>MessageLeaseTimeout</c> has passed since the last refresh - then
    /// the message is marked as lost and counted as such.
    /// </summary>
    /// <param name="message">The message whose lease is maintained.</param>
    public static async Task KeepLease(Message message)
    {
        await Task.Yield();

        // WasFinished is written by ProcessMessage from a different flow of execution;
        // reading it here without synchronization is unsafe according to the memory model.
        // NOTE(review): a message that finishes during the delay below may still be
        // reported as lost if the continuation resumes after the lease timeout - confirm
        // whether that is the intended accounting.
        while (!message.WasFinished)
        {
            await Task.Delay(CurrentSettings.RefreshDelay);

            if (DateTime.Now > message.LastRefreshTime + CurrentSettings.MessageLeaseTimeout)
            {
                message.WasLost = true;
                lock (StatsLock)
                {
                    CurrentStats.Lost++;
                }
                Log($"Lost lease for message {message.Id}");
                return;
            }

            await Task.Delay(CurrentSettings.RefreshDuration);
            Log($"Refreshed lease for message {message.Id}");
            message.LastRefreshTime = DateTime.Now;
        }

        lock (StatsLock)
        {
            CurrentStats.ProcessedSuccessfully++;
        }
    }

    /// <summary>
    /// Starts a background thread that logs the statistics every three seconds.
    /// </summary>
    private static void StartMonitoringThread()
    {
        Thread monitoringThread = new Thread(() =>
        {
            while (true)
            {
                Thread.Sleep(TimeSpan.FromSeconds(3));

                // Take one consistent snapshot so the derived "still running" figure
                // is computed from counters read at the same moment.
                int generated;
                int succeeded;
                int lost;
                lock (StatsLock)
                {
                    generated = CurrentStats.GeneratedMessages;
                    succeeded = CurrentStats.ProcessedSuccessfully;
                    lost = CurrentStats.Lost;
                }

                Log($"Received messages {generated}, " +
                    $"success {succeeded}, " +
                    $"failed {lost}, " +
                    $"still running {generated - succeeded - lost}");
            }
        });
        // Background thread: it must not keep the process alive on its own.
        monitoringThread.IsBackground = true;
        monitoringThread.Start();
    }

    /// <summary>
    /// Writes a line to the console prefixed with the current time and the managed
    /// id of the calling thread.
    /// </summary>
    /// <param name="message">Text to log.</param>
    public static void Log(string message)
    {
        Console.WriteLine($"{DateTime.Now}\t{Thread.CurrentThread.ManagedThreadId}\t{message}");
    }
}
/// <summary>
/// A single simulated message together with the state shared between its
/// processing flow and its lease-keeping flow.
/// </summary>
public class Message
{
    // The two flags below are written by one asynchronous flow and polled by another
    // (KeepLease sets WasLost and polls WasFinished; ProcessMessage does the opposite).
    // Volatile backing fields make each write visible to the polling side.
    private volatile bool _wasLost;
    private volatile bool _wasFinished;

    /// <summary>Identifier assigned when the message is received.</summary>
    public int Id { get; set; }

    /// <summary>Moment the message was received.</summary>
    public DateTime ReceiveTime { get; set; }

    /// <summary>Moment the lease was last refreshed (initially the receive time).</summary>
    public DateTime LastRefreshTime { get; set; }

    /// <summary>Set once the lease could not be refreshed in time.</summary>
    public bool WasLost
    {
        get => _wasLost;
        set => _wasLost = value;
    }

    /// <summary>Set once processing of the message has ended.</summary>
    public bool WasFinished
    {
        get => _wasFinished;
        set => _wasFinished = value;
    }
}
/// <summary>
/// Tunable parameters of the message-processing simulation.
/// </summary>
public class Settings
{
    // ---- Receiving ----

    /// <summary>Number of messages that can still be handed out by the receiver.</summary>
    public int MessagesCount { get; set; }

    /// <summary>Delay awaited by the receiver before each receive attempt completes.</summary>
    public TimeSpan ReceivingDuration { get; set; }

    // ---- Processing ----

    /// <summary>Maximum number of CPU + IO rounds performed for one message.</summary>
    public int ProcessingLoops { get; set; }

    /// <summary>Length of the blocking (CPU-bound) part of one processing round.</summary>
    public TimeSpan ProcessingCPUDuration { get; set; }

    /// <summary>Length of the awaited (IO-bound) part of one processing round.</summary>
    public TimeSpan ProcessingIODuration { get; set; }

    // ---- Lease keeping ----

    /// <summary>Pause between two consecutive lease checks.</summary>
    public TimeSpan RefreshDelay { get; set; }

    /// <summary>Time a single lease refresh takes.</summary>
    public TimeSpan RefreshDuration { get; set; }

    /// <summary>Maximum time since the last refresh before the lease counts as lost.</summary>
    public TimeSpan MessageLeaseTimeout { get; set; }

    // ---- Other ----

    /// <summary>
    /// Not read anywhere in the visible code; presumably a scheduling priority
    /// used by other parts of the project - verify against its consumers.
    /// </summary>
    public int DesiredPriority { get; set; }
}
/// <summary>
/// Running counters of the simulation, reported periodically by the monitoring thread.
/// </summary>
public class Stats
{
    /// <summary>Messages whose lease expired before it could be refreshed.</summary>
    public int Lost { get; set; }

    /// <summary>Messages whose lease keeper observed the end of processing.</summary>
    public int ProcessedSuccessfully { get; set; }

    /// <summary>Messages handed out by the receiver so far; also the source of message ids.</summary>
    public int GeneratedMessages { get; set; }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment