Created
June 11, 2010 17:57
-
-
Save JulianBirch/434824 to your computer and use it in GitHub Desktop.
Shard Balancer
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
namespace ColourCoding.Parallel | |
{ | |
public interface IConsumer : IDisposable | |
{ | |
bool Wakeup(); | |
} | |
public interface IConsumer<TShard> : IConsumer | |
where TShard : class | |
{ | |
TShard ActiveShard { get; } | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Linq; | |
using Retlang.Channels; | |
using Retlang.Fibers; | |
namespace ColourCoding.Parallel { | |
public delegate IEnumerable<Action> DataToActions<T, TShard>(TShard shard, IEnumerable<T> items); | |
public class ShardingChannel<T, TShard> : IPublisher<T> | |
where TShard : class { | |
private readonly Func<T, TShard> _shardRule; | |
private readonly Action<T, Exception> _shardingFailure; | |
private readonly Dictionary<TShard, List<T>> allQueuedShards = new Dictionary<TShard, List<T>>(); | |
private readonly Queue<Tuple<TShard, List<T>>> inactiveShards = new Queue<Tuple<TShard, List<T>>>(); | |
HashSet<TShard> activeShards = new HashSet<TShard>(); | |
private readonly List<T> pendingQueue = new List<T>(); | |
private readonly List<IConsumer<TShard>> consumers = new List<IConsumer<TShard>>(); | |
public event Action _onEmpty = () => { }; | |
public ShardingChannel(Func<T, TShard> shardRule, Action<T, Exception> shardingFailure) { | |
_shardRule = shardRule; | |
_shardingFailure = shardingFailure; | |
} | |
public IDisposable Subscribe(IFiber executor, Action<T> action) { | |
return Subscribe(new ShardingConsumer<T, TShard>(executor, this, DataToActions(action))); | |
} | |
public IDisposable Subscribe(IFiber executor, Action<TShard, T> action) { | |
return Subscribe(new ShardingConsumer<T, TShard>(executor, this, DataToActions(action))); | |
} | |
public IDisposable Subscribe(IFiber executor, Action<TShard, IEnumerable<T>> action) { | |
return Subscribe(new ShardingConsumer<T, TShard>(executor, this, DataToActions(action))); | |
} | |
public IDisposable Subscribe(IConsumer<TShard> consumer) { | |
consumers.Add(consumer); | |
consumer.Wakeup(); | |
return consumer; | |
} | |
public bool Publish(T message) { | |
lock (pendingQueue) { | |
pendingQueue.Add(message); | |
} | |
foreach (var consumer in consumers) { | |
if (consumer.Wakeup()) { | |
return true; // At least one thread knows to pick this up | |
} | |
} | |
return true; | |
} | |
bool AreMessagesInFlight { | |
get { | |
return activeShards.Count > 0; | |
} | |
} | |
internal Tuple<TShard, List<T>> FinishedWithShard(TShard shardToDispose) { | |
lock (pendingQueue) { | |
AssignShardsToPendingQueueItems(); | |
if (shardToDispose != null) { | |
List<T> result; | |
if (allQueuedShards.TryGetValue(shardToDispose, out result) && result.Count > 0) { | |
Assigned(result.Count); | |
allQueuedShards.Remove(shardToDispose); | |
// More stuff to do on this shard, let's keep going | |
return Tuple.Create(shardToDispose, result); | |
} | |
activeShards.Remove(shardToDispose); | |
} | |
return NextShard(); | |
} | |
} | |
private Tuple<TShard, List<T>> NextShard() { | |
if (inactiveShards.Count > 0) { | |
var item = inactiveShards.Dequeue(); | |
var shard = item.Item1; | |
activeShards.Add(shard); | |
allQueuedShards.Remove(shard); | |
Assigned(item.Item2.Count); | |
return item; | |
} | |
if (inactiveShards.Count == 0 && activeShards.Count == 0) { | |
if (allQueuedShards.Count > 0) { | |
_shardingFailure(default(T), new Exception("Queue count is incorrect.")); // This should never happen | |
} | |
_onEmpty(); | |
} | |
return null; | |
} | |
internal Tuple<TShard, List<T>> ConsumeFreeShard() { | |
lock (pendingQueue) { | |
AssignShardsToPendingQueueItems(); | |
return NextShard(); | |
} | |
} | |
int messagesAssigned = 0; | |
void Assigned(int count) { | |
messagesAssigned += count; | |
Console.WriteLine("Assigned: {0}", messagesAssigned); | |
} | |
int _received = 0; | |
private void AssignShardsToPendingQueueItems() { | |
TShard shard; | |
List<T> queue; | |
bool isChanged = false; | |
foreach (var item in pendingQueue) { | |
try { | |
shard = _shardRule(item); | |
if (!allQueuedShards.TryGetValue(shard, out queue)) { | |
isChanged = true; | |
allQueuedShards[shard] = queue = new List<T>(); | |
if (!activeShards.Contains(shard)) { | |
inactiveShards.Enqueue(new Tuple<TShard, List<T>>(shard, queue)); | |
} | |
} | |
queue.Add(item); | |
} catch (Exception ex) { | |
try { | |
_shardingFailure(item, ex); | |
} catch { } | |
} | |
} | |
_received += pendingQueue.Count; | |
pendingQueue.Clear(); | |
if (isChanged) { | |
Console.WriteLine("Active Shards/Inactive Shards/New Messages: {0}/{1}/{2}", activeShards.Count, inactiveShards.Count, _received); | |
_received = 0; | |
} | |
} | |
/// <summary> | |
/// Fires when the queue is empty. Could fire twice. No guarantees are made as to which | |
/// thread or fiber the event is executed upon. | |
/// </summary> | |
public event Action EmptyEvent { | |
add { | |
_onEmpty += value; | |
lock (pendingQueue) { | |
if (pendingQueue.Count == 0 && !AreMessagesInFlight) { | |
value(); | |
// Guarantee that action gets called if the queue is empty when you add | |
} | |
} | |
} | |
remove { _onEmpty -= value; } | |
} | |
static DataToActions<T, TShard> DataToActions(Action<TShard, T> onMessage) { | |
return (shard, queue) => queue.Select(v => new Action(() => onMessage(shard, v))); | |
} | |
static DataToActions<T, TShard> DataToActions(Action<T> onMessage) { | |
return (shard, queue) => new List<Action>(queue.Select(v => new Action(() => onMessage(v)))); | |
} | |
static DataToActions<T, TShard> DataToActions(Action<TShard, IEnumerable<T>> onMessage) { | |
return (shard, queue) => { | |
var result = new List<Action>(); | |
result.Add(() => onMessage(shard, queue)); | |
return result; | |
}; | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Diagnostics.Contracts; | |
using Retlang.Fibers; | |
namespace ColourCoding.Parallel { | |
public class ShardingConsumer<T, TShard> : IConsumer<TShard> | |
where TShard : class { | |
private readonly IFiber _fiber; | |
private readonly ShardingChannel<T, TShard> _channel; | |
private TShard _currentShard; | |
private bool isDisposed; | |
private readonly DataToActions<T, TShard> _dataToActions; | |
public ShardingConsumer(IFiber executor, ShardingChannel<T, TShard> channel, DataToActions<T, TShard> dataToActions) { | |
if (dataToActions== null) | |
{ | |
throw new ArgumentNullException("dataToActions"); | |
} | |
_fiber = executor; | |
_channel = channel; | |
_dataToActions = dataToActions; | |
_currentShard = null; | |
IsSleeping = true; | |
} | |
public bool Wakeup() { | |
lock (this) { | |
if (IsSleeping) { | |
IsSleeping = false; | |
_fiber.Enqueue(ConsumeNextShard); | |
return true; | |
} | |
return false; | |
} | |
} | |
public TShard ActiveShard { | |
get { return _currentShard; } | |
} | |
bool IsSleeping { | |
get; | |
set; | |
} | |
void ConsumeNextShard() { | |
lock (this) { | |
if (_currentShard != null || isDisposed) { | |
// If you receive multiple wakeups, you may already have a current shard and should ignore ConsumeNextShard | |
return; | |
} | |
var next = _channel.ConsumeFreeShard(); | |
HandleChannelResult(next); | |
} | |
} | |
private void HandleChannelResult(Tuple<TShard, List<T>> next) { | |
if (next == null) { | |
_currentShard = null; | |
IsSleeping = true; // No data, just stop. | |
} else { | |
_currentShard = next.Item1; | |
ProcessShard(next.Item1, next.Item2); | |
} | |
} | |
private void ProcessShard(TShard shard, List<T> list) { | |
foreach (var action in _dataToActions(shard, list)) { | |
_fiber.Enqueue(action); | |
} | |
_fiber.Enqueue(() => { | |
var next = _channel.FinishedWithShard(shard); | |
HandleChannelResult(next); | |
}); | |
} | |
public void Dispose() { | |
isDisposed = true; // All this does is affect the first line of ConsumeNextShard. | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment