Skip to content

Instantly share code, notes, and snippets.

@kenegozi
Created January 16, 2015 07:03
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kenegozi/29e6e398caee1a176a7c to your computer and use it in GitHub Desktop.
Save kenegozi/29e6e398caee1a176a7c to your computer and use it in GitHub Desktop.
Mimicking some of the samples from the talk "Go Concurrency Patterns" in c# (slides at https://talks.golang.org/2012/concurrency.slide#1) , to help illustrate the GO Channel concept to C# developers.
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
namespace GoPatterns
{
class Program
{
static void Main()
{
Example_1_Generator();
Example_2_Multiple();
Example_3_FanIn();
}
static Channel<string> Boring(string msg)
{
var rand = new Random();
var c = new Channel<string>();
Task.Factory.StartNew(() =>
{
for (var i = 0; ; i++)
{
c.Send(string.Format("{0} {1}", msg, i));
Thread.Sleep(rand.Next(1000));
}
});
return c;
}
static void Example_1_Generator()
{
var c = Boring("boring!");
for (var i = 0; i < 5; i++)
{
Console.WriteLine("You say: {0}", c.Receive());
}
Console.WriteLine("You're boring; I'm leaving.");
}
static void Example_2_Multiple()
{
var joe = Boring("Joe");
var ann = Boring("Ann");
for (var i = 0; i < 5; i++)
{
Console.WriteLine(joe.Receive());
Console.WriteLine(ann.Receive());
}
Console.WriteLine("You're both boring; I'm leaving.");
}
static void Example_3_FanIn()
{
var c = FanIn(Boring("Joe"), Boring("Ann"));
for (var i = 0; i < 10; i++)
{
Console.WriteLine(c.Receive());
}
Console.WriteLine("You're both boring; I'm leaving.");
}
static Channel<string> FanIn(Channel<string> input1, Channel<string> input2)
{
var c = new Channel<string>();
Task.Factory.StartNew(() => { while (true) { c.Send(input1.Receive()); } });
Task.Factory.StartNew(() => { while (true) { c.Send(input2.Receive()); } });
return c;
}
}
class Channel<T>
{
private readonly ConcurrentQueue<Entry> entries = new ConcurrentQueue<Entry>();
/// <summary>
/// Sends value on the channel, and blocks until the value was Received
/// from the channel.
/// </summary>
public void Send(T value)
{
var entry = new Entry { Dequeued = new ManualResetEvent(false), Value = value };
entries.Enqueue(entry);
entry.Dequeued.WaitOne();
}
/// <summary>
/// Receives a value from the channel, blocking until there is something to receive
/// if the channel is currently empty;
/// </summary>
public T Receive()
{
while (true)
{
Entry next;
if (entries.TryDequeue(out next))
{
next.Dequeued.Set();
return next.Value;
}
Thread.Sleep(100);
}
}
private struct Entry
{
public T Value;
public ManualResetEvent Dequeued;
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment