Skip to content

Instantly share code, notes, and snippets.

@jltrem
Created October 24, 2019 13:40
Show Gist options
  • Save jltrem/7cee8f5a2b95d8cecf658f1810c78756 to your computer and use it in GitHub Desktop.
Save jltrem/7cee8f5a2b95d8cecf658f1810c78756 to your computer and use it in GitHub Desktop.
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Linq;
using LanguageExt;
namespace foo
{
    /// <summary>
    /// Accepts items from any thread and dispatches each one, on a single dedicated
    /// worker thread, to every output whose predicate accepts the item.
    /// </summary>
    /// <typeparam name="T">Type of the items being queued and routed.</typeparam>
    public class QueuedDemux<T> : IDisposable
    {
        private readonly BlockingCollection<T> _queue;
        private readonly Func<T, Unit> _demux;
        private readonly Thread _worker;

        /// <summary>
        /// Creates the demultiplexer and immediately starts its worker thread.
        /// </summary>
        /// <param name="outputs">
        /// Pairs of (predicate, action). For every dequeued item, each action whose
        /// predicate returns <c>true</c> is invoked with that item.
        /// </param>
        /// <exception cref="ArgumentNullException"><paramref name="outputs"/> is null.</exception>
        public QueuedDemux(IEnumerable<(Func<T, bool>, Action<T>)> outputs)
        {
            if (outputs == null)
            {
                throw new ArgumentNullException(nameof(outputs));
            }

            _queue = new BlockingCollection<T>(new ConcurrentQueue<T>());

            // Build the routing table once, up front, so the worker never re-reads
            // the caller's enumerable.
            var @switch = new Lst<(Func<T, bool> predicate, Action<T> action)>(outputs);
            _demux = item =>
                @switch
                    .Where(x => x.predicate(item))
                    .Iter(x => x.action(item));

            // Background thread: it does not keep the process alive, so items still
            // queued when the process exits are not guaranteed to be dispatched.
            _worker = new Thread(AsyncWorker) { IsBackground = true };
            _worker.Start();
        }

        /// <summary>
        /// Adds an item to the queue for asynchronous dispatch on the worker thread.
        /// </summary>
        /// <param name="item">The item to route.</param>
        /// <returns><see cref="Unit.Default"/>.</returns>
        /// <exception cref="InvalidOperationException">
        /// The demultiplexer has been disposed (the queue is marked complete for adding).
        /// </exception>
        public Unit Enqueue(T item)
        {
            _queue.Add(item);
            return Unit.Default;
        }

        /// <summary>
        /// Stops accepting new items. Items already queued are still dispatched by the
        /// worker, which then exits. This call does not wait for the worker.
        /// </summary>
        public void Dispose()
        {
            // Only signal completion here: the worker may still be draining the queue,
            // so the collection itself must not be torn down underneath it.
            _queue.CompleteAdding();
        }

        /// <summary>
        /// Worker loop: blocks for items and dispatches them until the queue has been
        /// marked complete and is empty.
        /// </summary>
        private void AsyncWorker()
        {
            // GetConsumingEnumerable ends cleanly once adding is complete and the queue
            // is drained, so no exception is needed to detect shutdown. This keeps an
            // InvalidOperationException thrown by a predicate/action from being mistaken
            // for "queue completed" and silently stopping the worker; it now surfaces
            // like any other handler exception.
            foreach (var item in _queue.GetConsumingEnumerable())
            {
                _demux(item);
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment