Skip to content

Instantly share code, notes, and snippets.

@phizaz
Created June 21, 2017 10:50
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save phizaz/f67f1ba7d25d30d8972c13b7b6a29319 to your computer and use it in GitHub Desktop.
Save phizaz/f67f1ba7d25d30d8972c13b7b6a29319 to your computer and use it in GitHub Desktop.
C# Producer-Consumer Pattern with Timeout and Return Value
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace ProducerConsumer
{
/// <summary>
/// Input for one unit of work: the two integer operands handed to the consumer.
/// </summary>
internal class Job
{
    /// <summary>First operand.</summary>
    public int a;

    /// <summary>Second operand.</summary>
    public int b;
}
/// <summary>
/// Result produced for a single job.
/// </summary>
internal class ReturnValue
{
    /// <summary>The computed value.</summary>
    public int c;
}
/// <summary>
/// Queue entry that pairs a job with the means of reporting its outcome
/// back to whoever submitted it.
/// </summary>
internal class JobWrapper
{
    /// <summary>The work item to process.</summary>
    public Job job;

    /// <summary>Event the worker signals after it has stored <see cref="res"/>.</summary>
    public ManualResetEvent finished;

    /// <summary>Result of the job; null until the worker has filled it in.</summary>
    public ReturnValue res;
}
/// <summary>
/// Processes jobs one at a time on a dedicated worker thread.
/// Jobs are submitted either synchronously (<see cref="runJob"/>, which waits for the
/// result) or fire-and-forget (<see cref="addJob"/>). While idle, the worker wakes up
/// every <see cref="TakeTimeoutMs"/> milliseconds and prints "timeout".
/// </summary>
/// <remarks>
/// The worker is stopped cooperatively through a cancellation token rather than
/// Thread.Abort, which is obsolete, unsupported on .NET Core / .NET 5+, and could kill
/// a job half-way and leave its submitter blocked forever. There is deliberately no
/// finalizer: the running worker thread keeps this instance reachable, and a finalizer
/// must never block on Thread.Join. Call <see cref="abort"/> (or Dispose) explicitly.
/// </remarks>
class Consumer : IDisposable
{
    // How long the worker waits for a job before reporting an idle "timeout".
    const int TakeTimeoutMs = 1000;

    readonly ConcurrentQueue<JobWrapper> queue;
    readonly BlockingCollection<JobWrapper> blockingQueue;
    readonly CancellationTokenSource stopSignal;
    Thread thread;

    /// <summary>Creates the queue and immediately starts the worker thread.</summary>
    public Consumer()
    {
        this.queue = new ConcurrentQueue<JobWrapper>();
        this.blockingQueue = new BlockingCollection<JobWrapper>(queue);
        this.stopSignal = new CancellationTokenSource();
        this.thread = new Thread(this.work);
        this.thread.Start();
    }

    /// <summary>
    /// Stops the worker thread and waits for it to exit. A job that is already being
    /// processed is allowed to finish; jobs still waiting in the queue are discarded and
    /// any <see cref="runJob"/> caller waiting on one of them is released with an
    /// InvalidOperationException. Calling this more than once is harmless.
    /// </summary>
    public void abort()
    {
        // Claim the thread atomically so concurrent/repeated calls shut down only once.
        var worker = Interlocked.Exchange(ref this.thread, null);
        if (worker == null)
        {
            return;
        }

        // Wake the worker out of TryTake, then refuse any further submissions.
        this.stopSignal.Cancel();
        this.blockingQueue.CompleteAdding();
        worker.Join();

        // Release submitters whose jobs never ran; their result stays null.
        JobWrapper orphan;
        while (this.blockingQueue.TryTake(out orphan))
        {
            if (orphan.finished != null)
            {
                orphan.finished.Set();
            }
        }

        // The queue itself is left undisposed on purpose: late runJob/addJob callers may
        // still touch it, and BlockingCollection.Dispose is not thread-safe.
        this.stopSignal.Dispose();
    }

    /// <summary>Equivalent to <see cref="abort"/>; allows use in a using statement.</summary>
    public void Dispose()
    {
        this.abort();
    }

    /// <summary>Worker loop: take a job, compute its result, signal the submitter.</summary>
    private void work()
    {
        var token = this.stopSignal.Token;
        try
        {
            while (!token.IsCancellationRequested)
            {
                JobWrapper wrap;
                var succeed = this.blockingQueue.TryTake(out wrap, TakeTimeoutMs, token);
                if (!succeed)
                {
                    // nothing arrived within the timeout window
                    Console.WriteLine("timeout");
                    continue;
                }

                var job = wrap.job;
                Console.WriteLine("job: a: " + job.a + " b: " + job.b);
                // simulate the return value (in this demo: the sum of the operands)
                wrap.res = new ReturnValue() { c = job.a + job.b };
                // fire-and-forget jobs carry no event to signal
                if (wrap.finished != null)
                {
                    wrap.finished.Set();
                }
            }
        }
        catch (OperationCanceledException)
        {
            // abort() cancelled the pending TryTake: normal shutdown path.
        }
    }

    /// <summary>
    /// Queues <paramref name="job"/> and blocks until the worker has processed it.
    /// </summary>
    /// <param name="job">The job to run; must not be null.</param>
    /// <returns>The result computed by the worker.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="job"/> is null.</exception>
    /// <exception cref="InvalidOperationException">
    /// The consumer has been aborted, either before the job was queued or before it ran.
    /// </exception>
    public ReturnValue runJob(Job job)
    {
        if (job == null)
        {
            throw new ArgumentNullException("job");
        }

        // The event owns an OS handle, so release it as soon as the wait is over.
        using (var done = new ManualResetEvent(false))
        {
            var wrap = new JobWrapper()
            {
                job = job,
                finished = done,
                res = null
            };
            this.blockingQueue.Add(wrap);
            done.WaitOne();

            // Signalled without a result means abort() discarded the job.
            if (wrap.res == null)
            {
                throw new InvalidOperationException("Consumer was aborted before the job could run.");
            }
            return wrap.res;
        }
    }

    /// <summary>
    /// Queues <paramref name="job"/> without waiting for it; the result is discarded.
    /// </summary>
    /// <param name="job">The job to run; must not be null.</param>
    /// <exception cref="ArgumentNullException"><paramref name="job"/> is null.</exception>
    /// <exception cref="InvalidOperationException">The consumer has been aborted.</exception>
    public void addJob(Job job)
    {
        if (job == null)
        {
            throw new ArgumentNullException("job");
        }

        // Nobody waits on this job, so no completion event is allocated for it.
        var wrap = new JobWrapper()
        {
            job = job,
            finished = null,
            res = null
        };
        this.blockingQueue.Add(wrap);
    }
}
/// <summary>
/// Demo driver: submits two blocking jobs and one fire-and-forget job to a
/// <see cref="Consumer"/>, pausing in between, then stops the consumer.
/// </summary>
class Program
{
    static void Main(string[] args)
    {
        const int idleMs = 3000;
        Consumer worker = new Consumer();

        // Blocking submission: wait for the answer, then print it.
        ReturnValue outcome = worker.runJob(new Job() { a = 10, b = 20 });
        Console.WriteLine("res: " + outcome.c);

        // Fire-and-forget submission, followed by an idle pause.
        Job queued = new Job() { a = 11, b = 21 };
        worker.addJob(queued);
        Thread.Sleep(idleMs);

        // Second blocking submission and another idle pause.
        outcome = worker.runJob(new Job() { a = 12, b = 22 });
        Console.WriteLine("res: " + outcome.c);
        Thread.Sleep(idleMs);

        // Stop the consumer's worker thread.
        worker.abort();
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment