Skip to content

Instantly share code, notes, and snippets.

@alfeg
Last active August 29, 2015 14:14
Show Gist options
  • Save alfeg/f7262474abf59a3897f2 to your computer and use it in GitHub Desktop.
Save alfeg/f7262474abf59a3897f2 to your computer and use it in GitHub Desktop.
Atomic blocking queue
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Isam.Esent.Collections.Generic;
namespace Esent
{
class Storage
{
static PersistentDictionary<string, Entity> dict = new PersistentDictionary<string, Entity>("data");
public Entity GetById(string id)
{
return dict[id];
}
public void Store(string id, Entity entity)
{
dict[id] = entity;
}
}
[Serializable]
public struct Entity
{
public int Id { get; set; }
public int Counter { get; set; }
}
class Program
{
static void Main(string[] args)
{
var storage = new Storage();
var taskCount = 1000;
var tasks = new Task[taskCount];
var entity = new Entity();
var entityId = "ID";
storage.Store(entityId, entity);
using (var atomic = new Atomic())
{
for (int i = 0; i < taskCount; i++)
{
tasks[i] = Task.Run(() =>
{
atomic.Execute(() =>
{
var storedEntity = storage.GetById(entityId);
storedEntity.Counter++;
storage.Store(entityId, storedEntity);
});
});
}
Task.WaitAll(tasks);
}
Console.WriteLine(storage.GetById(entityId).Counter);
}
}
public class Atomic : IDisposable
{
private static BlockingCollection<Action> queue = new BlockingCollection<Action>();
public Atomic()
{
Task.Run(() =>
{
foreach (var action in queue.GetConsumingEnumerable())
{
action();
}
});
}
// blocking call, will continue as soon as action completed
public void Execute(Action action)
{
var man = new ManualResetEventSlim(false);
queue.Add(() =>
{
action();
man.Set();
});
man.Wait();
}
public void Dispose()
{
queue.CompleteAdding();
}
}
public class AtomicHasher : IDisposable
{
readonly private Dictionary<string, Atomic> atoms = new Dictionary<string, Atomic>();
readonly private Func<string, string> hasher;
public AtomicHasher(Func<string, string> hasher = null)
{
this.hasher = hasher ?? (id =>
{
return Math.Abs(id.GetHashCode() % Environment.ProcessorCount).ToString();
});
}
public void Execute(string id, Action action)
{
var hash = hasher(id);
lock (atoms)
{
if (!atoms.ContainsKey(hash))
{
atoms[hash] = new Atomic(hash);
}
}
atoms[hash].Execute(action);
}
public void Dispose()
{
atoms.Clear();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment