-
-
Save afish/ef96c05bffb9478c4e34327367059a22 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
namespace AsyncPriorities | |
{ | |
public class MessageProcessor | |
{ | |
/// <summary>
/// Entry point of the simulation: resets the shared state, starts the
/// background statistics reporter and then runs the receive loop.
/// </summary>
/// <returns>
/// A task tracking the receive loop, which is written as an endless loop.
/// </returns>
public static async Task Simulate()
{
    Initialize();
    StartMonitoringThread();

    Task receiveLoop = RunLoop();
    await receiveLoop;
}
/// <summary>
/// Installs a fresh <see cref="Stats"/> instance and the default simulation
/// <see cref="Settings"/> into the shared static fields.
/// </summary>
private static void Initialize()
{
    CurrentStats = new Stats();

    var defaults = new Settings();

    // Message supply and per-message workload.
    defaults.MessagesCount = 100;
    defaults.ProcessingLoops = 20;
    defaults.ProcessingCPUDuration = TimeSpan.FromMilliseconds(1000);
    defaults.ProcessingIODuration = TimeSpan.FromMilliseconds(1000);

    // Simulated latencies of receiving and of lease maintenance.
    defaults.ReceivingDuration = TimeSpan.FromMilliseconds(250);
    defaults.RefreshDelay = TimeSpan.FromSeconds(2);
    defaults.RefreshDuration = TimeSpan.FromMilliseconds(250);
    defaults.MessageLeaseTimeout = TimeSpan.FromSeconds(5);

    // Priority slot starts out unclaimed.
    defaults.DesiredPriority = -1;
    defaults.Cookie = Guid.Empty;

    // Publish only once fully populated, as the object initializer form would.
    CurrentSettings = defaults;
}
/// <summary>
/// Receives messages forever. Every message obtained gets a lease keeper and
/// a processor started for it; neither task is awaited, so they run
/// concurrently with the next receive.
/// </summary>
private static async Task RunLoop()
{
    for (;;)
    {
        Message? received = await ReceiveMessage();
        if (received != null)
        {
            // Deliberately not awaited: both run alongside the receive loop.
            _ = KeepLease(received);
            _ = ProcessMessage(received);
        }
    }
}
/// <summary>
/// Simulates receiving one message: waits for the configured receiving
/// duration, waits for its turn at priority 1 and then, if the configured
/// supply is not exhausted, creates and logs a new message.
/// </summary>
/// <returns>
/// The newly created message, or <c>null</c> when
/// <see cref="Settings.MessagesCount"/> has no messages left.
/// </returns>
public static async Task<Message?> ReceiveMessage()
{
    await Task.Yield();
    await Task.Delay(CurrentSettings.ReceivingDuration);
    await Prioritize(1);

    // Test before decrementing so that polling an exhausted supply does not
    // keep pushing the counter further below zero.
    if (CurrentSettings.MessagesCount <= 0)
    {
        return null;
    }

    CurrentSettings.MessagesCount--;
    CurrentStats.GeneratedMessages++;

    // One clock sample, so the receive time and the start of the lease agree.
    DateTime receivedAt = DateTime.Now;
    Message message = new Message
    {
        LastRefreshTime = receivedAt,
        WasLost = false,
        Id = CurrentStats.GeneratedMessages,
        ReceiveTime = receivedAt
    };

    Log($"New message received with id {message.Id}");
    return message;
}
/// <summary>
/// Simulates processing a message at priority 2. Each round blocks the
/// current thread for the CPU duration, then awaits the IO duration, then
/// waits for its priority turn again. Rounds stop early once the message is
/// marked as lost.
/// </summary>
/// <param name="message">
/// The message to process; its <c>WasFinished</c> flag is set when the rounds end.
/// </param>
public static async Task ProcessMessage(Message message)
{
    await Task.Yield();
    await Prioritize(2);

    int completedRounds = 0;
    while (completedRounds < CurrentSettings.ProcessingLoops && !message.WasLost)
    {
        Thread.Sleep(CurrentSettings.ProcessingCPUDuration); // CPU-bound part
        await Task.Delay(CurrentSettings.ProcessingIODuration); // IO-bound part
        await Prioritize(2);
        completedRounds++;
    }

    // Set regardless of outcome; the lease keeper polls this flag.
    message.WasFinished = true;

    if (message.WasLost)
    {
        return;
    }

    Log($"Finished message with id {message.Id} in {DateTime.Now - message.ReceiveTime}");
}
/// <summary>
/// Keeps the lease of a message alive at priority 3 until the message is
/// finished. Each round waits for the refresh delay; if the lease timeout has
/// elapsed since the last refresh by then, the message is marked lost and
/// counted as such, otherwise the lease is refreshed after the refresh duration.
/// </summary>
/// <param name="message">The message whose lease is maintained.</param>
public static async Task KeepLease(Message message)
{
    await Task.Yield();
    await Prioritize(3);

    // WasFinished is set by ProcessMessage, possibly on another thread; this
    // loop adds no synchronization of its own around the read.
    while (!message.WasFinished)
    {
        await Task.Delay(CurrentSettings.RefreshDelay);
        await Prioritize(3);

        DateTime now = DateTime.Now;
        DateTime leaseExpiry = message.LastRefreshTime + CurrentSettings.MessageLeaseTimeout;
        if (now > leaseExpiry)
        {
            message.WasLost = true;
            CurrentStats.Lost++;
            Log($"Lost lease for message {message.Id}");
            return;
        }

        // Lease still valid: simulate the refresh call, then record it.
        await Task.Delay(CurrentSettings.RefreshDuration);
        await Prioritize(3);
        Log($"Refreshed lease for message {message.Id}");
        message.LastRefreshTime = DateTime.Now;
    }

    CurrentStats.ProcessedSuccessfully++;
}
/// <summary>
/// Waits until the caller owns the shared priority slot held in
/// <c>CurrentSettings.DesiredPriority</c> and <c>CurrentSettings.Cookie</c>.
/// The caller claims the slot whenever the stored priority is strictly lower
/// than its own, and returns once it finds both its priority and its cookie
/// still in the slot, resetting the slot to -1 / <c>Guid.Empty</c>.
/// </summary>
/// <param name="priority">Priority of the caller; a larger value overrides a smaller one.</param>
/// <remarks>
/// NOTE(review): the two slot properties are read and written here without
/// any synchronization; confirm that this is acceptable for the callers.
/// </remarks>
public static async Task Prioritize(int priority)
{
    // Identifies this particular call among callers using the same priority.
    Guid ticket = Guid.NewGuid();

    for (;;)
    {
        bool slotIsOurs = CurrentSettings.DesiredPriority == priority
            && CurrentSettings.Cookie == ticket;
        if (slotIsOurs)
        {
            // Release the slot for the next round of claims.
            CurrentSettings.DesiredPriority = -1;
            CurrentSettings.Cookie = Guid.Empty;
            return;
        }

        if (CurrentSettings.DesiredPriority < priority)
        {
            CurrentSettings.DesiredPriority = priority;
            CurrentSettings.Cookie = ticket;
        }

        // Yield on every pass, whether or not a claim was made, then re-check.
        await Task.Yield();
    }
}
/// <summary>
/// Starts a background thread that logs the current statistics every three seconds.
/// </summary>
private static void StartMonitoringThread()
{
    void ReportForever()
    {
        TimeSpan reportInterval = TimeSpan.FromSeconds(3);
        for (;;)
        {
            Thread.Sleep(reportInterval);
            Log($"Received messages {CurrentStats.GeneratedMessages}, " +
                $"success {CurrentStats.ProcessedSuccessfully}, " +
                $"failed {CurrentStats.Lost}, " +
                $"still running {CurrentStats.GeneratedMessages - CurrentStats.ProcessedSuccessfully - CurrentStats.Lost}");
        }
    }

    // Background thread, so it does not keep the process alive on its own.
    var reporter = new Thread(ReportForever) { IsBackground = true };
    reporter.Start();
}
/// <summary>
/// Writes a tab-separated line to the console: current local time, managed
/// id of the calling thread, and the given text.
/// </summary>
/// <param name="message">Text to append after the time and thread id.</param>
public static void Log(string message)
{
    DateTime timestamp = DateTime.Now;
    int threadId = Thread.CurrentThread.ManagedThreadId;
    Console.WriteLine($"{timestamp}\t{threadId}\t{message}");
}
/// <summary>
/// Shared counters of the simulation. Starts as an empty instance so the
/// public methods of this class can be used before <c>Simulate</c> has run;
/// <c>Initialize</c> replaces it with a fresh instance.
/// </summary>
public static Stats CurrentStats = new Stats();

/// <summary>
/// Shared configuration and priority slot of the simulation. Starts as a
/// default-constructed instance instead of <c>null</c>; <c>Initialize</c>
/// replaces it with the configured defaults.
/// </summary>
public static Settings CurrentSettings = new Settings();
} | |
/// <summary>
/// A simulated message together with the state of its lease and processing.
/// </summary>
public class Message
{
    // The two status flags are written by one asynchronous flow and polled by
    // another, so they are backed by volatile fields to make every read
    // observe the latest write.
    private volatile bool _wasLost;
    private volatile bool _wasFinished;

    /// <summary>Time at which the lease was last refreshed.</summary>
    public DateTime LastRefreshTime { get; set; }

    /// <summary>Time at which the message was received.</summary>
    public DateTime ReceiveTime { get; set; }

    /// <summary>Whether the lease of the message has been lost.</summary>
    public bool WasLost
    {
        get => _wasLost;
        set => _wasLost = value;
    }

    /// <summary>Whether processing of the message has ended.</summary>
    public bool WasFinished
    {
        get => _wasFinished;
        set => _wasFinished = value;
    }

    /// <summary>Identifier of the message.</summary>
    public int Id { get; set; }
}
/// <summary>
/// Tunable parameters of the simulation plus the shared priority slot.
/// </summary>
public class Settings
{
    // --- Message supply and per-message workload ---

    /// <summary>Remaining number of messages the simulated receiver may hand out.</summary>
    public int MessagesCount { get; set; }

    /// <summary>Upper bound on processing rounds per message.</summary>
    public int ProcessingLoops { get; set; }

    /// <summary>Length of the blocking (CPU) part of one processing round.</summary>
    public TimeSpan ProcessingCPUDuration { get; set; }

    /// <summary>Length of the awaited (IO) part of one processing round.</summary>
    public TimeSpan ProcessingIODuration { get; set; }

    // --- Receive and lease timings ---

    /// <summary>Delay applied before each receive attempt yields a result.</summary>
    public TimeSpan ReceivingDuration { get; set; }

    /// <summary>Pause between two lease checks.</summary>
    public TimeSpan RefreshDelay { get; set; }

    /// <summary>Simulated duration of one lease refresh.</summary>
    public TimeSpan RefreshDuration { get; set; }

    /// <summary>Time after the last refresh beyond which the lease counts as lost.</summary>
    public TimeSpan MessageLeaseTimeout { get; set; }

    // --- Priority slot ---

    /// <summary>Priority currently holding the slot; the code uses -1 for "unclaimed".</summary>
    public int DesiredPriority { get; set; }

    /// <summary>Cookie of the caller currently holding the slot.</summary>
    public Guid Cookie { get; set; }
}
/// <summary>
/// Running counters of the simulation.
/// </summary>
public class Stats
{
    /// <summary>Number of messages created so far.</summary>
    public int GeneratedMessages { get; set; }

    /// <summary>Number of messages whose lease keeper saw them finish.</summary>
    public int ProcessedSuccessfully { get; set; }

    /// <summary>Number of messages whose lease was lost.</summary>
    public int Lost { get; set; }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment