Created
April 12, 2018 19:25
-
-
Save BrennanConroy/81801a604bd5686ad69d9b7922dade17 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var nagle = new NagleTimer(); | |
while (true) | |
{ | |
var result = await reader.ReadAsync(); | |
var buffer = result.Buffer; | |
try | |
{ | |
if (!buffer.IsEmpty) | |
{ | |
if (buffer.Length >= maxBuffer) | |
{ | |
// "write" | |
//Console.WriteLine($"Wrote {buffer.Length} byte(s)"); | |
bytesWritten += buffer.Length; | |
totalSends++; | |
} | |
else | |
{ | |
while (buffer.Length < maxBuffer) | |
{ | |
innerLoop++; | |
var length = buffer.Length; | |
reader.AdvanceTo(buffer.Start, buffer.End); | |
watch.Start(); | |
//await Task.Delay(1); | |
await nagle; | |
//await Task.Yield(); | |
watch.Stop(); | |
watchTime += watch.ElapsedMilliseconds; | |
watch.Reset(); | |
var hasData = reader.TryRead(out result); | |
buffer = result.Buffer; | |
if (buffer.Length == length) | |
{ | |
break; | |
} | |
} | |
// "write" | |
//Console.WriteLine($"Wrote {buffer.Length} byte(s)"); | |
bytesWritten += buffer.Length; | |
totalSends++; | |
} | |
} | |
else if (result.IsCompleted) | |
{ | |
break; | |
} | |
} | |
finally | |
{ | |
reader.AdvanceTo(buffer.End); | |
} | |
} | |
class NagleTimer : ICriticalNotifyCompletion | |
{ | |
private DelayScheduler _scheduler = new DelayScheduler(); | |
public NagleTimer GetAwaiter() => this; | |
public bool IsCompleted => false; | |
public void GetResult() | |
{ | |
} | |
public void OnCompleted(Action continuation) | |
{ | |
_scheduler.Schedule(o => ((Action)o).Invoke(), continuation); | |
} | |
public void UnsafeOnCompleted(Action continuation) | |
{ | |
OnCompleted(continuation); | |
} | |
} | |
public class DelayScheduler : SchedulerBase | |
{ | |
public int DelayCount = 6; | |
public override void Schedule(Action<object> action, object state) | |
{ | |
SchedulerEventSource.Log.Schedule(this.GetHashCode(), "Delay", _workItems.Count); | |
base.Schedule(action, state); | |
var currentTick = Environment.TickCount; | |
while(_workItems.TryPeek(out var res)) | |
{ | |
if (res.QueueTick + 5 < currentTick) | |
{ | |
if (_workItems.TryDequeue(out var work)) | |
{ | |
SchedulerEventSource.Log.CallbackStart(this.GetHashCode()); | |
work.Callback(work.State); | |
SchedulerEventSource.Log.CallbackStop(this.GetHashCode()); | |
} | |
} | |
else | |
break; | |
} | |
} | |
} | |
/// <summary> | |
/// SchedulerBase implements a very simple PipeScheduler, that basically | |
/// defers anything it schedules 20 msec or so into to future. It however | |
/// is not meant to be used itself. Instead the intent is to subclass it | |
/// and override schedule() so that after scheduling an item it might do | |
/// some of the work previously scheduled. | |
/// | |
/// In general the expectation is that in the 'fast' steady-state path you | |
/// rarely rely on 20 msec timer to go off. | |
/// </summary> | |
public class SchedulerBase : PipeScheduler | |
{ | |
public SchedulerBase() | |
{ | |
_timer = new Timer(Timeout, new WeakReference(this), System.Threading.Timeout.Infinite, System.Threading.Timeout.Infinite); | |
_lastTimerUpdate = Environment.TickCount - 1000; // Set the time in the past. THis insures our optimziation does not kick in. | |
} | |
~SchedulerBase() | |
{ | |
Flush(); | |
} | |
public override void Schedule(Action<object> action, object state) | |
{ | |
_workItems.Enqueue(new Work { Callback = action, State = state, QueueTick = Environment.TickCount }); | |
InsureFlush(); | |
} | |
private static void Timeout(object obj) | |
{ | |
WeakReference reference = (WeakReference)obj; | |
SchedulerBase scheduler = reference.Target as SchedulerBase; | |
if (scheduler == null) | |
return; // The Scheduler has died, and the timer has not been disposed yet, Just give up, the timer will die eventually. | |
scheduler.Flush(); | |
} | |
private void Flush() | |
{ | |
SchedulerEventSource.Log.FlushStart(this.GetHashCode()); | |
while (!_workItems.IsEmpty) | |
{ | |
Work itemToRun; | |
var currentTick = Environment.TickCount; | |
if (_workItems.TryDequeue(out itemToRun)) | |
System.Threading.ThreadPool.QueueUserWorkItem((object state) => itemToRun.Callback(state), itemToRun.State); | |
} | |
SchedulerEventSource.Log.FlushStop(this.GetHashCode()); | |
} | |
private void InsureFlush() | |
{ | |
// Logically we set the timeout timer every time we schedule, but we optimize | |
// by skipping if we have done it in the last 10 msec. | |
int curTickCount = Environment.TickCount; | |
if ((uint)(curTickCount - _lastTimerUpdate) > 10) | |
{ | |
_lastTimerUpdate = curTickCount; | |
lock (_timer) | |
_timer.Change(15, System.Threading.Timeout.Infinite); | |
} | |
} | |
internal readonly ConcurrentQueue<Work> _workItems = new ConcurrentQueue<Work>(); | |
private readonly Timer _timer; | |
private int _lastTimerUpdate; | |
internal struct Work | |
{ | |
public Action<object> Callback; | |
public object State; | |
public long QueueTick; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment