Skip to content

Instantly share code, notes, and snippets.

/MyBuffer.cs Secret

Created Jan 13, 2011
Embed
What would you like to do?
Buffer Using Queue Protected By A ReaderWriterLockSlim
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Concurrency;
using System.Threading;
using System.Diagnostics;
namespace RxThreadPoolUtilization
{
public class Message
{
public int Id { get; private set; }
public DateTime Start { get; set; }
public DateTime End { get; set; }
public int ProcessedOnThreadId { get; set; }
public int CreatedOnThreadId { get; set; }
public Message(int id_)
{
#if DEBUG
//Console.WriteLine("[{0}], {1}, Message:ctor()", id_, DateTime.Now.ToString("mm:ss.fff"));
#endif
Id = id_;
Start = DateTime.Now;
CreatedOnThreadId = Thread.CurrentThread.ManagedThreadId;
}
}
class Program
{
private static IDisposable subscription;
static void Main(string[] args)
{
int count = int.Parse(args[0]);
TestUsingExplicitLocking(count);
//subscription = TestMaxSpeed(count);
Console.ReadKey();
subscription = null;
}
static void TestPool(int count_)
{
int id = 1;
// R = M / S
// where R = rate
// M = number of messages
// S = number of seconds in the time window over which we measured R
// create a relatively fast src
// publishing event every second
var span = TimeSpan.FromMilliseconds(100);
var sub = Observable.Timer(span, span).Select(t => new Message(id++))
.ObserveOn(Scheduler.ThreadPool)
.Select(m => DoWork(m))
.Do(m => Console.WriteLine("[{0}] Created on {1} and Processed on {2} in {3} millis",
m.Id, m.CreatedOnThreadId, m.ProcessedOnThreadId,
(m.End - m.Start).TotalMilliseconds))
.Subscribe();
}
private static long RemainingCount;
static IDisposable TestMaxSpeed(int count_)
{
RemainingCount = count_;
Message[] messages = AllocateMessages(count_);
var obs = Enumerable.Range(1, count_).ToObservable()
.Select(i => messages[i-1])
//.ObserveOn(Scheduler.ThreadPool)
.Select(m => DoWorkFast(m));
(new Thread(new ParameterizedThreadStart(TimerAgent))).Start(count_);
return obs.Subscribe();
}
private static Message[] AllocateMessages(int count_)
{
int id = 1;
var sw = new Stopwatch();
sw.Start();
Message[] messages = new Message[count_];
for (int ii = 0; ii < count_; ++ii)
{
messages[ii] = new Message(id++);
}
sw.Stop();
Console.WriteLine("Took {0} millis to allocate {1} Message objects", sw.ElapsedMilliseconds, count_);
return messages;
}
static void TimerAgent(object count_)
{
int count = (int) count_;
var sw = new Stopwatch();
sw.Start();
while(true)
{
long remaining = Interlocked.Read(ref RemainingCount);
#if DEBUG
Console.WriteLine("remaining count = {0}", remaining);
#endif
if(remaining > 1) Thread.Sleep(100);
else break;
}
sw.Stop();
Console.WriteLine("Took {0} millis to process {1} messages", sw.ElapsedMilliseconds, count_);
}
private static int ProcessedCount;
static Message DoWorkFast(Message message_)
{
Interlocked.Decrement(ref RemainingCount);
int processedCount = 0;
if(message_.Id >= 0)
{
processedCount = Interlocked.Add(ref ProcessedCount, 1);
}
#if DEBUG
Console.WriteLine("ProcessedCount = {0}", processedCount);
#endif
return message_;
}
static MyBufferedQueue BQ = new MyBufferedQueue();
static void TestUsingExplicitLocking(int count_)
{
RemainingCount = count_;
Message[] messages = AllocateMessages(count_);
(new Thread(new ParameterizedThreadStart(PushMessages))).Start(messages);
(new Thread(new ThreadStart(GetMessages)){IsBackground = true}).Start();
(new Thread(new ParameterizedThreadStart(TimerAgent))).Start(count_);
}
static void PushMessages(object messages_)
{
var messages = (Message[]) messages_;
int count = messages.Length;
for(int i = 0; i < count; ++i)
BQ.Put(messages[i]);
}
static void GetMessages()
{
while (true)
{
var m = BQ.Get();
if (m != null)
{
DoWorkFast(m);
}
else
{
// yield the cpu to avoid starving the writer
Thread.Sleep(0);
}
}
}
static Message DoWork(Message message_)
{
Console.WriteLine("[{0}], {1}, DoWork", message_.Id, DateTime.Now.ToString("mm:ss.fff"));
Thread.Sleep(TimeSpan.FromMilliseconds(200));
message_.ProcessedOnThreadId = Thread.CurrentThread.ManagedThreadId;
message_.End = DateTime.Now;
return message_;
}
}
public class MyBufferedQueue
{
private ReaderWriterLockSlim _lock = new ReaderWriterLockSlim();
private Queue<Message> _q = new Queue<Message>();
public Message Get()
{
_lock.EnterReadLock();
try
{
if (_q.Count > 0)
return _q.Dequeue();
}
finally
{
_lock.ExitReadLock();
}
return null;
}
public void Put(Message m_)
{
_lock.EnterWriteLock();
try
{
_q.Enqueue(m_);
}
finally
{
_lock.ExitWriteLock();
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment