Skip to content

Instantly share code, notes, and snippets.

@afish
Last active November 9, 2024 21:45
Show Gist options
  • Save afish/ef96c05bffb9478c4e34327367059a22 to your computer and use it in GitHub Desktop.
Save afish/ef96c05bffb9478c4e34327367059a22 to your computer and use it in GitHub Desktop.
namespace AsyncPriorities
{
/// <summary>
/// Simulates a message pump in which receiving, processing and lease renewal
/// run as concurrent asynchronous operations that cooperate through
/// <see cref="Prioritize"/>.
/// </summary>
public class MessageProcessor
{
    // Serializes read-modify-write updates of the counters in CurrentStats, which
    // are incremented from several concurrently running KeepLease operations and
    // read by the monitoring thread.
    private static readonly object StatsGate = new object();

    // Serializes the check-and-claim step of Prioritize. Without it two callers can
    // interleave their writes to DesiredPriority and Cookie, leaving a priority
    // paired with a cookie that no caller owns, after which nobody can ever proceed.
    private static readonly object PriorityGate = new object();

    /// <summary>Counters describing the current simulation run.</summary>
    public static Stats CurrentStats = new Stats();

    /// <summary>Timing and scheduling parameters of the current simulation run.</summary>
    public static Settings CurrentSettings = CreateDefaultSettings();

    /// <summary>
    /// Resets statistics and settings, starts the background monitoring thread and
    /// runs the receive loop. The receive loop never exits, so the returned task
    /// does not complete.
    /// </summary>
    public static async Task Simulate()
    {
        Initialize();
        StartMonitoringThread();
        await RunLoop();
    }

    /// <summary>Replaces the shared statistics and settings with fresh defaults.</summary>
    private static void Initialize()
    {
        CurrentStats = new Stats();
        CurrentSettings = CreateDefaultSettings();
    }

    /// <summary>Builds the settings used by the simulation.</summary>
    private static Settings CreateDefaultSettings()
    {
        return new Settings
        {
            ReceivingDuration = TimeSpan.FromMilliseconds(250),
            RefreshDelay = TimeSpan.FromSeconds(2),
            RefreshDuration = TimeSpan.FromMilliseconds(250),
            MessageLeaseTimeout = TimeSpan.FromSeconds(5),
            ProcessingCPUDuration = TimeSpan.FromMilliseconds(1000),
            ProcessingIODuration = TimeSpan.FromMilliseconds(1000),
            ProcessingLoops = 20,
            MessagesCount = 100,
            DesiredPriority = -1,
            Cookie = Guid.Empty,
        };
    }

    /// <summary>
    /// Receives messages forever; for each received message starts lease renewal
    /// and processing without awaiting them, so many messages are in flight at once.
    /// </summary>
    private static async Task RunLoop()
    {
        while (true)
        {
            var message = await ReceiveMessage();
            if (message is null)
            {
                continue;
            }

            // Deliberately fire-and-forget: both operations run concurrently with
            // the receive loop. The discards make that intent explicit.
            _ = KeepLease(message);
            _ = ProcessMessage(message);
        }
    }

    /// <summary>
    /// Waits for <see cref="Settings.ReceivingDuration"/>, takes a priority-1 turn
    /// and then produces the next message.
    /// </summary>
    /// <returns>
    /// A new message, or <c>null</c> when <see cref="Settings.MessagesCount"/> has
    /// been used up.
    /// </returns>
    public static async Task<Message?> ReceiveMessage()
    {
        await Task.Yield();
        await Task.Delay(CurrentSettings.ReceivingDuration);
        await Prioritize(1);

        // Only decrement while messages remain; decrementing unconditionally would
        // drive the counter negative and eventually wrap around.
        if (CurrentSettings.MessagesCount <= 0)
        {
            return null;
        }
        CurrentSettings.MessagesCount--;

        int id;
        lock (StatsGate)
        {
            id = ++CurrentStats.GeneratedMessages;
        }

        var now = DateTime.Now;
        var message = new Message
        {
            LastRefreshTime = now,
            WasLost = false,
            Id = id,
            ReceiveTime = now
        };
        Log($"New message received with id {message.Id}");
        return message;
    }

    /// <summary>
    /// Processes a message in up to <see cref="Settings.ProcessingLoops"/> rounds,
    /// each consisting of a blocking part and an awaited delay, stopping early once
    /// the message has been marked as lost. Always marks the message as finished.
    /// </summary>
    /// <param name="message">The message to process.</param>
    public static async Task ProcessMessage(Message message)
    {
        await Task.Yield();
        await Prioritize(2);
        for (int part = 0; part < CurrentSettings.ProcessingLoops && !message.WasLost; ++part)
        {
            Thread.Sleep(CurrentSettings.ProcessingCPUDuration); // CPU-bound part (blocks the thread on purpose)
            await Task.Delay(CurrentSettings.ProcessingIODuration); // IO-bound part
            await Prioritize(2);
        }

        message.WasFinished = true;
        if (!message.WasLost)
        {
            Log($"Finished message with id {message.Id} in {DateTime.Now - message.ReceiveTime}");
        }
    }

    /// <summary>
    /// Periodically refreshes the lease of a message until it is finished. If a
    /// refresh comes later than <see cref="Settings.MessageLeaseTimeout"/> after the
    /// previous one, the message is marked as lost and counted in
    /// <see cref="Stats.Lost"/>; otherwise, once the message is finished, it is
    /// counted in <see cref="Stats.ProcessedSuccessfully"/>.
    /// </summary>
    /// <param name="message">The message whose lease is kept alive.</param>
    public static async Task KeepLease(Message message)
    {
        await Task.Yield();
        await Prioritize(3);
        while (!message.WasFinished) // Cross-thread flag; visibility depends on how Message implements it
        {
            await Task.Delay(CurrentSettings.RefreshDelay);
            await Prioritize(3);

            if (DateTime.Now > message.LastRefreshTime + CurrentSettings.MessageLeaseTimeout)
            {
                message.WasLost = true;
                lock (StatsGate)
                {
                    CurrentStats.Lost++;
                }
                Log($"Lost lease for message {message.Id}");
                return;
            }

            await Task.Delay(CurrentSettings.RefreshDuration);
            await Prioritize(3);
            Log($"Refreshed lease for message {message.Id}");
            message.LastRefreshTime = DateTime.Now;
        }

        lock (StatsGate)
        {
            CurrentStats.ProcessedSuccessfully++;
        }
    }

    /// <summary>
    /// Cooperative priority gate. The caller repeatedly yields until it owns the
    /// shared slot: it claims the slot whenever the currently requested priority is
    /// lower than its own, and returns (resetting the slot) once it finds its own
    /// priority and cookie still recorded there.
    /// </summary>
    /// <param name="priority">Priority of the caller; higher values overwrite lower claims.</param>
    public static async Task Prioritize(int priority)
    {
        var cookie = Guid.NewGuid();
        while (true)
        {
            // The priority and the cookie form one logical value, so they must be
            // examined and replaced atomically with respect to other callers.
            lock (PriorityGate)
            {
                if (CurrentSettings.DesiredPriority == priority && CurrentSettings.Cookie == cookie)
                {
                    CurrentSettings.DesiredPriority = -1;
                    CurrentSettings.Cookie = Guid.Empty;
                    return;
                }

                if (CurrentSettings.DesiredPriority < priority)
                {
                    CurrentSettings.DesiredPriority = priority;
                    CurrentSettings.Cookie = cookie;
                }
            }

            // Give other continuations a chance to run (and possibly outbid us).
            await Task.Yield();
        }
    }

    /// <summary>
    /// Starts a background thread that logs the statistics every three seconds.
    /// </summary>
    private static void StartMonitoringThread()
    {
        var monitoringThread = new Thread(() =>
        {
            while (true)
            {
                Thread.Sleep(TimeSpan.FromSeconds(3));

                // Take one consistent snapshot so "still running" matches the other numbers.
                int generated, succeeded, lost;
                lock (StatsGate)
                {
                    generated = CurrentStats.GeneratedMessages;
                    succeeded = CurrentStats.ProcessedSuccessfully;
                    lost = CurrentStats.Lost;
                }

                Log($"Received messages {generated}, " +
                    $"success {succeeded}, " +
                    $"failed {lost}, " +
                    $"still running {generated - succeeded - lost}");
            }
        })
        {
            // Background thread: does not keep the process alive.
            IsBackground = true
        };
        monitoringThread.Start();
    }

    /// <summary>
    /// Writes a line to the console containing the current time, the managed
    /// thread id and the given text, separated by tabs.
    /// </summary>
    /// <param name="message">Text to log.</param>
    public static void Log(string message)
    {
        Console.WriteLine($"{DateTime.Now}\t{Thread.CurrentThread.ManagedThreadId}\t{message}");
    }
}
/// <summary>
/// A single simulated message together with its lease and completion state.
/// </summary>
public class Message
{
    // The two flags are set by one asynchronous operation and polled in a loop by
    // another (MessageProcessor.KeepLease / MessageProcessor.ProcessMessage), which
    // may run on different threads. Volatile backing fields ensure that a polling
    // loop observes the write instead of a cached value.
    private volatile bool _wasLost;
    private volatile bool _wasFinished;

    /// <summary>Time at which the lease was last refreshed.</summary>
    public DateTime LastRefreshTime { get; set; }

    /// <summary>Time at which the message was received.</summary>
    public DateTime ReceiveTime { get; set; }

    /// <summary>Whether the lease of the message has been lost.</summary>
    public bool WasLost
    {
        get => _wasLost;
        set => _wasLost = value;
    }

    /// <summary>Whether processing of the message has finished.</summary>
    public bool WasFinished
    {
        get => _wasFinished;
        set => _wasFinished = value;
    }

    /// <summary>Identifier of the message.</summary>
    public int Id { get; set; }
}
/// <summary>
/// Mutable parameters of the simulation: workload size, simulated durations and
/// the shared slot used by the priority mechanism.
/// </summary>
public class Settings
{
    // --- Workload ---

    /// <summary>Number of messages that can still be generated.</summary>
    public int MessagesCount { get; set; }

    /// <summary>Maximum number of processing rounds per message.</summary>
    public int ProcessingLoops { get; set; }

    // --- Simulated durations ---

    /// <summary>Delay applied before a message is received.</summary>
    public TimeSpan ReceivingDuration { get; set; }

    /// <summary>Length of the blocking (CPU) part of one processing round.</summary>
    public TimeSpan ProcessingCPUDuration { get; set; }

    /// <summary>Length of the awaited (IO) part of one processing round.</summary>
    public TimeSpan ProcessingIODuration { get; set; }

    // --- Lease handling ---

    /// <summary>Pause between two lease refresh attempts.</summary>
    public TimeSpan RefreshDelay { get; set; }

    /// <summary>Time a single lease refresh takes.</summary>
    public TimeSpan RefreshDuration { get; set; }

    /// <summary>Maximum time allowed between refreshes before the lease counts as lost.</summary>
    public TimeSpan MessageLeaseTimeout { get; set; }

    // --- Priority slot ---

    /// <summary>Priority currently recorded in the shared priority slot.</summary>
    public int DesiredPriority { get; set; }

    /// <summary>Cookie identifying the caller that recorded <see cref="DesiredPriority"/>.</summary>
    public Guid Cookie { get; set; }
}
/// <summary>
/// Running totals of the simulation.
/// </summary>
public class Stats
{
    /// <summary>Number of messages generated so far.</summary>
    public int GeneratedMessages { get; set; }

    /// <summary>Number of messages whose processing finished without the lease being lost.</summary>
    public int ProcessedSuccessfully { get; set; }

    /// <summary>Number of messages whose lease was lost.</summary>
    public int Lost { get; set; }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment