Skip to content

Instantly share code, notes, and snippets.

@BrennanConroy
Created April 12, 2018 19:25
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save BrennanConroy/81801a604bd5686ad69d9b7922dade17 to your computer and use it in GitHub Desktop.
Save BrennanConroy/81801a604bd5686ad69d9b7922dade17 to your computer and use it in GitHub Desktop.
var nagle = new NagleTimer();
while (true)
{
var result = await reader.ReadAsync();
var buffer = result.Buffer;
try
{
if (!buffer.IsEmpty)
{
if (buffer.Length >= maxBuffer)
{
// "write"
//Console.WriteLine($"Wrote {buffer.Length} byte(s)");
bytesWritten += buffer.Length;
totalSends++;
}
else
{
while (buffer.Length < maxBuffer)
{
innerLoop++;
var length = buffer.Length;
reader.AdvanceTo(buffer.Start, buffer.End);
watch.Start();
//await Task.Delay(1);
await nagle;
//await Task.Yield();
watch.Stop();
watchTime += watch.ElapsedMilliseconds;
watch.Reset();
var hasData = reader.TryRead(out result);
buffer = result.Buffer;
if (buffer.Length == length)
{
break;
}
}
// "write"
//Console.WriteLine($"Wrote {buffer.Length} byte(s)");
bytesWritten += buffer.Length;
totalSends++;
}
}
else if (result.IsCompleted)
{
break;
}
}
finally
{
reader.AdvanceTo(buffer.End);
}
}
class NagleTimer : ICriticalNotifyCompletion
{
private DelayScheduler _scheduler = new DelayScheduler();
public NagleTimer GetAwaiter() => this;
public bool IsCompleted => false;
public void GetResult()
{
}
public void OnCompleted(Action continuation)
{
_scheduler.Schedule(o => ((Action)o).Invoke(), continuation);
}
public void UnsafeOnCompleted(Action continuation)
{
OnCompleted(continuation);
}
}
public class DelayScheduler : SchedulerBase
{
public int DelayCount = 6;
public override void Schedule(Action<object> action, object state)
{
SchedulerEventSource.Log.Schedule(this.GetHashCode(), "Delay", _workItems.Count);
base.Schedule(action, state);
var currentTick = Environment.TickCount;
while(_workItems.TryPeek(out var res))
{
if (res.QueueTick + 5 < currentTick)
{
if (_workItems.TryDequeue(out var work))
{
SchedulerEventSource.Log.CallbackStart(this.GetHashCode());
work.Callback(work.State);
SchedulerEventSource.Log.CallbackStop(this.GetHashCode());
}
}
else
break;
}
}
}
/// <summary>
/// SchedulerBase implements a very simple PipeScheduler, that basically
/// defers anything it schedules 20 msec or so into to future. It however
/// is not meant to be used itself. Instead the intent is to subclass it
/// and override schedule() so that after scheduling an item it might do
/// some of the work previously scheduled.
///
/// In general the expectation is that in the 'fast' steady-state path you
/// rarely rely on 20 msec timer to go off.
/// </summary>
public class SchedulerBase : PipeScheduler
{
public SchedulerBase()
{
_timer = new Timer(Timeout, new WeakReference(this), System.Threading.Timeout.Infinite, System.Threading.Timeout.Infinite);
_lastTimerUpdate = Environment.TickCount - 1000; // Set the time in the past. THis insures our optimziation does not kick in.
}
~SchedulerBase()
{
Flush();
}
public override void Schedule(Action<object> action, object state)
{
_workItems.Enqueue(new Work { Callback = action, State = state, QueueTick = Environment.TickCount });
InsureFlush();
}
private static void Timeout(object obj)
{
WeakReference reference = (WeakReference)obj;
SchedulerBase scheduler = reference.Target as SchedulerBase;
if (scheduler == null)
return; // The Scheduler has died, and the timer has not been disposed yet, Just give up, the timer will die eventually.
scheduler.Flush();
}
private void Flush()
{
SchedulerEventSource.Log.FlushStart(this.GetHashCode());
while (!_workItems.IsEmpty)
{
Work itemToRun;
var currentTick = Environment.TickCount;
if (_workItems.TryDequeue(out itemToRun))
System.Threading.ThreadPool.QueueUserWorkItem((object state) => itemToRun.Callback(state), itemToRun.State);
}
SchedulerEventSource.Log.FlushStop(this.GetHashCode());
}
private void InsureFlush()
{
// Logically we set the timeout timer every time we schedule, but we optimize
// by skipping if we have done it in the last 10 msec.
int curTickCount = Environment.TickCount;
if ((uint)(curTickCount - _lastTimerUpdate) > 10)
{
_lastTimerUpdate = curTickCount;
lock (_timer)
_timer.Change(15, System.Threading.Timeout.Infinite);
}
}
internal readonly ConcurrentQueue<Work> _workItems = new ConcurrentQueue<Work>();
private readonly Timer _timer;
private int _lastTimerUpdate;
internal struct Work
{
public Action<object> Callback;
public object State;
public long QueueTick;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment