Buffer Using Queue Protected By A ReaderWriterLockSlim
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Linq; | |
using System.Text; | |
using System.Concurrency; | |
using System.Threading; | |
using System.Diagnostics; | |
namespace RxThreadPoolUtilization | |
{ | |
/// <summary>
/// A unit of work that remembers when and on which thread it was created.
/// <see cref="End"/> and <see cref="ProcessedOnThreadId"/> are left at their
/// default values by the constructor and are settable by whoever handles the message.
/// </summary>
public class Message
{
    /// <summary>
    /// Creates a message, stamping it with the current local time and the
    /// managed id of the calling thread.
    /// </summary>
    /// <param name="id_">Identifier assigned to the message.</param>
    public Message(int id_)
    {
        CreatedOnThreadId = Thread.CurrentThread.ManagedThreadId;
        Start = DateTime.Now;
        Id = id_;
    }

    /// <summary>Identifier supplied at construction; cannot be changed from outside the class.</summary>
    public int Id { get; private set; }

    /// <summary>Managed thread id of the thread that ran the constructor.</summary>
    public int CreatedOnThreadId { get; set; }

    /// <summary>Managed thread id of the thread that processed the message (0 until set).</summary>
    public int ProcessedOnThreadId { get; set; }

    /// <summary>Local time captured in the constructor.</summary>
    public DateTime Start { get; set; }

    /// <summary>Completion time; stays at <c>default(DateTime)</c> until assigned.</summary>
    public DateTime End { get; set; }
}
class Program | |
{ | |
// Handle to the Rx subscription when a test that returns one is enabled in Main;
// stays null while only TestUsingExplicitLocking is run.
private static IDisposable subscription;

/// <summary>
/// Entry point. Runs the explicit-locking benchmark with the message count
/// given as the first command-line argument, then waits for a key press.
/// </summary>
/// <param name="args">
/// Command-line arguments; <c>args[0]</c> must be a non-negative integer
/// giving the number of messages to allocate and process.
/// </param>
static void Main(string[] args)
{
    // Validate the argument up front instead of crashing with an
    // IndexOutOfRangeException / FormatException on bad input.
    int count;
    if (args == null || args.Length < 1 || !int.TryParse(args[0], out count) || count < 0)
    {
        Console.Error.WriteLine("Usage: pass the number of messages to process as the first argument (a non-negative integer).");
        return;
    }

    TestUsingExplicitLocking(count);
    // Alternative benchmark: subscription = TestMaxSpeed(count);

    Console.ReadKey();

    // Dispose before dropping the reference so an active subscription is
    // actually torn down rather than merely forgotten.
    if (subscription != null)
    {
        subscription.Dispose();
        subscription = null;
    }
}
/// <summary>
/// Demo pipeline: a timer source produces a new <c>Message</c> every 100 ms,
/// the stream is observed on <c>Scheduler.ThreadPool</c>, each message is run
/// through <c>DoWork</c>, and a summary line is written to the console.
/// The subscription is started and its handle is discarded.
/// </summary>
/// <param name="count_">Not used by this method.</param>
static void TestPool(int count_)
{
    // Rate reminder: R = M / S
    //   R = rate
    //   M = number of messages
    //   S = number of seconds in the window over which R was measured
    int nextId = 1;

    // Relatively fast source: first tick after 100 ms, then one tick every 100 ms.
    TimeSpan period = TimeSpan.FromMilliseconds(100);
    var source = Observable.Timer(period, period)
        .Select(t => new Message(nextId++));

    var processed = source
        .ObserveOn(Scheduler.ThreadPool)
        .Select(m => DoWork(m));

    var reported = processed.Do(m => Console.WriteLine(
        "[{0}] Created on {1} and Processed on {2} in {3} millis",
        m.Id,
        m.CreatedOnThreadId,
        m.ProcessedOnThreadId,
        (m.End - m.Start).TotalMilliseconds));

    reported.Subscribe();
}
// Number of messages still to be processed; decremented by DoWorkFast and
// polled by TimerAgent.
private static long RemainingCount;

/// <summary>
/// Pushes <paramref name="count_"/> pre-allocated messages through an Rx
/// pipeline that calls <c>DoWorkFast</c> on each, while a separate thread
/// running <c>TimerAgent</c> reports how long the run took.
/// </summary>
/// <param name="count_">Number of messages to allocate and process.</param>
/// <returns>The handle returned by subscribing to the pipeline.</returns>
static IDisposable TestMaxSpeed(int count_)
{
    RemainingCount = count_;
    Message[] pool = AllocateMessages(count_);

    // Map 1..count_ onto the pre-allocated messages, then process each one.
    // (An ObserveOn(Scheduler.ThreadPool) hop between the two steps is
    // intentionally not applied here.)
    var indices = Enumerable.Range(1, count_).ToObservable();
    var pipeline = indices
        .Select(i => pool[i - 1])
        .Select(m => DoWorkFast(m));

    // Start the timing thread before subscribing.
    var timerThread = new Thread(new ParameterizedThreadStart(TimerAgent));
    timerThread.Start(count_);

    return pipeline.Subscribe();
}
/// <summary>
/// Allocates an array of <paramref name="count_"/> messages with ids
/// 1..count_ and prints how long the allocation took.
/// </summary>
/// <param name="count_">Number of messages to create.</param>
/// <returns>The populated array; element <c>i</c> has id <c>i + 1</c>.</returns>
private static Message[] AllocateMessages(int count_)
{
    // The array allocation itself is part of the timed region.
    Stopwatch timer = Stopwatch.StartNew();

    var result = new Message[count_];
    int slot = 0;
    while (slot < count_)
    {
        result[slot] = new Message(slot + 1);
        slot++;
    }

    timer.Stop();
    Console.WriteLine("Took {0} millis to allocate {1} Message objects", timer.ElapsedMilliseconds, count_);
    return result;
}
/// <summary>
/// Thread entry point that measures how long it takes for
/// <c>RemainingCount</c> to drain to zero, polling every 100 ms, and then
/// prints the elapsed time.
/// </summary>
/// <param name="count_">
/// Boxed <c>int</c> holding the total number of messages; used only in the
/// final report.
/// </param>
static void TimerAgent(object count_)
{
    int count = (int)count_;
    Stopwatch sw = Stopwatch.StartNew();

    while (true)
    {
        // Interlocked.Read gives an atomic 64-bit read.
        long remaining = Interlocked.Read(ref RemainingCount);
#if DEBUG
        Console.WriteLine("remaining count = {0}", remaining);
#endif
        // Stop only once every message has been processed. The previous
        // "> 1" test stopped the clock while one message was still pending.
        if (remaining <= 0)
        {
            break;
        }

        Thread.Sleep(100);
    }

    sw.Stop();
    Console.WriteLine("Took {0} millis to process {1} messages", sw.ElapsedMilliseconds, count);
}
// Running total of messages handled by DoWorkFast.
private static int ProcessedCount;

/// <summary>
/// Minimal "work" step: decrements <c>RemainingCount</c>, bumps
/// <c>ProcessedCount</c> for messages with a non-negative id, and hands the
/// message back unchanged.
/// </summary>
/// <param name="message_">Message being processed.</param>
/// <returns>The same <paramref name="message_"/> instance.</returns>
static Message DoWorkFast(Message message_)
{
    Interlocked.Decrement(ref RemainingCount);

    // The new total is only reported in DEBUG builds.
    int processedSoFar = message_.Id >= 0
        ? Interlocked.Increment(ref ProcessedCount)
        : 0;
#if DEBUG
    Console.WriteLine("ProcessedCount = {0}", processedSoFar);
#endif
    return message_;
}
// Shared queue between the PushMessages producer and the GetMessages consumer.
static MyBufferedQueue BQ = new MyBufferedQueue();

/// <summary>
/// Runs the producer/consumer benchmark over <c>BQ</c>: one thread enqueues
/// all messages, a background thread dequeues and processes them, and a
/// third thread times the run.
/// </summary>
/// <param name="count_">Number of messages to allocate and process.</param>
static void TestUsingExplicitLocking(int count_)
{
    RemainingCount = count_;
    Message[] batch = AllocateMessages(count_);

    var producer = new Thread(PushMessages);
    // The consumer loops forever, so it is a background thread and does not
    // keep the process alive.
    var consumer = new Thread(GetMessages);
    consumer.IsBackground = true;
    var timerThread = new Thread(TimerAgent);

    producer.Start(batch);
    consumer.Start();
    timerThread.Start(count_);
}
/// <summary>
/// Thread entry point for the producer: enqueues every message of the
/// supplied array onto <c>BQ</c>, in array order.
/// </summary>
/// <param name="messages_">A <c>Message[]</c> passed as <c>object</c>.</param>
static void PushMessages(object messages_)
{
    var batch = (Message[])messages_;
    foreach (Message item in batch)
    {
        BQ.Put(item);
    }
}
/// <summary>
/// Thread entry point for the consumer: loops forever, pulling messages from
/// <c>BQ</c> and passing each to <c>DoWorkFast</c>. Never returns.
/// </summary>
static void GetMessages()
{
    for (;;)
    {
        Message next = BQ.Get();
        if (next == null)
        {
            // Queue was empty: yield the CPU to avoid starving the writer.
            Thread.Sleep(0);
            continue;
        }

        DoWorkFast(next);
    }
}
/// <summary>
/// Simulated slow work: logs the message id with a timestamp, sleeps for
/// 200 ms, then records the processing thread and completion time on the
/// message.
/// </summary>
/// <param name="message_">Message to process; it is mutated in place.</param>
/// <returns>The same <paramref name="message_"/> instance.</returns>
static Message DoWork(Message message_)
{
    int id = message_.Id;
    string stamp = DateTime.Now.ToString("mm:ss.fff");
    Console.WriteLine("[{0}], {1}, DoWork", id, stamp);

    Thread.Sleep(200);

    message_.ProcessedOnThreadId = Thread.CurrentThread.ManagedThreadId;
    message_.End = DateTime.Now;
    return message_;
}
} | |
/// <summary>
/// A FIFO buffer of <c>Message</c> objects backed by a <see cref="Queue{T}"/>
/// and guarded by a <see cref="ReaderWriterLockSlim"/>.
/// </summary>
public class MyBufferedQueue
{
    private readonly ReaderWriterLockSlim _lock = new ReaderWriterLockSlim();
    private readonly Queue<Message> _q = new Queue<Message>();

    /// <summary>
    /// Removes and returns the oldest message, without blocking for one to arrive.
    /// </summary>
    /// <returns>The dequeued message, or <c>null</c> when the queue is empty.</returns>
    public Message Get()
    {
        // Dequeue mutates the queue, so it must run under the exclusive
        // (write) lock. A read lock can be held by several threads at the
        // same time, which would let concurrent Get calls corrupt the queue.
        _lock.EnterWriteLock();
        try
        {
            return _q.Count > 0 ? _q.Dequeue() : null;
        }
        finally
        {
            _lock.ExitWriteLock();
        }
    }

    /// <summary>
    /// Appends a message to the tail of the queue.
    /// </summary>
    /// <param name="m_">Message to enqueue.</param>
    public void Put(Message m_)
    {
        _lock.EnterWriteLock();
        try
        {
            _q.Enqueue(m_);
        }
        finally
        {
            _lock.ExitWriteLock();
        }
    }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment