Skip to content

Instantly share code, notes, and snippets.

@svick
Created July 25, 2018 16:49
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save svick/ba4e67068b12323784dd7db2abd913d9 to your computer and use it in GitHub Desktop.
Save svick/ba4e67068b12323784dd7db2abd913d9 to your computer and use it in GitHub Desktop.
/// <summary>
/// Hands out <see cref="ValueTask"/>s that complete after a requested delay, serving every
/// pending delay from one shared one-shot <see cref="Timer"/> and one shared
/// <see cref="IValueTaskSource"/> instance.
/// </summary>
/// <remarks>
/// Each delay is identified by a <c>short</c> token. A token counts as pending while it has an
/// entry in <c>timestamps</c> and as completed once that entry has been removed. Tokens are never
/// reused, so one instance can serve at most 32768 delays before <see cref="Delay"/> throws.
/// </remarks>
class DelayTaskSource
{
    readonly ValueTaskSource valueTaskSource;

    // Guards every mutable field below.
    readonly object l = new object();

    // Continuations registered for tokens that were still pending at registration time.
    readonly List<(Action<object>, object, short)> continuations = new List<(Action<object>, object, short)>();

    // Deadline (in Stopwatch ticks) of every pending token.
    readonly Dictionary<short, long> timestamps = new Dictionary<short, long>();

    // Scratch list reused by TimerCallback to collect expired tokens.
    readonly List<short> tokenCache = new List<short>();

    readonly Timer timer;

    // Deadline the timer is currently armed for, or null when the timer is not armed.
    long? timerTimestamp;

    short nextToken;

    public DelayTaskSource()
    {
        valueTaskSource = new ValueTaskSource(this);
        timer = new Timer(TimerCallback);
    }

    /// <summary>Converts a <see cref="TimeSpan"/> into a number of <see cref="Stopwatch"/> ticks.</summary>
    private static long ToTicks(TimeSpan timeSpan) => (long)(timeSpan.TotalSeconds * Stopwatch.Frequency);

    /// <summary>Converts a number of <see cref="Stopwatch"/> ticks into a <see cref="TimeSpan"/>.</summary>
    private static TimeSpan ToTimeSpan(long ticks) => TimeSpan.FromSeconds((double)ticks / Stopwatch.Frequency);

    /// <summary>
    /// Starts a new delay and returns a <see cref="ValueTask"/> that completes once it has elapsed.
    /// </summary>
    /// <param name="delay">How long to wait before the returned task completes.</param>
    /// <returns>A task backed by the shared value task source and a freshly allocated token.</returns>
    /// <exception cref="InvalidOperationException">All tokens of this instance have been used.</exception>
    public ValueTask Delay(TimeSpan delay)
    {
        long nowTimestamp = Stopwatch.GetTimestamp();
        long delayTimestamp = nowTimestamp + ToTicks(delay);

        lock (l)
        {
            // nextToken wraps to a negative value after short.MaxValue has been handed out.
            if (nextToken < 0)
            {
                throw new InvalidOperationException("token overflow");
            }

            short token = nextToken++;
            timestamps.Add(token, delayTimestamp);

            // Re-arm only when the timer is idle or currently armed for a later deadline.
            if (timerTimestamp == null || timerTimestamp > delayTimestamp)
            {
                timer.Change(delay, Timeout.InfiniteTimeSpan);
                timerTimestamp = delayTimestamp;
            }

            return new ValueTask(valueTaskSource, token);
        }
    }

    /// <summary>
    /// Timer tick: completes every expired token, re-arms the timer for the earliest remaining
    /// deadline and runs the continuations registered for the tokens that just completed.
    /// </summary>
    private void TimerCallback(object _)
    {
        long nowTimestamp = Stopwatch.GetTimestamp();
        List<(Action<object>, object)> ready = null;
        long? nextTimestamp = null;

        lock (l)
        {
            // Split pending tokens into "expired" and "earliest still pending".
            tokenCache.Clear();
            foreach (var (token, timestamp) in timestamps)
            {
                if (timestamp <= nowTimestamp)
                {
                    tokenCache.Add(token);
                }
                else if (nextTimestamp == null || timestamp < nextTimestamp)
                {
                    nextTimestamp = timestamp;
                }
            }

            // Record what the timer is armed for (null when idle); otherwise Delay would compare
            // against a stale deadline and never re-arm the one-shot timer.
            timerTimestamp = nextTimestamp;
            if (nextTimestamp != null)
            {
                timer.Change(ToTimeSpan(nextTimestamp.Value - nowTimestamp), Timeout.InfiniteTimeSpan);
            }

            foreach (var token in tokenCache)
            {
                timestamps.Remove(token);
            }

            // Collect every continuation whose token is no longer pending, in registration order.
            foreach (var (c, s, token) in continuations)
            {
                if (!timestamps.ContainsKey(token))
                {
                    if (ready == null)
                    {
                        ready = new List<(Action<object>, object)>();
                    }

                    ready.Add((c, s));
                }
            }

            // Drop them from the list so a later tick cannot invoke them a second time.
            if (ready != null)
            {
                continuations.RemoveAll(entry => !timestamps.ContainsKey(entry.Item3));
            }
        }

        // Run continuations outside the lock so they may call back into this instance.
        if (ready != null)
        {
            foreach (var (c, s) in ready)
            {
                c(s);
            }
        }
    }

    /// <summary>
    /// The <see cref="IValueTaskSource"/> behind every task returned by <see cref="Delay"/>;
    /// it answers status queries by checking whether the token is still pending in the parent.
    /// </summary>
    class ValueTaskSource : IValueTaskSource
    {
        DelayTaskSource parent;

        public ValueTaskSource(DelayTaskSource parent) => this.parent = parent;

        /// <summary>Returns normally when the delay identified by <paramref name="token"/> has completed.</summary>
        /// <exception cref="InvalidOperationException">The delay is still pending.</exception>
        public void GetResult(short token)
        {
            lock (parent.l)
            {
                if (parent.timestamps.ContainsKey(token))
                {
                    throw new InvalidOperationException("not yet completed");
                }

                return; // already completed
            }
        }

        /// <summary>Reports <c>Pending</c> while the token has a deadline entry, <c>Succeeded</c> otherwise.</summary>
        public ValueTaskSourceStatus GetStatus(short token)
        {
            lock (parent.l)
            {
                if (parent.timestamps.ContainsKey(token))
                {
                    return ValueTaskSourceStatus.Pending;
                }

                return ValueTaskSourceStatus.Succeeded;
            }
        }

        /// <summary>
        /// Registers <paramref name="continuation"/> to run when the delay completes, or runs it
        /// inline on the calling thread when the delay has already completed.
        /// </summary>
        /// <remarks><paramref name="flags"/> is not consulted by this implementation.</remarks>
        public void OnCompleted(Action<object> continuation, object state, short token, ValueTaskSourceOnCompletedFlags flags)
        {
            lock (parent.l)
            {
                if (parent.timestamps.ContainsKey(token))
                {
                    parent.continuations.Add((continuation, state, token));
                    return;
                }
            }

            // already completed, invoke inline
            continuation(state);
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment