Instantly share code, notes, and snippets.

Embed
What would you like to do?
Shard Balancer
using System;
namespace ColourCoding.Parallel
{
public interface IConsumer : IDisposable
{
bool Wakeup();
}
public interface IConsumer<TShard> : IConsumer
where TShard : class
{
TShard ActiveShard { get; }
}
}
using System;
using System.Collections.Generic;
using System.Linq;
using Retlang.Channels;
using Retlang.Fibers;
namespace ColourCoding.Parallel {
public delegate IEnumerable<Action> DataToActions<T, TShard>(TShard shard, IEnumerable<T> items);
public class ShardingChannel<T, TShard> : IPublisher<T>
where TShard : class {
private readonly Func<T, TShard> _shardRule;
private readonly Action<T, Exception> _shardingFailure;
private readonly Dictionary<TShard, List<T>> allQueuedShards = new Dictionary<TShard, List<T>>();
private readonly Queue<Tuple<TShard, List<T>>> inactiveShards = new Queue<Tuple<TShard, List<T>>>();
HashSet<TShard> activeShards = new HashSet<TShard>();
private readonly List<T> pendingQueue = new List<T>();
private readonly List<IConsumer<TShard>> consumers = new List<IConsumer<TShard>>();
public event Action _onEmpty = () => { };
public ShardingChannel(Func<T, TShard> shardRule, Action<T, Exception> shardingFailure) {
_shardRule = shardRule;
_shardingFailure = shardingFailure;
}
public IDisposable Subscribe(IFiber executor, Action<T> action) {
return Subscribe(new ShardingConsumer<T, TShard>(executor, this, DataToActions(action)));
}
public IDisposable Subscribe(IFiber executor, Action<TShard, T> action) {
return Subscribe(new ShardingConsumer<T, TShard>(executor, this, DataToActions(action)));
}
public IDisposable Subscribe(IFiber executor, Action<TShard, IEnumerable<T>> action) {
return Subscribe(new ShardingConsumer<T, TShard>(executor, this, DataToActions(action)));
}
public IDisposable Subscribe(IConsumer<TShard> consumer) {
consumers.Add(consumer);
consumer.Wakeup();
return consumer;
}
public bool Publish(T message) {
lock (pendingQueue) {
pendingQueue.Add(message);
}
foreach (var consumer in consumers) {
if (consumer.Wakeup()) {
return true; // At least one thread knows to pick this up
}
}
return true;
}
bool AreMessagesInFlight {
get {
return activeShards.Count > 0;
}
}
internal Tuple<TShard, List<T>> FinishedWithShard(TShard shardToDispose) {
lock (pendingQueue) {
AssignShardsToPendingQueueItems();
if (shardToDispose != null) {
List<T> result;
if (allQueuedShards.TryGetValue(shardToDispose, out result) && result.Count > 0) {
Assigned(result.Count);
allQueuedShards.Remove(shardToDispose);
// More stuff to do on this shard, let's keep going
return Tuple.Create(shardToDispose, result);
}
activeShards.Remove(shardToDispose);
}
return NextShard();
}
}
private Tuple<TShard, List<T>> NextShard() {
if (inactiveShards.Count > 0) {
var item = inactiveShards.Dequeue();
var shard = item.Item1;
activeShards.Add(shard);
allQueuedShards.Remove(shard);
Assigned(item.Item2.Count);
return item;
}
if (inactiveShards.Count == 0 && activeShards.Count == 0) {
if (allQueuedShards.Count > 0) {
_shardingFailure(default(T), new Exception("Queue count is incorrect.")); // This should never happen
}
_onEmpty();
}
return null;
}
internal Tuple<TShard, List<T>> ConsumeFreeShard() {
lock (pendingQueue) {
AssignShardsToPendingQueueItems();
return NextShard();
}
}
int messagesAssigned = 0;
void Assigned(int count) {
messagesAssigned += count;
Console.WriteLine("Assigned: {0}", messagesAssigned);
}
int _received = 0;
private void AssignShardsToPendingQueueItems() {
TShard shard;
List<T> queue;
bool isChanged = false;
foreach (var item in pendingQueue) {
try {
shard = _shardRule(item);
if (!allQueuedShards.TryGetValue(shard, out queue)) {
isChanged = true;
allQueuedShards[shard] = queue = new List<T>();
if (!activeShards.Contains(shard)) {
inactiveShards.Enqueue(new Tuple<TShard, List<T>>(shard, queue));
}
}
queue.Add(item);
} catch (Exception ex) {
try {
_shardingFailure(item, ex);
} catch { }
}
}
_received += pendingQueue.Count;
pendingQueue.Clear();
if (isChanged) {
Console.WriteLine("Active Shards/Inactive Shards/New Messages: {0}/{1}/{2}", activeShards.Count, inactiveShards.Count, _received);
_received = 0;
}
}
/// <summary>
/// Fires when the queue is empty. Could fire twice. No guarantees are made as to which
/// thread or fiber the event is executed upon.
/// </summary>
public event Action EmptyEvent {
add {
_onEmpty += value;
lock (pendingQueue) {
if (pendingQueue.Count == 0 && !AreMessagesInFlight) {
value();
// Guarantee that action gets called if the queue is empty when you add
}
}
}
remove { _onEmpty -= value; }
}
static DataToActions<T, TShard> DataToActions(Action<TShard, T> onMessage) {
return (shard, queue) => queue.Select(v => new Action(() => onMessage(shard, v)));
}
static DataToActions<T, TShard> DataToActions(Action<T> onMessage) {
return (shard, queue) => new List<Action>(queue.Select(v => new Action(() => onMessage(v))));
}
static DataToActions<T, TShard> DataToActions(Action<TShard, IEnumerable<T>> onMessage) {
return (shard, queue) => {
var result = new List<Action>();
result.Add(() => onMessage(shard, queue));
return result;
};
}
}
}
using System;
using System.Collections.Generic;
using System.Diagnostics.Contracts;
using Retlang.Fibers;
namespace ColourCoding.Parallel {
public class ShardingConsumer<T, TShard> : IConsumer<TShard>
where TShard : class {
private readonly IFiber _fiber;
private readonly ShardingChannel<T, TShard> _channel;
private TShard _currentShard;
private bool isDisposed;
private readonly DataToActions<T, TShard> _dataToActions;
public ShardingConsumer(IFiber executor, ShardingChannel<T, TShard> channel, DataToActions<T, TShard> dataToActions) {
if (dataToActions== null)
{
throw new ArgumentNullException("dataToActions");
}
_fiber = executor;
_channel = channel;
_dataToActions = dataToActions;
_currentShard = null;
IsSleeping = true;
}
public bool Wakeup() {
lock (this) {
if (IsSleeping) {
IsSleeping = false;
_fiber.Enqueue(ConsumeNextShard);
return true;
}
return false;
}
}
public TShard ActiveShard {
get { return _currentShard; }
}
bool IsSleeping {
get;
set;
}
void ConsumeNextShard() {
lock (this) {
if (_currentShard != null || isDisposed) {
// If you receive multiple wakeups, you may already have a current shard and should ignore ConsumeNextShard
return;
}
var next = _channel.ConsumeFreeShard();
HandleChannelResult(next);
}
}
private void HandleChannelResult(Tuple<TShard, List<T>> next) {
if (next == null) {
_currentShard = null;
IsSleeping = true; // No data, just stop.
} else {
_currentShard = next.Item1;
ProcessShard(next.Item1, next.Item2);
}
}
private void ProcessShard(TShard shard, List<T> list) {
foreach (var action in _dataToActions(shard, list)) {
_fiber.Enqueue(action);
}
_fiber.Enqueue(() => {
var next = _channel.FinishedWithShard(shard);
HandleChannelResult(next);
});
}
public void Dispose() {
isDisposed = true; // All this does is affect the first line of ConsumeNextShard.
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment